评论

收藏

[Java] Springboot整合Kafka Stream实时统计数据

编程语言 编程语言 发布于:2021-09-11 21:17 | 阅读数:665 | 评论:0

Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
DSC0000.png

环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2
Kafka Stream介绍
Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。
流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是“无界”的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。
Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。
Kafka Streams的一些特点:

  • 被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中
  • 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
  • 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations)
  • 支持exactly-once语义
  • 支持纪录级的处理,实现毫秒级的延迟
  • 提供High-Level的Stream DSL和Low-Level的Processor API
Stream Processing Topology流处理拓扑

  • 流是Kafka Streams提供的最重要的抽象:它表示一个无限的、不断更新的数据集。流是不可变数据记录的有序、可重放和容错序列,其中数据记录定义为键值对。
  • Stream Processing Application是使用了Kafka Streams库的应用程序。它通过processor topologies定义计算逻辑,其中每个processor topology都是多个stream processor(节点)通过stream组成的图。
  • A stream processor 是处理器拓扑中的节点;它表示一个处理步骤,通过每次从拓扑中的上游处理器接收一个输入记录,将其操作应用于该记录,来转换流中的数据,并且随后可以向其下游处理器生成一个或多个输出记录。
有两种特殊的processor:
Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。
Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。
DSC0001.jpg 相关的核心概念查看如下链接
DSC0002.jpg 下面演示Kafka Stream 在Springboot中的应用
依赖

<dependency> 
  <groupId>org.springframework.boot</groupId> 
  <artifactId>spring-boot-starter-web</artifactId> 
  </dependency> 
<dependency> 
  <groupId>org.springframework.kafka</groupId> 
  <artifactId>spring-kafka</artifactId> 
</dependency> 
<dependency> 
  <groupId>org.apache.kafka</groupId> 
  <artifactId>kafka-streams</artifactId> 
</dependency>
配置
server: 
  port: 9090 
spring: 
  application: 
  name: kafka-demo 
  kafka: 
  streams: 
    application-id: ${spring.application.name} 
    properties: 
    spring.json.trusted.packages: '*' 
  bootstrap-servers: 
  - localhost:9092 
  - localhost:9093 
  - localhost:9094 
  producer: 
    acks: 1 
    retries: 10 
    key-serializer: org.apache.kafka.common.serialization.StringSerializer 
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer 
    properties: 
    spring.json.trusted.packages: '*' 
  consumer: 
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer 
    enable-auto-commit: false 
    group-id: ConsumerTest 
    auto-offset-reset: latest 
    properties: 
    session.timeout.ms: 12000 
    heartbeat.interval.ms: 3000 
    max.poll.records: 100 
    spring.json.trusted.packages: '*' 
  listener: 
    ack-mode: manual-immediate 
    type: batch 
    concurrency: 8 
  properties: 
    max.poll.interval.ms: 300000
消息发送
@Service 
public class MessageSend { 
  @Resource 
  private KafkaTemplate<String, Message> kafkaTemplate ; 
  public void sendMessage2(Message message) { 
  kafkaTemplate.send(new ProducerRecord<String, Message>("test", message)).addCallback(result -> { 
    System.out.println("执行成功..." + Thread.currentThread().getName()) ; 
  }, ex -> { 
    System.out.println("执行失败") ; 
    ex.printStackTrace() ; 
  }) ; 
  } 
}
消息监听
@KafkaListener(topics = {"test"}) 
public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  for (ConsumerRecord<String, Message> record : records) { 
  System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  } 
  try { 
  TimeUnit.SECONDS.sleep(0) ; 
  } catch (InterruptedException e) { 
  e.printStackTrace(); 
  } 
  ack.acknowledge() ; 
} 
   
@KafkaListener(topics = {"demo"}) 
public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) { 
  for (ConsumerRecord<String, Message> record : records) { 
  System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ; 
  } 
  ack.acknowledge() ; 
}
Kafka Stream处理
消息转换并转发其它Topic
@Bean 
public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) { 
  KStream<Object, Object> stream = streamsBuilder.stream("test"); 
  stream.map((key, value) -> { 
  System.out.println("原始消息内容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ; 
  return new KeyValue<>(key, "{"title": "123123", "message": "重新定义内容"}".getBytes(Charset.forName("UTF-8"))) ; 
  }).to("demo") ; 
  return stream; 
}
执行结果:
DSC0003.jpg Stream对象处理
@Bean 
public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) { 
  JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  descri.addTrustedPackages("*") ; 
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  stream.map((key, value) -> { 
  value.setTitle("XXXXXXX") ; 
  return new KeyValue<>(key, value) ; 
  }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ; 
  return stream; 
}
执行结果:
DSC0004.jpg 分组处理
@Bean 
public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) { 
  JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  descri.addTrustedPackages("*") ; 
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  @Override 
  public String apply(String key, Message value) { 
    return value.getOrgCode() ; 
  } 
  }) 
  .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  .count() 
  .toStream().print(Printed.toSysOut()); 
  return stream; 
}
执行结果:
DSC0005.jpg 聚合
@Bean 
public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) { 
  JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  descri.addTrustedPackages("*") ; 
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  @Override 
  public String apply(String key, Message value) { 
    return value.getOrgCode() ; 
  } 
  }) 
  .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  return aggValue + 1 ; 
  }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  .toStream().print(Printed.toSysOut()); 
  return stream; 
}
执行结果:
DSC0006.jpg Filter过滤数据
@Bean 
public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) { 
  JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  descri.addTrustedPackages("*") ; 
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  stream.selectKey(new KeyValueMapper<String, Message, String>() { 
  @Override 
  public String apply(String key, Message value) { 
    return value.getOrgCode() ; 
  } 
  }) 
  .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  .aggregate(() -> 0L, (key, value ,aggValue) -> { 
  System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; 
  return aggValue + 1 ; 
  }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) 
  .toStream() 
  .filter((key, value) -> !"2".equals(key)) 
  .print(Printed.toSysOut()); 
  return stream; 
}
执行结果:
DSC0007.jpg 过滤Key不等于"2"
分支多流处理
@Bean 
public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) { 
  JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  descri.addTrustedPackages("*") ; 
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  // 分支,多流处理 
  KStream<String, Message>[] arrStream = stream.branch( 
  (key, value) -> "男".equals(value.getSex()),  
  (key, value) -> "女".equals(value.getSex())); 
  Stream.of(arrStream).forEach(as -> { 
  as.foreach((key, message) -> { 
    System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ; 
  }); 
  }); 
  return stream; 
}
执行结果:
DSC0008.jpg 多字段分组
不能使用多个selectKey,后面的会覆盖前面的
@Bean 
public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) { 
  JsonSerde<Message> jsonSerde = new JsonSerde<>() ; 
  JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; 
  descri.addTrustedPackages("*") ; 
  KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); 
  stream 
  .selectKey(new KeyValueMapper<String, Message, String>() { 
  @Override 
  public String apply(String key, Message value) { 
    System.out.println(Thread.currentThread().getName()) ; 
    return value.getTime() + " | " + value.getOrgCode() ; 
  } 
  }) 
  .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) 
  .count() 
  .toStream().print(Printed.toSysOut()); 
  return stream; 
}
执行结果:
DSC0009.jpg 原文链接:https://www.toutiao.com/i6994698489594790431/

关注下面的标签,发现更多相似文章