Kafka原理解析,如何简单实现一个Kafka?

TheDisguiser 2020-07-02 09:38:16 java常见问答 7349

小伙伴们呢都知道kafka吧,这是一个拥有者超高吞吐量的开源流处理平台,小伙伴们知道它的运行原理是什么吗?这次就一起来看看吧。

一、Kafka基本概念及原理

Kafka全名Apache Kafka,看见这个名字你就知道是谁开发了的,没错,kafka是由Apache开发的一个开源的分布式消息系统,现在是Apache名下的一个子项目,且已经成为开源领域目前最热门的消息系统之一。

Kafka与传统消息系统的不同在于:

1)、kafka是一个分布式系统,易于向外扩展。

2)、它能够同时为发布和订阅提供高吞吐量。

3)、它是支持多订阅者的,当失败时能自动平衡消费者。

4)、它能构成消息的持久化。

kafka名词分析

一般在一套kafka架构中是会有多个Producer,多个Broker,多个Consumer的,每个Producer可以对应多个Topic,每个Consumer则只能对应一个ConsumerGroup。

整个Kafka架构对应着一个ZK集群,它通过ZK管理集群配置,选举Leader,并在consumer group发生变化时进行rebalance。

Kafka原理

Kafka架构图

Kafka原理

二、Kafka简单实现

生产者

producer

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class UserKafkaProducer extends Thread
{
    private final KafkaProducer < Integer, String > producer;
    private final String topic;
    private final Properties props = new Properties();
    public UserKafkaProducer(String topic)
    {
        props.put("metadata.broker.list", "localhost:9092");
        props.put("bootstrap.servers", "master2:6667");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer < Integer, String > (props);
        this.topic = topic;
    }
    @Override
    public void run()
    {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            //返回的是Future<RecordMetadata>,异步发送
            producer.send(new ProducerRecord < Integer, String > (topic, messageStr));
            messageNo++;
            try
            {
                sleep(3000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
}

消费者

Properties props = new Properties();
/* 定义kakfa 服务的地址,不需要将所有broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自动确认offset */
props.put("enable.auto.commit", "true");
/* 自动确认offset的时间间隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化类 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化类 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* 定义consumer */
KafkaConsumer < String, String > consumer = new KafkaConsumer < > (props);
/* 消费者订阅的topic, 可同时订阅多个 */
consumer.subscribe(Arrays.asList("foo", "bar"));
/* 读取数据,读取超时时间为100ms */
while (true)
{
    ConsumerRecords < String, String > records = consumer.poll(100);
    for (ConsumerRecord < String, String > record: records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

以上就是关于kafka的一些原理介绍了,小伙伴们了解了吧,如果还有疑问,想了解更多java常见问题及解决方法的话,请务必记得关注奇Q工具网了解详情。

推荐阅读:

Kafka是什么?特性有哪些?

kafka应用场景有哪些?应用实例介绍

Kafka和mq的区别是什么?和rabbitmq有什么区别?