1. 概述

Apache Kafka 是一款流行的分布式事件流平台,当结合 Project Reactor 使用时,能构建出高弹性的响应式应用。Reactor Kafka 是基于 Reactor 和 Kafka Producer/Consumer API 构建的响应式 API。

Reactor Kafka API 允许我们通过函数式、非阻塞且支持背压的 API 向 Kafka 发布消息和消费消息。这意味着系统能根据当前负载和资源动态调整消息处理速率,确保高效且容错的运行。

本文将带你探索如何使用 Reactor Kafka 创建 Kafka 消费者,重点实现容错性和可靠性。我们将深入剖析背压、重试机制和错误处理等核心概念,同时展示如何以非阻塞方式异步处理消息。

2. 项目配置

首先,我们需要在项目中添加 Spring KafkaReactor Kafka 的 Maven 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>

3. 响应式 Kafka 消费者搭建

接下来,我们将使用 Reactor Kafka 搭建 Kafka 消费者。首先配置必要的消费者属性,确保正确连接 Kafka;然后初始化消费者;最后展示如何响应式地消费消息。

3.1. 配置 Kafka 消费者属性

现在配置响应式 Kafka 消费者属性。**KafkaConfig 配置类定义了消费者所需的属性**:

public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public static Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return config;
    }
}

ConsumerConfig.GROUP_ID_CONFIG 定义了消费者组,实现消息在消费者间的负载均衡。同一组内的所有消费者共同负责处理某个主题的消息。

接着,在实例化 ReactiveKafkaConsumerTemplate 时使用该配置类来消费事件:

public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() {
    return new ReactiveKafkaConsumerTemplate<>(receiverOptions());
}

private ReceiverOptions<String, String> receiverOptions() {
    Map<String, Object> consumerConfig = consumerConfig();
    ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerConfig);
    return receiverOptions.subscription(Collections.singletonList("test-topic"));
}

receiverOptions() 方法使用 consumerConfig() 的配置设置 Kafka 消费者,并订阅 test-topic 确保监听消息。reactiveKafkaConsumerTemplate() 方法初始化 ReactiveKafkaConsumerTemplate,为我们的响应式应用启用非阻塞、背压感知的消息消费。

3.2. 使用 Reactive Kafka 创建消费者

在 Reactor Kafka 中,Kafka 消费者的首选抽象是一个入站 Flux,所有从 Kafka 接收到的事件都由框架发布到该流中。这个 Flux 通过调用 ReactiveKafkaConsumerTemplatereceive()receiveAtmostOnce()receiveAutoAck()receiveExactlyOnce() 方法创建。

本例中,我们使用 receive() 操作符消费入站 Flux

public class ConsumerService {

    private final ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;

    public Flux<String> consumeRecord() {
        return reactiveKafkaConsumerTemplate.receive()
          .map(ReceiverRecord::value)
          .doOnNext(msg -> log.info("Received: {}", msg))
          .doOnError(error -> log.error("Consumer error: {}", error.getMessage()));
    }
}

这种方式允许系统在消息到达时响应式处理,无需阻塞或丢失消息。通过响应式流,消费者可以按自身节奏扩展处理消息,必要时应用背压。这里我们通过 doOnNext() 记录每条消息,并用 doOnError() 记录错误。

4. 背压处理

使用 Reactor Kafka 消费者的主要优势之一是支持背压。这确保系统在高吞吐量下不会被压垮。我们可以通过 limitRate() 限制处理速率,或使用 buffer() 进行批处理,而不是直接消费消息:

public Flux<String> consumeWithLimit() {
    return reactiveKafkaConsumerTemplate.receive()
      .limitRate(2)
      .map(ReceiverRecord::value);
}

这里我们每次最多请求 2 条消息,控制流量。这种方式确保高效且背压感知的消息处理,最终只提取并返回消息值。

也可以将消息作为批次消费,在发出前缓冲固定数量的记录:

public Flux<String> consumeAsABatch() {
    return reactiveKafkaConsumerTemplate.receive()
      .buffer(2)
      .flatMap(messages -> Flux.fromStream(messages.stream()
        .map(ReceiverRecord::value)));
}

这里使用 buffer(2) 将最多 2 条记录分组后作为批次发出。通过分组消息共同处理,减少了单独处理的开销。

5. 错误处理策略

在响应式 Kafka 消费者中,管道中的错误会作为终止信号。这会导致消费者关闭,使服务实例无法继续消费事件。Reactor Kafka 提供多种策略应对此问题,例如使用 retryWhen 操作符实现重试机制:捕获故障、重新订阅上游发布者并重建 Kafka 消费者。

另一个常见问题是反序列化错误,当消费者因意外格式无法反序列化消息时发生。要处理这类错误,**可以使用 Spring Kafka 提供的 ErrorHandlingDeserializer**。

5.1. 重试策略

重试策略在需要重试失败操作时至关重要。该策略确保以固定延迟(如 5 秒)持续重试,直到消费者成功重连或满足预定义退出条件

为消费者实现重试策略,使其在出错时自动重试消息处理:

public Flux<String> consumeWithRetryWithBackOff(AtomicInteger attempts) {
    return reactiveKafkaConsumerTemplate.receive()
      .flatMap(msg -> attempts.incrementAndGet() < 3 ? 
        Flux.error(new RuntimeException("Failure")) : Flux.just(msg))
      .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)))
      .map(ReceiverRecord::value);
}

本例中,Retry.backoff(3, Duration.ofSeconds(1)) 指定系统最多重试 3 次,每次间隔 1 秒。

5.2. 使用 ErrorHandlingDeserializer 处理反序列化错误

从 Kafka 消费消息时,若消息格式与预期架构不匹配,会遇到反序列化错误。要处理此问题,**可以使用 Spring Kafka 的 ErrorHandlingDeserializer**。它能捕获反序列化错误,防止消费者失败,并将错误详情作为头部添加到 ReceiverRecord,而非丢弃消息或抛出异常:

private Map<String, Object> errorHandlingConsumerConfig(){
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    return config;
}

6. 总结

本文探讨了如何使用 Reactor Kafka 创建 Kafka 消费者,重点介绍了错误处理、重试机制和背压管理。这些技术使我们的 Kafka 消费者即使在故障场景下也能保持容错性和高效性。


原始标题:Create Kafka Consumers With Reactor Kafka | Baeldung