Kafka 流式计算,kafka新用法实现

TheDisguiser 2020-07-02 10:59:20 java常见问答 8115

Kafka streams相信大家都听说过,它是目前流式处理系统里最流行的之一,今天就来了解一下它的一些新用法吧。

我们知道,kafka streams从0.10.0开始引入的,到现在已经更新到0.11.0。它有什么优势让我们必须使用它呢?首先,它的使用成本可以说是非常低廉的,仅仅只需在代码中依赖一下streams lib,之后编写计算逻辑,直接启动APP就可以了。其次,kafka streams的负载均衡也十分简单强悍,增加或者减少运行实例就可以动态调整,无需人工干预。最后还有一个0.11开始支持的特大杀器,它提供 Exactly-once消息传递特性,这中间包含了producer幂等性,不会重复发送消息到broker;consumer exactly once,不会重复消费也不会丢失。在运算失败的时候,重启运算实例即可恢复。

streams api用法

WordCount:
KStreamBuilder builder = new KStreamBuilder();
KStream < String, String > textLines = builder.stream(stringSerde, stringSerde, textLinesTopic);
KStream < String, Long > wordCounts = textLines
    .flatMapValues(value - > Arrays.asList(value.toLowerCase()
        .split("\\W+")))
    .map((key, word) - > new KeyValue < > (word, word))
    .groupByKey()
    .count("counts")
    .toStream();
wordCounts.to(stringSerde, longSerde, countTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

上面这段简短的代码中包括了consumer顶阅主题并消费、统计词频、producer写入到另一个Topic。跟踪代码可以发现flatMapValues,map函数本质上是处理单元processor,在函数调用时,API会创建特定的processor加入到拓扑中。

以实际来说,这种消费单个主题进行运算的方式是可以做一些日志的统计分析的,如网站的UV,PV等。但如果需要处理更复杂的业务,就不可避免需要关联操作了。同样的,kafka streams 提供了join函数。

  KStreamBuilder streamBuilder = new KStreamBuilder();
  String userStore = "user_store";
  String driverStore = "driver_Store";
  KTable < String, UserOrder > userOrderKTable = streamBuilder.table(Serdes.String()
      , SerdeFactory.serdeFrom(UserOrder.class), TOPIC_USER_ORER, userStore);
  KTable < String, DriverOrder > driverOrderKTable = streamBuilder.table(Serdes.String()
      , SerdeFactory.serdeFrom(DriverOrder.class), TOPIC_DRIVER_ORDER, driverStore);
  userOrderKTable.leftJoin(driverOrderKTable
          , (userOrder, driverOrder) - > join(userOrder, driverOrder))
      .toStream()
      .map((k, v) - > new KeyValue < > (k, v))
      .to(Serdes.String(), SerdeFactory.serdeFrom(Travel.class), TOPIC_TRAVEL);

join的语法本质上是join by partition and key。为了得到正确的Join结果,两个不同的topic需要在同一个运行实例中被消费到。假设,Topic1 和Topic2各有4个partition,有两个实例在运行,于是一个task仅消费Topic1和Topic2的两个,这样就需要保证,topic1中的两个partition的key值在另一个topic中能被找到。

上面的逻辑可能听起来会十分绕口,其实在实际开发的过程中,我们仅需保证两个topic拥有相同数量的partition就可以的,并且producer采用同样的Paritioner。如果这个条件不满足,就需要通过through函数完成。

KStream < K, V > through(Serde < K > keySerde, Serde < V > valSerde
    , StreamPartitioner < K, V > partitioner, String topic);

假设在join过程中,我们需要最新的数据做聚合。kafka streams 提供了windowed函数,在时间窗口内,后续的记录会覆盖同一个Key的记录。窗口结束后,会触发后续的计算逻辑得到正确的结果。

 KTable < Windowed < String > , MultiUserOrder > userOrderKTable = streamBuilder.stream(Serdes.String()
         , SerdeFactory.serdeFrom(UserOrder.class)
         , TOPIC_USER_ORER, userStore)
     .groupByKey()
     .aggregate(
         new MultiUserOrder(), (k, v, map) - >
         {
             map.setOrderId(k);
             map.getOrders()
             .add(v);
             return map;
         }
         , TimeWindows.of(6 * 1000)
         , SerdeFactory.serdeFrom(MultiUserOrder.class), userAggStore);
 KTable < Windowed < String > , MultiDriverOrder > driverOrderKTable = streamBuilder.stream(Serdes.String()
         , SerdeFactory.serdeFrom(DriverOrder.class)
         , TOPIC_DRIVER_ORDER, driverStore)
     .groupByKey()
     .aggregate(new MultiDriverOrder(), (k, v, map) - >
         {
             map.setOrderId(k);
             map.getOrders()
             .add(v);
             return map;
         }, TimeWindows.of(6 * 1000)
         , SerdeFactory.serdeFrom(MultiDriverOrder.class), driverAggStore);
 userOrderKTable.leftJoin(driverOrderKTable
         , (multiUserOrder, multiDriverOrder) - > join(multiUserOrder, multiDriverOrder))
     .toStream()
     .map((k, v) - > new KeyValue < > (k.key(), v))
     .to(Serdes.String(), SerdeFactory.serdeFrom(Travel.class), TOPIC_TRAVEL);

以上就是本篇文章所有内容了,更多java常见问题及解决方法请快关注奇Q工具网了解详情吧。

推荐阅读:

kafka应用场景有哪些?应用实例介绍

Kafka和mq的区别是什么?和rabbitmq有什么区别?

springboot整合kafka如何实现?示例