作为有一定java开发经验的朋友来说,都了解我们在实现业务时中需要使用到异步消息队列,但是这也是需要搭建一个消息中间件的,这里我们选择用kafka,其实在springboot中整合kafka还是比较容易的,那么下面我们一起来看看,springboot整合kafka究竟应该如何实现呢?
kafka是由Apache软件基金会开发的一个开源流处理的平台,是由Scala和Java编写的。Kafka它是一种高吞吐量的分布式发布订阅消息系统,它可以很好的处理消费者规模的网站中的所有动作流数据。 这种网页浏览,搜索和其他用户的行动,是在现代网络上的许多社会功能的一个比较关键的因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决的。那么对于像Hadoop的一样的日志数据和离线分析系统,但是又要求实时处理的限制,所以这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
springboot整合kafka的示例如下:
pom.xml引入:
<!--kafka支持--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> <!--$NO-MVN-MAN-VER$--> </dependency>
application.properties配置:
#kafka相关配置 spring.kafka.bootstrap-servers=192.168.1.180:9092 #设置一个默认组 spring.kafka.consumer.group-id=0 #key-value序列化反序列化 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #每次批量发送消息的数量 spring.kafka.producer.batch-size=65536 spring.kafka.producer.buffer-memory=524288
生产者KafkaSender:
/** * 生产者 */ @Component public class KafkaSender { @Autowired private KafkaTemplate < String, String > kafkaTemplate; /** * 发送消息到kafka */ public void sendChannelMess(String channel, String message) { kafkaTemplate.send(channel, message); } } 消费者: /** * 消费者 spring-kafka 2.0 + 依赖JDK8 */ @Component public class KafkaConsumer { /** * 监听seckill主题,有消息就读取 * @param message */ @KafkaListener(topics = { "seckill" }) public void receiveMessage(String message) { //收到通道的消息之后执行秒杀操作 } }
好了,以上就是本篇文章的所有内容了,还想了解更多java架构师相关信息,记得来关注本站消息哦。