1. 简介
Apache Kafka 是一个强大的、分布式的、具备容错能力的流处理系统。在之前的文章中,我们学习了 如何在 Spring 中使用 Kafka。
本文将继续深入,教你如何编写可靠、自包含的集成测试,无需依赖外部运行的 Kafka 服务。
我们会先了解如何使用和配置嵌入式 Kafka 实例。
然后,再介绍如何在测试中使用流行的 Testcontainers 框架。
2. 依赖配置
当然,我们需要在 pom.xml 中添加标准的 spring-kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
此外,还需要两个专用于测试的依赖。
首先添加 spring-kafka-test 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>3.1.1</version>
<scope>test</scope>
</dependency>
最后添加 Testcontainers 提供的 Kafka 依赖,该依赖同样可以在 Maven Central 找到:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
配置好所有依赖后,我们就可以开始构建一个基于 Spring Boot 和 Kafka 的简单应用了。
3. 一个简单的 Kafka 生产者-消费者应用
本教程中,测试的重点是一个简单的 Spring Boot Kafka 生产者-消费者应用。
先从应用入口类开始:
@SpringBootApplication
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}
这是一个标准的 Spring Boot 应用。
3.1. 生产者配置
接下来,定义一个用于发送消息到 Kafka 指定主题的生产者 Bean:
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
上面的 KafkaProducer Bean 实际上是对 KafkaTemplate 的简单封装。该类提供了线程安全的高级操作,比如向指定主题发送数据,这正是我们在 send 方法中做的事情。
3.2. 消费者配置
同样地,我们定义一个简单的消费者 Bean,用于监听 Kafka 主题并接收消息:
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown();
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
// 其他 getter 方法
}
这个消费者通过 @KafkaListener 注解监听指定主题的消息。后续我们会看到如何从测试中配置 test.topic。
此外,receive 方法会将消息内容保存到 Bean 中,并减少 latch 计数器的值。这个变量是一个线程安全的计数器,后续在测试中用于确保消息被成功接收。
现在我们已经实现了基于 Spring Boot 的简单 Kafka 应用,接下来学习如何编写集成测试。
4. 关于测试的一点说明
✅ 在编写干净的集成测试时,我们应尽量避免依赖外部服务。 因为这些服务可能不受控制,或者突然不可用,从而影响测试结果。
❌ 如果依赖外部 Kafka 服务,我们很难完全控制其启动、配置和关闭流程。
4.1. 应用属性配置
测试中使用一组轻量级的应用配置属性。
我们将这些属性定义在 src/test/resources/application.yml 文件中:
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: baeldung
test:
topic: embedded-test-topic
这是使用嵌入式 Kafka 或本地 Kafka 实例所需的最小配置。
其中,**最重要的配置是 auto-offset-reset: earliest
**,它确保消费者组能够接收到我们发送的消息,即使消费者在消息发送之后才启动。
另外,我们配置了一个测试主题 embedded-test-topic
,用于后续测试中发送和接收消息。
5. 使用嵌入式 Kafka 进行测试
本节介绍如何使用内存中的 Kafka 实例(即 Embedded Kafka)来运行测试。
之前添加的 spring-kafka-test 依赖中包含了一些测试工具类,其中最重要的就是 EmbeddedKafkaBroker 类。
基于此,我们编写第一个集成测试:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived()
throws Exception {
String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
}
测试关键点说明:
- 使用 @SpringBootTest 启动 Spring 上下文。
- 使用 @DirtiesContext 确保每次测试后上下文被重置。
- 最关键的 @EmbeddedKafka 注解会注入一个嵌入式 Kafka Broker 实例。
partitions = 1
:每个主题使用一个分区。brokerProperties
:配置 Kafka Broker 的监听地址和端口。
随后,我们注入消费者和生产者,并从配置中获取测试主题。
最后,发送消息并验证是否被成功接收。
运行测试时,你会看到类似如下日志:
...
12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO c.b.kafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
key = null, value = Sending with our own simple KafkaProducer key)'
✅ 成功!我们已经实现了完全自包含、不依赖外部服务的 Kafka 集成测试。
6. 使用 Testcontainers 进行 Kafka 测试
有时候,嵌入式 Kafka 可能与真实环境存在细微差异。⚠️ 虽然少见,但端口冲突也可能导致测试失败。
为避免这些问题,我们可以使用 Testcontainers 框架,在 Docker 容器中运行真实的 Kafka 实例进行测试。
来看一个类似的测试示例:
@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived()
throws Exception {
String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
}
主要差异点:
- 使用 JUnit 的 @ClassRule 定义一个 KafkaContainer 实例,用于启动 Kafka 容器。
- Testcontainers 会自动分配端口,避免冲突。
- 通过自定义配置类 KafkaTestContainersConfiguration 动态注入 Kafka 服务地址。
配置示例:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
// 其他标准配置
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
// 其他标准配置
return new DefaultKafkaProducerFactory<>(configProps);
}
通过调用 kafka.getBootstrapServers()
获取动态分配的地址,例如:
bootstrap.servers = [PLAINTEXT://localhost:32789]
运行测试时,Testcontainers 会执行以下操作:
- 检查本地 Docker 环境
- 拉取 confluentinc/cp-kafka:5.4.3 镜像(如未存在)
- 启动容器并等待就绪
- 测试结束后自动关闭并删除容器
日志示例:
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
✅ 完美!我们已经实现了一个使用真实 Kafka Docker 容器的集成测试。
7. 总结
本文介绍了两种在 Spring Boot 中测试 Kafka 应用的方法:
- 使用嵌入式 Kafka 实例进行轻量级测试。
- 使用 Testcontainers 启动真实的 Kafka 容器进行更贴近生产环境的测试。
完整代码见 GitHub 仓库。