1. 概述
本教程将教你如何使用Kafka的Consumer API创建消息监听器,并从主题中消费消息。随后我们将使用Producer API和Testcontainers测试实现。
重点: 我们将专注于直接使用KafkaConsumer,不依赖Spring Boot模块。
2. 创建自定义Kafka监听器
自定义监听器内部将使用*kafka-clients*库的Producer和Consumer API。 首先在pom.xml中添加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
我们将创建CustomKafkaListener类监听"baeldung.articles.published"主题。该类内部封装KafkaConsumer并订阅主题:
class CustomKafkaListener {
private final String topic;
private final KafkaConsumer<String, String> consumer;
// ...
}
2.1. 创建KafkaConsumer
创建KafkaConsumer需要通过Properties对象提供有效配置。 我们先创建一个默认的消费者配置:
public CustomKafkaListener(String topic, String bootstrapServers) {
this(topic, defaultKafkaConsumer(bootstrapServers));
}
static KafkaConsumer<String, String> defaultKafkaConsumer(String boostrapServers) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
⚠️ 关键配置说明:
- Bootstrap Servers:用于建立Kafka集群初始连接的主机端口对列表
- Group ID:允许消费者组共同消费主题分区的ID
- Auto Offset Reset:无初始偏移量时开始读取数据的日志位置
- Key/Value Deserializers:键值反序列化类(示例使用String类型)
完整配置列表参考官方文档
2.2. 订阅主题
*使用KafkaConsumer的subscribe()方法订阅主题,并通过无限循环调用poll()方法获取消息。* 由于该方法会阻塞线程,我们实现Runnable接口以便与CompletableFuture集成:
class CustomKafkaListener implements Runnable {
private final String topic;
private final KafkaConsumer<String, String> consumer;
// constructors
@Override
void run() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
consumer.poll(Duration.ofMillis(100))
.forEach(record -> log.info("received: " + record));
}
}
}
✅ 启动监听器(非阻塞方式):
String topic = "baeldung.articles.published";
String bootstrapServers = "localhost:9092";
var listener = new CustomKafkaListener(topic, bootstrapServers)
CompletableFuture.runAsync(listener);
2.3. 消费活动处理
当前实现仅记录消息。我们增强其功能以支持复杂场景和测试:*允许定义Consumer
class CustomKafkaListener implements Runnable {
private final String topic;
private final KafkaConsumer<String, String> consumer;
private Consumer<String> recordConsumer;
CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
this.topic = topic;
this.consumer = consumer;
this.recordConsumer = record -> log.info("received: " + record);
}
// ...
@Override
public void run() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
consumer.poll(Duration.ofMillis(100))
.forEach(record -> recordConsumer.accept(record.value()));
}
}
CustomKafkaListener onEach(Consumer newConsumer) {
recordConsumer = recordConsumer.andThen(newConsumer);
return this;
}
}
🔗 链式处理: 使用*andThen()*方法可串联多个处理函数,每条消息将依次执行所有函数。
3. 测试实现
我们将创建KafkaProducer向主题发布消息,启动CustomKafkaListener并验证处理结果。
3.1. 配置Kafka Testcontainer
使用Testcontainers库在测试环境启动Kafka容器。 添加依赖:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
创建KafkaContainer并使用JUnit5扩展管理生命周期:
@Testcontainers
class CustomKafkaListenerLiveTest {
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
// ...
}
3.2. 创建并启动监听器
定义主题名和bootstrap服务器,创建消息收集列表:
String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();
创建监听器实例并配置消息收集:
CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
❌ 踩坑警告: 直接调用run()会阻塞线程!使用CompletableFuture异步执行:
CompletableFuture.runAsync(listener);
✅ 最佳实践: 结合try-with-resources管理资源:
var listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);
3.3. 发布消息
创建KafkaProducer向主题发送文章名称:
static KafkaProducer<String, String> testKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
创建测试辅助方法批量发送消息:
private void publishArticles(String topic, String... articles) {
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
Arrays.stream(articles)
.map(article -> new ProducerRecord<String,String>(topic, article))
.forEach(producer::send);
}
}
3.4. 验证结果
整合所有组件进行测试:
@Test
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
// given
String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();
// when
CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);
// and
publishArticles(topic,
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
);
// then
// ...
}
使用Awaitility等待异步处理完成并验证结果:
// then
await().untilAsserted(() ->
assertThat(consumedMessages).containsExactlyInAnyOrder(
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
));
4. 总结
本教程展示了如何在不依赖高级Spring模块的情况下,直接使用Kafka的Consumer和Producer API:
- 创建封装KafkaConsumer的CustomKafkaListener
- 使用Testcontainers和Awaitility进行测试验证
完整示例代码请访问GitHub仓库。