kafka应用场景有哪些?应用实例介绍

KLQ 2020-07-01 14:57:02 java常见问答 6022

之前给大家介绍了一下Kafka是什么,下面要接着给大家介绍的是Kafka应用场景以及应用实例的内容,一起来了解一下吧!

1、kafka应用场景

总的来说,kafka适用以下的场景:

(1)用户活动跟踪

kafka经常被用来记录web用户又或者是app用户的各种活动。

例:

搜索、点击、浏览网页等等,这些活动信息被各个服务器发布到kafka的topic当中,之后,消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;

(2)流式处理

比较典型的就是spark streaming以及storm;

(3)消息系统

解耦生产者和消费者、缓存消息等等;

(4)日志收集

一个企业能够用kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;

(5)运营指标

kafka经常被用来记录运营监控数据;

2、kafka应用实例

kafka java应用demo

kafka Producer

package kafkaTest;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
public class KafkaProducer
{
    private final Producer < String, String > producer;
    public final static String TOPIC = "TEST-TOPIC";
    public KafkaProducer()
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.1.103:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1");
        producer = new Producer < String, String > (new ProducerConfig(props));
    }
    public void produce()
    {
        int messageNo = 1000;
        final int COUNT = 10000;
        while (messageNo < COUNT)
        {
            String key = String.valueOf(messageNo);
            String data = "@@@@@hello kafka message" + key;
            producer.send(new KeyedMessage < String, String > (TOPIC, key, data));
            System.out.println(data);
            messageNo++;
        }
    }
    public static void main(String[] args)
    {
        new KafkaProducer()
            .produce();
    }
}

kafka consumer

package kafkaTest;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer
{
    private final ConsumerConnector consumer;
    public KafkaConsumer()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "test-group");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
    public void consume()
    {
        Map < String, Integer > topicCountMap = new HashMap < String, Integer > ();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map < String, List < KafkaStream < String, String >>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream < String, String > stream = consumerMap.get(KafkaProducer.TOPIC)
            .get(0);
        ConsumerIterator < String, String > it = stream.iterator();
        while (it.hasNext())
        {
            System.out.println(it.next()
                .message());
        }
    }
    public static void main(String[] args)
    {
        new KafkaConsumer()
            .consume();
    }
}

关于kafka的相关内容就给你介绍到这里了,你还想了解更多的相关知识吗?请继续关注本站的常见问题栏目来进行了解吧。

推荐阅读:

Kafka怎样保证消息不丢失?不重复?怎样保证消息顺序?

Kafka和mq的区别是什么?和rabbitmq有什么区别?

springboot整合kafka如何实现?示例