1. 概述
本文将深入探讨 KafkaStreams 这个强大的流处理库。作为Apache Kafka官方团队打造的工具,它的核心使命是让开发者能够高效构建实时流处理应用,特别适合微服务架构场景。
KafkaStreams的核心能力包括:
- ✅ 从Kafka主题消费数据
- ✅ 实时分析或转换数据
- ✅ 将结果输出到其他Kafka主题
我们将通过一个单词计数的实战案例来演示:从指定主题读取句子,实时统计单词出现频率并输出结果。
⚠️ 需要注意的是:KafkaStreams不是响应式框架,不支持异步操作和背压处理。
2. Maven依赖配置
要开始使用KafkaStreams,首先添加核心依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
前置条件:
- 已安装并启动Apache Kafka服务
- 准备好测试用的Kafka主题
可从Confluent官网下载完整环境。
3. 配置KafkaStreams输入
3.1 定义输入主题
首先创建输入主题(这里使用Confluent工具包):
./confluent start
3.2 基础配置
设置应用ID和Kafka服务器地址:
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-live-test");
// 关键配置:Kafka集群地址
private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
3.3 序列化配置
指定消息键值类型的序列化方式:
streamsConfiguration.put(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
3.4 状态存储配置
流处理通常是有状态的,需要配置状态存储目录:
this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
StreamsConfig.STATE_DIR_CONFIG,
this.stateDirectory.toAbsolutePath().toString());
4. 构建流处理拓扑
核心步骤:
- 创建流构建器
- 定义数据流处理逻辑
- 实现单词计数算法
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
处理逻辑解析:
- 分词:使用正则表达式拆分句子
- 扁平化:
flatMapValues()
将数组转换为独立单词流 - 分组:按单词分组
- 计数:
count()
聚合计算单词出现次数
5. 处理结果输出
5.1 控制台输出(开发调试)
wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
5.2 生产环境输出到Kafka主题
String outputTopic = "outputTopic";
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
关键点说明:
Serdes
提供预置的序列化器- 键类型:String(单词)
- 值类型:Long(计数)
- 结果将持久化到
outputTopic
主题
6. 启动流处理任务
6.1 任务启动代码
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();
// 测试环境等待30秒
Thread.sleep(30000);
streams.close();
⚠️ 生产环境应保持持续运行,此处仅作演示。
6.2 测试验证
启动控制台生产者发送测试数据:
./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"
预期输出结果:
word: -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word: -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2
结果分析:
- 首条消息中"pony"计数为1
- 第二条消息后"pony"计数变为2
- 空字符串(分隔符产生)也被统计
7. 总结
本文展示了如何使用KafkaStreams构建基础流处理应用,核心要点:
配置要点:
- 应用ID唯一标识
- 序列化器匹配数据类型
- 状态目录配置
处理流程:
- 消费 → 转换 → 聚合 → 输出
- 简单粗暴的单词计数实现
生产建议:
- 结果输出到Kafka主题而非控制台
- 持续运行而非定时关闭
完整代码示例可在GitHub项目获取,这是一个开箱即用的Maven项目。