小伙伴们呢都知道kafka吧,这是一个拥有者超高吞吐量的开源流处理平台,小伙伴们知道它的运行原理是什么吗?这次就一起来看看吧。
一、Kafka基本概念及原理
Kafka全名Apache Kafka,看见这个名字你就知道是谁开发了的,没错,kafka是由Apache开发的一个开源的分布式消息系统,现在是Apache名下的一个子项目,且已经成为开源领域目前最热门的消息系统之一。
Kafka与传统消息系统的不同在于:
1)、kafka是一个分布式系统,易于向外扩展。
2)、它能够同时为发布和订阅提供高吞吐量。
3)、它是支持多订阅者的,当失败时能自动平衡消费者。
4)、它能构成消息的持久化。
kafka名词分析
一般在一套kafka架构中是会有多个Producer,多个Broker,多个Consumer的,每个Producer可以对应多个Topic,每个Consumer则只能对应一个ConsumerGroup。
整个Kafka架构对应着一个ZK集群,它通过ZK管理集群配置,选举Leader,并在consumer group发生变化时进行rebalance。
Kafka架构图
二、Kafka简单实现
生产者
producer
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class UserKafkaProducer extends Thread { private final KafkaProducer < Integer, String > producer; private final String topic; private final Properties props = new Properties(); public UserKafkaProducer(String topic) { props.put("metadata.broker.list", "localhost:9092"); props.put("bootstrap.servers", "master2:6667"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer < Integer, String > (props); this.topic = topic; } @Override public void run() { int messageNo = 1; while (true) { String messageStr = new String("Message_" + messageNo); System.out.println("Send:" + messageStr); //返回的是Future<RecordMetadata>,异步发送 producer.send(new ProducerRecord < Integer, String > (topic, messageStr)); messageNo++; try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者
Properties props = new Properties(); /* 定义kakfa 服务的地址,不需要将所有broker指定上 */ props.put("bootstrap.servers", "localhost:9092"); /* 制定consumer group */ props.put("group.id", "test"); /* 是否自动确认offset */ props.put("enable.auto.commit", "true"); /* 自动确认offset的时间间隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); /* key的序列化类 */ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* value的序列化类 */ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* 定义consumer */ KafkaConsumer < String, String > consumer = new KafkaConsumer < > (props); /* 消费者订阅的topic, 可同时订阅多个 */ consumer.subscribe(Arrays.asList("foo", "bar")); /* 读取数据,读取超时时间为100ms */ while (true) { ConsumerRecords < String, String > records = consumer.poll(100); for (ConsumerRecord < String, String > record: records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }
以上就是关于kafka的一些原理介绍了,小伙伴们了解了吧,如果还有疑问,想了解更多java常见问题及解决方法的话,请务必记得关注奇Q工具网了解详情。
推荐阅读: