Having previously introduced what Kafka is, this article moves on to Kafka's application scenarios and a working example. Let's take a look!
1. Kafka application scenarios
In general, Kafka is suited to the following scenarios:
(1) User activity tracking
Kafka is often used to record the activities of web or app users.
For example:
searches, clicks, page views, and so on. Servers publish these events to Kafka topics; consumers then subscribe to those topics for real-time monitoring and analysis, or persist the events to a database.
(2) Stream processing
Typical examples are Spark Streaming and Storm, which consume data from Kafka topics.
(3) Messaging
Decoupling producers from consumers, buffering messages, and so on.
(4) Log aggregation
A company can use Kafka to collect logs from many services and expose them to various consumers through a single, uniform interface.
(5) Operational metrics
Kafka is often used to record operational monitoring data.
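The decoupling idea in (3) can be illustrated, purely as an in-process analogy and not as real Kafka code, with a Java BlockingQueue: the producer thread and the consumer only share the queue (playing the role of a topic) and need not know about each other.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueAnalogy {
    public static void main(String[] args) throws InterruptedException {
        // The queue plays the role of a topic: producer and consumer
        // only know the queue, not each other.
        BlockingQueue<String> topic = new ArrayBlockingQueue<String>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    topic.put("event-" + i); // blocks if the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        producer.join();

        // The consumer drains messages at its own pace, in FIFO order.
        for (int i = 0; i < 3; i++) {
            System.out.println(topic.take()); // prints event-0, event-1, event-2
        }
    }
}
```

Unlike this toy queue, Kafka persists messages and lets many independent consumer groups read the same topic, but the decoupling principle is the same.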
2. A Kafka application example
A Kafka Java demo follows.
Kafka producer:
package kafkaTest;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer {

    private final Producer<String, String> producer;
    public final static String TOPIC = "TEST-TOPIC";

    public KafkaProducer() {
        Properties props = new Properties();
        // Broker list; the old 0.8 producer API uses metadata.broker.list
        props.put("metadata.broker.list", "192.168.1.103:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // acks = -1: wait for all in-sync replicas to acknowledge each send
        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "@@@@@hello kafka message" + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
        }
    }

    public static void main(String[] args) {
        new KafkaProducer().produce();
    }
}
Kafka consumer:
package kafkaTest;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    public KafkaConsumer() {
        Properties props = new Properties();
        // The old high-level consumer connects through ZooKeeper
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "test-group");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest available offset
        props.put("auto.offset.reset", "smallest");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
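A caveat on the demo above: it uses the old Scala-based clients that shipped with Kafka 0.8 (kafka.javaapi.producer.Producer and the ZooKeeper-based high-level consumer), which were later deprecated and removed in favor of the Java clients under org.apache.kafka.clients. A rough equivalent with the newer client API might look like the sketch below; the broker address and topic name are placeholders, and since it needs the kafka-clients dependency and a running broker, treat it as a sketch rather than a tested drop-in.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewClientSketch {
    public static void main(String[] args) {
        // Producer: bootstrap.servers replaces metadata.broker.list
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks", "all"); // equivalent of the old request.required.acks=-1
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("TEST-TOPIC", "key", "hello kafka"));
        }

        // Consumer: talks to the brokers directly, no ZooKeeper connection
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "test-group");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("auto.offset.reset", "earliest"); // "smallest" in the old consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("TEST-TOPIC"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```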
That is all for this introduction to Kafka. Want to learn more? Keep following the FAQ section of this site.