1. Overview

In this tutorial, we'll learn how to create a message listener using Kafka's Consumer API and consume messages from a topic. After that, we'll test our implementation using the Producer API and Testcontainers.

Note: we'll focus on using KafkaConsumer directly, without relying on Spring Boot modules.

2. Creating a Custom Kafka Listener

Our custom listener will internally use the Producer and Consumer APIs from the *kafka-clients* library. Let's start by adding the dependency to our pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

Next, we'll create a CustomKafkaListener class that listens to the "baeldung.articles.published" topic. Internally, the class wraps a KafkaConsumer and subscribes to the topic:

class CustomKafkaListener {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    // ...
}

2.1. Creating a KafkaConsumer

Creating a KafkaConsumer requires a valid configuration supplied through a Properties object. Let's start with a default consumer configuration:

public CustomKafkaListener(String topic, String bootstrapServers) {
    this(topic, defaultKafkaConsumer(bootstrapServers));
}

static KafkaConsumer<String, String> defaultKafkaConsumer(String bootstrapServers) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(props);
}

⚠️ Key configuration settings:

  • Bootstrap Servers: the list of host/port pairs used to establish the initial connection to the Kafka cluster
  • Group ID: an ID that allows a group of consumers to jointly consume a topic's partitions
  • Auto Offset Reset: the position in the log to start reading from when no initial offset exists
  • Key/Value Deserializers: the deserializer classes for keys and values (we use String in this example)

For the full list of configuration properties, see the official documentation.
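As a side note, each ConsumerConfig constant resolves to a plain string key. The sketch below builds the same configuration using only the JDK and the raw key strings (ConsumerPropsDemo is a hypothetical class name, not part of the article's code):

```java
import java.util.Properties;

public class ConsumerPropsDemo {

    // Same settings as defaultKafkaConsumer(), written with the raw
    // string keys that the ConsumerConfig constants stand for.
    static Properties defaultProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "test_group_id");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(defaultProps("localhost:9092").getProperty("group.id"));
    }
}
```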

2.2. Subscribing to a Topic

*We subscribe to the topic using KafkaConsumer's subscribe() method, then fetch messages by calling poll() in an infinite loop.* Since this approach blocks the thread, we implement the Runnable interface so the listener integrates nicely with CompletableFuture:

class CustomKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    // constructors

    @Override
    public void run() {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            consumer.poll(Duration.ofMillis(100))
              .forEach(record -> log.info("received: " + record)); 
        } 
    }
}

Let's start the listener without blocking the main thread:

String topic = "baeldung.articles.published";
String bootstrapServers = "localhost:9092";

var listener = new CustomKafkaListener(topic, bootstrapServers);
CompletableFuture.runAsync(listener);
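Because run() loops forever, it's worth seeing how CompletableFuture keeps the caller responsive. Below is a minimal, broker-free sketch of the same start pattern plus a flag-based shutdown, using a stand-in Runnable (AsyncListenerDemo and the running flag are illustrative names, not part of the article's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncListenerDemo {

    // Starts a poll-style loop asynchronously, then stops it via a flag.
    static boolean startAndStop() {
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch started = new CountDownLatch(1);

        // Stand-in for CustomKafkaListener: consumer.poll(...) would live here.
        Runnable listener = () -> {
            started.countDown();
            while (running.get()) {
                Thread.onSpinWait();
            }
        };

        CompletableFuture<Void> handle = CompletableFuture.runAsync(listener);
        try {
            started.await();    // the caller thread is never blocked by the loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        running.set(false);     // signal the loop to exit
        handle.join();          // wait for the loop to finish
        return handle.isDone();
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + startAndStop());
    }
}
```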

2.3. Processing Consumed Events

Currently, our implementation only logs the messages. Let's enhance it to support more complex scenarios and make it easier to test: *we'll allow a Consumer<String> to be supplied that processes each new event:*

class CustomKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    private Consumer<String> recordConsumer;

    CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
        this.topic = topic;
        this.consumer = consumer;
        this.recordConsumer = record -> log.info("received: " + record);
    }

   // ...

    @Override
    public void run() {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            consumer.poll(Duration.ofMillis(100))
              .forEach(record -> recordConsumer.accept(record.value()));
        }
    }

    CustomKafkaListener onEach(Consumer<String> newConsumer) {
        recordConsumer = recordConsumer.andThen(newConsumer);
        return this;
    }
}

🔗 Chaining handlers: with *andThen()* we can chain multiple processing functions, and each incoming message is passed through all of them in turn.
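To make the chaining concrete, here is a small plain-JDK sketch (AndThenDemo is an illustrative name) showing that a handler composed with andThen() delivers the same value to every step:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AndThenDemo {

    // Compose two handlers with andThen() and feed one message through:
    // every step in the chain receives the same value, in order.
    static List<String> run() {
        List<String> collected = new ArrayList<>();
        Consumer<String> logStep = msg -> System.out.println("received: " + msg);
        Consumer<String> pipeline = logStep.andThen(collected::add);
        pipeline.accept("Introduction to Kafka");
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```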

3. Testing the Implementation

We'll create a KafkaProducer that publishes messages to the topic, start a CustomKafkaListener, and verify that it processes all of them.

3.1. Setting Up a Kafka Testcontainer

We'll use the Testcontainers library to spin up a Kafka container in our test environment. Let's add the dependencies:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

Now let's create a KafkaContainer and use the JUnit 5 extension to manage its lifecycle:

@Testcontainers
class CustomKafkaListenerLiveTest {

    @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    // ...
}

3.2. Creating and Starting the Listener

First, we define the topic name and bootstrap servers, plus a list that collects the consumed messages:

String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();

Then we create the listener instance and configure it to collect messages:

CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);

Caution: calling run() directly blocks the current thread! Instead, we run the listener asynchronously with CompletableFuture:

CompletableFuture.runAsync(listener);

For conciseness, we can combine these two steps into a single statement:

var listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);

3.3. Publishing Messages

We create a KafkaProducer that sends article names to the topic:

static KafkaProducer<String, String> testKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}

Then we create a test helper that publishes messages in bulk:

private void publishArticles(String topic, String... articles) {
    try (KafkaProducer<String, String> producer = testKafkaProducer()) {
        Arrays.stream(articles)
          .map(article -> new ProducerRecord<String,String>(topic, article))
          .forEach(producer::send);
    }
}

3.4. Verifying the Results

Let's put all the pieces together in a test:

@Test
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
    // given
    String topic = "baeldung.articles.published";
    String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
    List<String> consumedMessages = new ArrayList<>();

    // when
    CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
    CompletableFuture.runAsync(listener);
    
    // and
    publishArticles(topic,
      "Introduction to Kafka",
      "Kotlin for Java Developers",
      "Reactive Spring Boot",
      "Deploying Spring Boot Applications",
      "Spring Security"
    );

    // then
    // ...
}

Since the listener processes messages asynchronously, we use Awaitility to wait for processing to complete before asserting the result:

// then
await().untilAsserted(() -> 
  assertThat(consumedMessages).containsExactlyInAnyOrder(
    "Introduction to Kafka",
    "Kotlin for Java Developers",
    "Reactive Spring Boot",
    "Deploying Spring Boot Applications",
    "Spring Security"
  ));

4. Conclusion

In this tutorial, we demonstrated how to use Kafka's Consumer and Producer APIs directly, without relying on higher-level Spring modules:

  1. We created a CustomKafkaListener that encapsulates a KafkaConsumer
  2. We tested the implementation using Testcontainers and Awaitility

The complete example code is available in the GitHub repository.


Original title: Creating a Kafka Listener Using the Consumer API | Baeldung