1. 简介

Apache Kafka 是一个强大的、分布式的、具备容错能力的流处理系统。在之前的文章中,我们学习了 如何在 Spring 中使用 Kafka

本文将继续深入,教你如何编写可靠、自包含的集成测试,无需依赖外部运行的 Kafka 服务。

我们会先了解如何使用和配置嵌入式 Kafka 实例。

然后,再介绍如何在测试中使用流行的 Testcontainers 框架。

2. 依赖配置

当然,我们需要在 pom.xml 中添加标准的 spring-kafka 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

此外,还需要两个专用于测试的依赖。

首先添加 spring-kafka-test 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>3.1.1</version>
    <scope>test</scope>
</dependency>

最后添加 Testcontainers 提供的 Kafka 依赖,该依赖同样可以在 Maven Central 找到:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

配置好所有依赖后,我们就可以开始构建一个基于 Spring Boot 和 Kafka 的简单应用了。

3. 一个简单的 Kafka 生产者-消费者应用

本教程中,测试的重点是一个简单的 Spring Boot Kafka 生产者-消费者应用。

先从应用入口类开始:

@SpringBootApplication
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

这是一个标准的 Spring Boot 应用。

3.1. 生产者配置

接下来,定义一个用于发送消息到 Kafka 指定主题的生产者 Bean:

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

上面的 KafkaProducer Bean 实际上是对 KafkaTemplate 的简单封装。该类提供了线程安全的高级操作,比如向指定主题发送数据,这正是我们在 send 方法中做的事情。

3.2. 消费者配置

同样地,我们定义一个简单的消费者 Bean,用于监听 Kafka 主题并接收消息:

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
        latch.countDown();
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }

    // 其他 getter 方法
}

这个消费者通过 @KafkaListener 注解监听指定主题的消息。后续我们会看到如何从测试中配置 test.topic

此外,receive 方法会将消息内容保存到 Bean 中,并减少 latch 计数器的值。这个变量是一个线程安全的计数器,后续在测试中用于确保消息被成功接收。

现在我们已经实现了基于 Spring Boot 的简单 Kafka 应用,接下来学习如何编写集成测试。

4. 关于测试的一点说明

在编写干净的集成测试时,我们应尽量避免依赖外部服务。 因为这些服务可能不受控制,或者突然不可用,从而影响测试结果。

❌ 如果依赖外部 Kafka 服务,我们很难完全控制其启动、配置和关闭流程。

4.1. 应用属性配置

测试中使用一组轻量级的应用配置属性。

我们将这些属性定义在 src/test/resources/application.yml 文件中:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

这是使用嵌入式 Kafka 或本地 Kafka 实例所需的最小配置。

其中,**最重要的配置是 auto-offset-reset: earliest**,它确保消费者组能够接收到我们发送的消息,即使消费者在消息发送之后才启动。

另外,我们配置了一个测试主题 embedded-test-topic,用于后续测试中发送和接收消息。

5. 使用嵌入式 Kafka 进行测试

本节介绍如何使用内存中的 Kafka 实例(即 Embedded Kafka)来运行测试。

之前添加的 spring-kafka-test 依赖中包含了一些测试工具类,其中最重要的就是 EmbeddedKafkaBroker 类。

基于此,我们编写第一个集成测试:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
        
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

测试关键点说明:

  • 使用 @SpringBootTest 启动 Spring 上下文。
  • 使用 @DirtiesContext 确保每次测试后上下文被重置。
  • 最关键的 @EmbeddedKafka 注解会注入一个嵌入式 Kafka Broker 实例。
    • partitions = 1:每个主题使用一个分区。
    • brokerProperties:配置 Kafka Broker 的监听地址和端口。

随后,我们注入消费者和生产者,并从配置中获取测试主题。

最后,发送消息并验证是否被成功接收。

运行测试时,你会看到类似如下日志:

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = Sending with our own simple KafkaProducer key)'

✅ 成功!我们已经实现了完全自包含、不依赖外部服务的 Kafka 集成测试。

6. 使用 Testcontainers 进行 Kafka 测试

有时候,嵌入式 Kafka 可能与真实环境存在细微差异。⚠️ 虽然少见,但端口冲突也可能导致测试失败。

为避免这些问题,我们可以使用 Testcontainers 框架,在 Docker 容器中运行真实的 Kafka 实例进行测试。

来看一个类似的测试示例:

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
     
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

主要差异点:

  • 使用 JUnit 的 @ClassRule 定义一个 KafkaContainer 实例,用于启动 Kafka 容器。
  • Testcontainers 会自动分配端口,避免冲突。
  • 通过自定义配置类 KafkaTestContainersConfiguration 动态注入 Kafka 服务地址。

配置示例:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // 其他标准配置
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // 其他标准配置
    return new DefaultKafkaProducerFactory<>(configProps);
}

通过调用 kafka.getBootstrapServers() 获取动态分配的地址,例如:

bootstrap.servers = [PLAINTEXT://localhost:32789]

运行测试时,Testcontainers 会执行以下操作:

  • 检查本地 Docker 环境
  • 拉取 confluentinc/cp-kafka:5.4.3 镜像(如未存在)
  • 启动容器并等待就绪
  • 测试结束后自动关闭并删除容器

日志示例:

13:33:10.396 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

✅ 完美!我们已经实现了一个使用真实 Kafka Docker 容器的集成测试。

7. 总结

本文介绍了两种在 Spring Boot 中测试 Kafka 应用的方法:

  1. 使用嵌入式 Kafka 实例进行轻量级测试。
  2. 使用 Testcontainers 启动真实的 Kafka 容器进行更贴近生产环境的测试。

完整代码见 GitHub 仓库


原始标题:Testing Kafka and Spring Boot