2. Overview

Kafka Streams establishes a duality between Kafka topics and relational database tables, which lets us join, group, aggregate, and filter streams of events. Its core concept is the processor topology, essentially a directed acyclic graph (DAG) that serves as the blueprint for stream processing:

  • Node types
    • Source nodes: consume stream data from Kafka
    • Processor nodes: perform custom processing
    • Sink nodes: write results to new Kafka topics
  • Fault tolerance: state is persisted via periodically saved checkpoints
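
As a minimal sketch of these node types, the same DSL used in section 4 can wire all three together (the topic names in-topic and out-topic are placeholders, not part of the later example):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
builder.stream("in-topic", Consumed.with(Serdes.String(), Serdes.String()))  // source node
  .mapValues(value -> value.toUpperCase())                                   // processor node
  .to("out-topic", Produced.with(Serdes.String(), Serdes.String()));         // sink node

// describe() prints the resulting DAG of source, processor, and sink nodes
System.out.println(builder.build().describe());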

3. Dependencies

Add the core dependencies to the POM file (adjust the version numbers to match your project):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency> 

4. Example Application

We'll build a word-count stream processing application that:

  1. Reads messages from an input topic
  2. Processes the text and counts word frequencies
  3. Writes the results to a Kafka topic
  4. Exposes a REST endpoint for querying the results

4.1 Configuration

Initialize Kafka Streams through a Java configuration class:

// the config constants used below are static imports from org.apache.kafka.streams.StreamsConfig
import static org.apache.kafka.streams.StreamsConfig.*;

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }
}

Key points:

  • The @EnableKafkaStreams annotation auto-configures the required components
  • A KafkaStreamsConfiguration bean named DEFAULT_STREAMS_CONFIG_BEAN_NAME must be provided
  • Spring Boot automatically manages the lifecycle of the KafkaStreams client
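
The bootstrap address is resolved from the application properties; a minimal entry (the host and port below are assumptions for a local broker) looks like:

spring.kafka.bootstrap-servers=localhost:9092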

4.2 Building the Processing Topology

Define the processing flow declaratively with StreamsBuilder:

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          .count();

        wordCounts.toStream().to("output-topic");
    }
}

Processing steps:

  1. Create a KStream from input-topic
  2. Convert the text to lowercase
  3. Split it into words on non-word characters (\W+)
  4. Group by word and count
  5. Write the results to output-topic
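
To make the flow concrete, here is how a single input record moves through the topology (a hand-traced sketch):

// input record: ("key1", "Hello World")
//   mapValues     -> ("key1", "hello world")
//   flatMapValues -> ("key1", "hello"), ("key1", "world")
//   groupBy       -> re-keys each record by the word itself:
//                    ("hello", "hello"), ("world", "world")
//   count         -> KTable updates: ("hello", 1), ("world", 1)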

Heads-up: Spring's wrapper around Kafka Streams is very thin; it mainly manages the lifecycle for us, and the business logic still has to be implemented ourselves.

4.3 REST Service Integration

Expose a real-time query endpoint backed by a state store:

// modify the topology definition to add a named state store
KTable<String, Long> wordCounts = messageStream
  .mapValues((ValueMapper<String, String>) String::toLowerCase)
  .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
  .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
  .count(Materialized.as("counts"));

// REST controller implementation (the StreamsBuilderFactoryBean is injected by Spring)
@Autowired
private StreamsBuilderFactoryBean factoryBean;

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    return counts.get(word);
}

Key design decisions:

  • Materialized.as("counts") creates a local state store
  • The KafkaStreams instance is obtained through the StreamsBuilderFactoryBean
  • Querying the state store directly instead of the output topic performs better
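
The same state store also supports iteration over all entries. As a hypothetical extension (the /counts endpoint and getAllCounts method below are not part of the original example), every count could be listed like this:

@GetMapping("/counts")
public Map<String, Long> getAllCounts() {
    Map<String, Long> result = new HashMap<>();
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
    // all() iterates over every entry in the store; the iterator must be closed
    try (KeyValueIterator<String, Long> iterator = counts.all()) {
        iterator.forEachRemaining(kv -> result.put(kv.key, kv.value));
    }
    return result;
}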

5. Testing Strategy

5.1 Unit Tests

Use TopologyTestDriver to test without a broker:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
          .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
          .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
          .containsExactly(
            KeyValue.pair("hello", 1L),
            KeyValue.pair("world", 1L),
            KeyValue.pair("hello", 2L)
          );
    }
}
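
Since the KTable emits a new update for every input record, hello appears twice in the list above. When only the final counts matter, TestOutputTopic.readKeyValuesToMap() keeps just the latest value per key:

// read only the latest value per key instead of every intermediate update
assertThat(outputTopic.readKeyValuesToMap())
  .containsExactlyInAnyOrderEntriesOf(Map.of("hello", 2L, "world", 1L));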

Advantages:

  • No Kafka broker required
  • Fast verification of topology logic
  • Fully simulates the input and output streams

5.2 Integration Tests

Use Testcontainers for end-to-end verification:

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // assert the counts on the output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // assert the counts via the REST endpoint
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}
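
The postMessage and startOutputTopicConsumer helpers are omitted above. One piece they rely on is pointing the Spring context at the container's broker; a common way to wire this (an assumption, as the original does not show it) is a @DynamicPropertySource method inside the test class:

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
    // route Spring's Kafka client at the Testcontainers broker
    registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
}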

Test coverage:

  • Sending messages through the REST interface
  • Verifying the Kafka processing pipeline
  • Checking the accuracy of state store queries

6. Summary

This article walked through the full process of building a Kafka Streams application with Spring Boot:

  1. Core concepts: understanding processor topologies and state stores
  2. Configuration: getting started quickly with @EnableKafkaStreams and a configuration bean
  3. Development: declarative topology building and REST service integration
  4. Testing strategy
    • Unit tests: fast logic verification with TopologyTestDriver
    • Integration tests: simulating a real environment with Testcontainers

Best practices:

  • Prefer state stores for query endpoints
  • Cover the core topology logic with unit tests
  • Verify the end-to-end flow with integration tests

The complete source code is available in the GitHub repository.


Original title: Kafka Streams With Spring Boot