1. 概述

本文将深入探讨 KafkaStreams 这个强大的流处理库。作为Apache Kafka官方团队打造的工具,它的核心使命是让开发者能够高效构建实时流处理应用,特别适合微服务架构场景。

KafkaStreams的核心能力包括:

  • ✅ 从Kafka主题消费数据
  • ✅ 实时分析或转换数据
  • ✅ 将结果输出到其他Kafka主题

我们将通过一个单词计数的实战案例来演示:从指定主题读取句子,实时统计单词出现频率并输出结果。

⚠️ 需要注意的是:KafkaStreams不是响应式框架,不支持异步操作和背压处理。

2. Maven依赖配置

要开始使用KafkaStreams,首先添加核心依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

前置条件:

  • 已安装并启动Apache Kafka服务
  • 准备好测试用的Kafka主题

可从Confluent官网下载完整环境。

3. 配置KafkaStreams输入

3.1 定义输入主题

首先创建输入主题(这里使用Confluent工具包):

./confluent start

3.2 基础配置

设置应用ID和Kafka服务器地址:

String inputTopic = "inputTopic";

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
  StreamsConfig.APPLICATION_ID_CONFIG, 
  "wordcount-live-test");

// 关键配置:Kafka集群地址
private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
  bootstrapServers);

3.3 序列化配置

指定消息键值类型的序列化方式:

streamsConfiguration.put(
  StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
streamsConfiguration.put(
  StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());

3.4 状态存储配置

流处理通常是有状态的,需要配置状态存储目录:

this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
  StreamsConfig.STATE_DIR_CONFIG, 
  this.stateDirectory.toAbsolutePath().toString());

4. 构建流处理拓扑

核心步骤:

  1. 创建流构建器
  2. 定义数据流处理逻辑
  3. 实现单词计数算法
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
  .groupBy((key, word) -> word)
  .count();

处理逻辑解析:

  1. 分词:使用正则表达式拆分句子
  2. 扁平化flatMapValues() 将数组转换为独立单词流
  3. 分组:按单词分组
  4. 计数count() 聚合计算单词出现次数

5. 处理结果输出

5.1 控制台输出(开发调试)

wordCounts.toStream()
  .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));

5.2 生产环境输出到Kafka主题

String outputTopic = "outputTopic";
wordCounts.toStream()
  .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

关键点说明:

  • Serdes 提供预置的序列化器
  • 键类型:String(单词)
  • 值类型:Long(计数)
  • 结果将持久化到outputTopic主题

6. 启动流处理任务

6.1 任务启动代码

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();

// 测试环境等待30秒
Thread.sleep(30000);
streams.close();

⚠️ 生产环境应保持持续运行,此处仅作演示。

6.2 测试验证

启动控制台生产者发送测试数据:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"

预期输出结果:

word:  -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word:  -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2

结果分析:

  • 首条消息中"pony"计数为1
  • 第二条消息后"pony"计数变为2
  • 空字符串(分隔符产生)也被统计

7. 总结

本文展示了如何使用KafkaStreams构建基础流处理应用,核心要点:

  1. 配置要点

    • 应用ID唯一标识
    • 序列化器匹配数据类型
    • 状态目录配置
  2. 处理流程

    • 消费 → 转换 → 聚合 → 输出
    • 简单粗暴的单词计数实现
  3. 生产建议

    • 结果输出到Kafka主题而非控制台
    • 持续运行而非定时关闭

完整代码示例可在GitHub项目获取,这是一个开箱即用的Maven项目。


原始标题:Introduction to KafkaStreams in Java

« 上一篇: Java Cipher 类详解
» 下一篇: Java Weekly, 第209期