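2. Overview
Kafka Streams exposes a duality between Kafka topics and relational database tables: a stream can be read as the changelog of a table, and a table as a snapshot of a stream. This lets us apply operations such as joins, grouping, aggregation, and filtering to event streams. Its central concept is the processor topology, a directed acyclic graph that serves as the blueprint for how a stream of events is processed:
- Node types:
  - Source node: receives streaming data from Kafka
  - Processor node: performs custom operations
  - Sink node: writes results to a new Kafka topic
- Fault tolerance: state is persisted by periodically saving checkpoints, so processing can resume after a failure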
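The duality between streams and tables can be illustrated without any Kafka classes. The following is a minimal plain-Java sketch, not Kafka Streams code: the class name `StreamTableDuality` and its methods are hypothetical, and a `Map` stands in for the table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the stream-table duality; no Kafka classes are involved.
class StreamTableDuality {

    // Stream -> table: replay the change events in order, keeping the latest value per key.
    static Map<String, Long> toTable(List<Map.Entry<String, Long>> stream) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> event : stream) {
            table.put(event.getKey(), event.getValue());
        }
        return table;
    }

    // Table -> stream: emit one event per row, which is enough to rebuild the table.
    static List<Map.Entry<String, Long>> toStream(Map<String, Long> table) {
        List<Map.Entry<String, Long>> stream = new ArrayList<>();
        table.forEach((key, value) -> stream.add(Map.entry(key, value)));
        return stream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = List.of(
                Map.entry("hello", 1L), Map.entry("world", 1L), Map.entry("hello", 2L));
        Map<String, Long> table = toTable(changelog);
        System.out.println(table); // {hello=2, world=1}
        System.out.println(toTable(toStream(table)).equals(table)); // true
    }
}
```

Replaying the three events leaves one row per key, and turning that table back into a stream and replaying it again yields the same table.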
3. Dependencies
Add the core dependencies to the POM file (adjust the version numbers to match your project):
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>
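4. Worked Example
We will build a word-count stream processing application that:
- Reads messages from an input topic
- Processes the text and counts word frequencies
- Writes the results to a Kafka topic
- Exposes a REST endpoint for querying the results
4.1 Core Configuration
Initialize Kafka Streams with a Java configuration class: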
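import static org.apache.kafka.streams.StreamsConfig.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    // The bean name must match the default name that @EnableKafkaStreams looks up
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}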
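Key points:
- The @EnableKafkaStreams annotation auto-configures the required components
- A KafkaStreamsConfiguration bean named DEFAULT_STREAMS_CONFIG_BEAN_NAME must be provided
- Spring Boot manages the lifecycle of the KafkaStreams client automatically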
4.2 Building the Processing Topology
Use StreamsBuilder to define the processing flow declaratively:
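import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WordCountProcessor {
    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
            .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));
        KTable<String, Long> wordCounts = messageStream
            .mapValues((ValueMapper<String, String>) String::toLowerCase)
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        wordCounts.toStream().to("output-topic");
    }
}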
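The tokenization step can be checked in isolation with plain Java. The sketch below reuses the same `toLowerCase` and `split("\\W+")` expressions as the topology; the `Tokenizer` class and the empty-token filter are hypothetical additions for illustration, not part of the article's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Stand-alone check of the lower-case and split steps used in the topology.
class Tokenizer {

    // Same expressions as the topology: lower-case, then split on runs of non-word characters.
    static List<String> rawTokens(String text) {
        return Arrays.asList(text.toLowerCase().split("\\W+"));
    }

    // Hypothetical variant that drops the empty token produced when the text
    // starts with a non-word character.
    static List<String> tokens(String text) {
        return rawTokens(text).stream()
                .filter(token -> !token.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rawTokens("Hello, World!")); // [hello, world]
        System.out.println(rawTokens("  hello"));       // [, hello]
        System.out.println(tokens("  hello"));          // [hello]
    }
}
```

Because `String.split` keeps a leading empty string when the input begins with a delimiter, a message such as "  hello" would also be counted under an empty word. If that matters, a filter on empty tokens before grouping is one way to avoid it.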
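How the pipeline works:
- Create a KStream from input-topic
- Convert the text to lowercase
- Split the text into words on non-word characters
- Group by word and count the occurrences
- Write the results to output-topic
Note: Spring's wrapper around Kafka Streams is very thin. It mainly manages the lifecycle for us; the business logic is still ours to implement.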
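To make the grouping and counting steps concrete, here is a plain-Java simulation of what the pipeline computes. It is only a sketch under simplifying assumptions: the `WordCountSimulation` class is hypothetical, a `HashMap` stands in for the count table, and every update is emitted as a "word:count" string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory imitation of the word-count pipeline; not Kafka Streams code.
class WordCountSimulation {
    // Running count per word, playing the role of the KTable.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one message value and returns the updates it causes, in order.
    List<String> process(String value) {
        List<String> updates = new ArrayList<>();
        for (String word : value.toLowerCase().split("\\W+")) {
            long count = counts.merge(word, 1L, Long::sum); // increment, starting at 1
            updates.add(word + ":" + count);
        }
        return updates;
    }

    public static void main(String[] args) {
        WordCountSimulation simulation = new WordCountSimulation();
        System.out.println(simulation.process("hello world")); // [hello:1, world:1]
        System.out.println(simulation.process("hello"));       // [hello:2]
    }
}
```

Each incoming word produces a new count for that word rather than a final total, which is why the same key can appear several times on the output side. A running application may emit fewer intermediate updates when record caching is enabled.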
4.3 REST Service Integration
Expose a real-time query endpoint backed by a state store:
// Updated topology definition: materialize the counts into a named state store
KTable<String, Long> wordCounts = messageStream
    .mapValues((ValueMapper<String, String>) String::toLowerCase)
    .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
    .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
    .count(Materialized.as("counts"));

// REST controller method; factoryBean is the injected StreamsBuilderFactoryBean
@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    // Returns null when the word has not been counted yet
    return counts.get(word);
}
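Key design decisions:
- Materialized.as("counts") creates a local state store
- The KafkaStreams instance is obtained through StreamsBuilderFactoryBean
- The endpoint queries the state store directly rather than the output topic, which performs better for key lookups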
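One detail worth handling in the endpoint is that a key-value store lookup returns null for a word that has never been counted, and that the topology stores words in lowercase. The sketch below shows one possible way to deal with both. The `CountLookup` helper is hypothetical, and a `Function` stands in for the store so the example runs without Kafka.

```java
import java.util.function.Function;

// Hypothetical helper for the REST layer; `lookup` stands in for the state store's get method.
class CountLookup {

    // Normalizes the query the same way the topology normalizes words,
    // and maps a missing entry (null) to a count of zero.
    static long countOrZero(Function<String, Long> lookup, String word) {
        Long count = lookup.apply(word.toLowerCase());
        return count == null ? 0L : count;
    }
}
```

In the controller this could be called as `CountLookup.countOrZero(counts::get, word)`, assuming `counts` is the store obtained from `kafkaStreams.store(...)`. Returning zero instead of null is a design choice; an API that needs to distinguish "never seen" from "seen zero times" might prefer a 404 response instead.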
5. Testing Strategy
5.1 Unit Tests
Use TopologyTestDriver to test without a broker:
@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
            .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
            .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
            .containsExactly(
                KeyValue.pair("hello", 1L),
                KeyValue.pair("world", 1L),
                KeyValue.pair("hello", 2L)
            );
    }
}
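Advantages:
- No Kafka broker needs to be started
- Topology logic is verified quickly
- Input and output streams are fully simulated
5.2 Integration Tests
Use Testcontainers for end-to-end verification: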
@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {
    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");
        startOutputTopicConsumer();

        // Verify the output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // Verify the REST endpoint
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}
What the test covers:
- Sending messages through the REST endpoint
- The Kafka processing flow
- The accuracy of state store queries
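The test relies on a BlockingQueue to hand records from a consumer thread to the assertions, which then poll with a timeout. The following plain-Java sketch shows that hand-off in isolation. The `OutputCollector` class and its `onRecord` callback are hypothetical; the "key:value" format is an assumption inferred from the strings the test expects.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical collector that bridges a consumer thread and test assertions.
class OutputCollector {
    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // Would be called by the consumer thread for each record read from the output topic.
    void onRecord(String key, Long value) {
        output.add(key + ":" + value);
    }

    // Waits up to the given timeout for the next record; returns null if none arrives.
    String next(long timeout, TimeUnit unit) {
        try {
            return output.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for output", e);
        }
    }
}
```

Polling with a timeout lets the test wait for asynchronous processing without sleeping for a fixed period: the assertion proceeds as soon as a record arrives and fails with a null value if nothing shows up in time.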
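6. Summary
This article walked through the complete process of building a Kafka Streams application with Spring Boot:
- Core concepts: processor topologies and state stores
- Configuration: get started quickly with @EnableKafkaStreams and a configuration bean
- Development: declarative topology construction and REST service integration
- Testing strategy:
  - Unit tests: use TopologyTestDriver to verify logic quickly
  - Integration tests: use Testcontainers to simulate a real environment
Best practices:
- Prefer state stores for implementing query endpoints
- Cover the core topology logic with unit tests
- Verify the end-to-end flow with integration tests
The complete source code is available in the GitHub repository.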