1. 概述

Apache Kafka 已成为构建事件驱动架构最流行的消息系统之一,在这种架构中,一个微服务将消息发布到主题,另一个微服务异步消费并处理这些消息。

然而,在某些场景下,发布方微服务需要立即获得响应才能继续后续处理。虽然 Kafka 本质上是为异步通信设计的,但可以通过配置支持同步请求-响应模式,使用独立主题实现。

本教程将探讨如何在 Spring Boot 应用中使用 Apache Kafka 实现同步请求-响应通信

2. 项目搭建

我们将模拟一个通知分发系统。创建一个同时作为生产者和消费者的 Spring Boot 应用。

2.1. 依赖项

pom.xml 中添加 Spring Kafka 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.4</version>
</dependency>

此依赖提供了连接和操作已配置的 Kafka 实例所需的核心类。

2.2. 定义请求-响应消息

定义两个记录类表示请求和响应消息

record NotificationDispatchRequest(String emailId, String content) {
}

public record NotificationDispatchResponse(UUID notificationId) {
}

NotificationDispatchRequest 包含通知的 emailIdcontentNotificationDispatchResponse 包含处理请求后生成的唯一 notificationId

2.3. 定义 Kafka 主题和配置属性

定义请求和响应的 Kafka 主题,并配置从消费者组件接收响应的超时时间

application.yaml 中配置属性,使用@ConfigurationProperties映射到 Java 记录类:

@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
    @NotBlank
    String requestTopic,

    @NotBlank
    String replyTopic,

    @NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
    Duration replyTimeout
) {
}

添加验证注解确保必需属性正确配置。如果验证失败,Spring ApplicationContext 将启动失败。这符合快速失败原则

application.yaml 配置示例:

com:
  baeldung:
    kafka:
      synchronous:
        request-topic: notification-dispatch-request
        reply-topic: notification-dispatch-response
        reply-timeout: 30s

配置请求/响应主题名称和 30 秒的响应超时。

补充核心 Kafka 配置:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: synchronous-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.baeldung.kafka.synchronous
    properties:
      allow:
        auto:
          create:
            topics: true

✅ 关键配置点:

  1. 使用环境变量配置 Kafka 服务器地址
  2. 为生产者/消费者配置序列化/反序列化器
  3. 为消费者配置 group-id 并信任请求-响应记录类的包
  4. 启用主题自动创建(仅演示用,生产环境禁用)

配置后,Spring Kafka 自动创建 ConsumerFactoryProducerFactory Bean,用于后续配置。

2.4. 定义 Kafka 配置 Bean

定义必要的 Kafka 配置 Bean:

@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
    ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
    String replyTopic = synchronousKafkaProperties.replyTopic();
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}

注入 ConsumerFactory 创建 KafkaMessageListenerContainer Bean。此 Bean 负责创建轮询响应主题消息的容器

定义核心同步通信 Bean:

@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
    ProducerFactory<String, NotificationDispatchRequest> producerFactory,
    KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
    Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
    var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
    return replyingKafkaTemplate;
}

使用 ProducerFactoryKafkaMessageListenerContainer 创建 ReplyingKafkaTemplate Bean,并配置响应超时时间。此 Bean 管理请求-响应主题交互,实现 Kafka 同步通信

最后定义监听器发送响应所需的 Bean:

@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
    ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
    KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}
  1. 创建标准 KafkaTemplate Bean
  2. 定义 KafkaListenerContainerFactory Bean,使监听器消费请求主题后能向响应主题发送消息

3. 实现 Kafka 同步通信

3.1. 使用 ReplyingKafkaTemplate 发送接收消息

创建 NotificationDispatchService 使用 ReplyingKafkaTemplate 发送消息:

@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {

    private final SynchronousKafkaProperties synchronousKafkaProperties;
    private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;

    // 标准构造函数

    NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
        String requestTopic = synchronousKafkaProperties.requestTopic();
        ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);

        var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
        return requestReplyFuture.get().value();
    }
}

dispatch() 方法中:

  1. 获取配置的请求主题
  2. 创建 ProducerRecord
  3. 调用 sendAndReceive() 发布消息并等待响应
  4. 返回响应值

⚠️ 底层机制:

  • ReplyingKafkaTemplate 生成唯一关联 ID(随机 UUID)并添加到消息头
  • 添加包含响应主题名称的消息头
  • 使用关联 ID 作为键,将 RequestReplyFuture 存储在线程安全的ConcurrentHashMap
  • 支持多线程环境和并发请求

3.2. 定义 Kafka 消息监听器

创建监听请求主题并发送响应的组件:

@Component
class NotificationDispatchListener {

    @SendTo
    @KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
    NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
        // ... 处理逻辑
        UUID notificationId = UUID.randomUUID();
        return new NotificationDispatchResponse(notificationId);
    }
}

使用 @KafkaListener 监听请求主题,方法返回 NotificationDispatchResponse

关键注解 @SendTo

  • 从消息头提取关联 ID 和响应主题
  • 自动将返回值发送到响应主题并附加相同关联 ID
  • 使 NotificationDispatchService 中的 ReplyingKafkaTemplate 能通过原始关联 ID 获取正确的 RequestReplyFuture

4. 总结

本文探讨了在 Spring Boot 应用中使用 Apache Kafka 实现组件间同步通信的方法。

通过配置和模拟通知分发系统,展示了: ✅ 使用 ReplyingKafkaTemplate 将 Kafka 异步特性转换为同步请求-响应模式 ✅ 核心配置包括主题定义、超时设置和 Bean 配置 ✅ 监听器通过 @SendTo 注解实现自动响应

⚠️ 生产建议:

  • 此方案非常规,需谨慎评估是否符合项目架构
  • 避免在生产环境启用主题自动创建
  • 合理设置超时时间防止阻塞

所有代码示例可在 GitHub 获取。


原始标题:Synchronous Communication With Apache Kafka Using ReplyingKafkaTemplate | Baeldung