1. 概述
Apache Kafka 已成为构建事件驱动架构最流行的消息系统之一,在这种架构中,一个微服务将消息发布到主题,另一个微服务异步消费并处理这些消息。
然而,在某些场景下,发布方微服务需要立即获得响应才能继续后续处理。虽然 Kafka 本质上是为异步通信设计的,但可以通过配置支持同步请求-响应模式,使用独立主题实现。
本教程将探讨如何在 Spring Boot 应用中使用 Apache Kafka 实现同步请求-响应通信。
2. 项目搭建
我们将模拟一个通知分发系统。创建一个同时作为生产者和消费者的 Spring Boot 应用。
2.1. 依赖项
在 pom.xml 中添加 Spring Kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.4</version>
</dependency>
此依赖提供了连接和操作已配置的 Kafka 实例所需的核心类。
2.2. 定义请求-响应消息
定义两个记录类表示请求和响应消息:
record NotificationDispatchRequest(String emailId, String content) {
}
public record NotificationDispatchResponse(UUID notificationId) {
}
NotificationDispatchRequest 包含通知的 emailId 和 content,NotificationDispatchResponse 包含处理请求后生成的唯一 notificationId。
2.3. 定义 Kafka 主题和配置属性
定义请求和响应的 Kafka 主题,并配置从消费者组件接收响应的超时时间。
在 application.yaml 中配置属性,使用@ConfigurationProperties映射到 Java 记录类:
@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
@NotBlank
String requestTopic,
@NotBlank
String replyTopic,
@NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
Duration replyTimeout
) {
}
添加验证注解确保必需属性正确配置。如果验证失败,Spring ApplicationContext 将启动失败。这符合快速失败原则。
application.yaml 配置示例:
com:
baeldung:
kafka:
synchronous:
request-topic: notification-dispatch-request
reply-topic: notification-dispatch-response
reply-timeout: 30s
配置请求/响应主题名称和 30 秒的响应超时。
补充核心 Kafka 配置:
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: synchronous-kafka-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.baeldung.kafka.synchronous
properties:
allow:
auto:
create:
topics: true
✅ 关键配置点:
- 使用环境变量配置 Kafka 服务器地址
- 为生产者/消费者配置序列化/反序列化器
- 为消费者配置 group-id 并信任请求-响应记录类的包
- 启用主题自动创建(仅演示用,生产环境禁用)
配置后,Spring Kafka 自动创建 ConsumerFactory 和 ProducerFactory Bean,用于后续配置。
2.4. 定义 Kafka 配置 Bean
定义必要的 Kafka 配置 Bean:
@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
String replyTopic = synchronousKafkaProperties.replyTopic();
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
注入 ConsumerFactory 创建 KafkaMessageListenerContainer Bean。此 Bean 负责创建轮询响应主题消息的容器。
定义核心同步通信 Bean:
@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
ProducerFactory<String, NotificationDispatchRequest> producerFactory,
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
return replyingKafkaTemplate;
}
使用 ProducerFactory 和 KafkaMessageListenerContainer 创建 ReplyingKafkaTemplate Bean,并配置响应超时时间。此 Bean 管理请求-响应主题交互,实现 Kafka 同步通信。
最后定义监听器发送响应所需的 Bean:
@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
factory.setConsumerFactory(consumerFactory);
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
- 创建标准 KafkaTemplate Bean
- 定义 KafkaListenerContainerFactory Bean,使监听器消费请求主题后能向响应主题发送消息
3. 实现 Kafka 同步通信
3.1. 使用 ReplyingKafkaTemplate 发送接收消息
创建 NotificationDispatchService 使用 ReplyingKafkaTemplate 发送消息:
@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {
private final SynchronousKafkaProperties synchronousKafkaProperties;
private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;
// 标准构造函数
NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
String requestTopic = synchronousKafkaProperties.requestTopic();
ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);
var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
return requestReplyFuture.get().value();
}
}
在 dispatch() 方法中:
- 获取配置的请求主题
- 创建 ProducerRecord
- 调用 sendAndReceive() 发布消息并等待响应
- 返回响应值
⚠️ 底层机制:
- ReplyingKafkaTemplate 生成唯一关联 ID(随机 UUID)并添加到消息头
- 添加包含响应主题名称的消息头
- 使用关联 ID 作为键,将 RequestReplyFuture 存储在线程安全的ConcurrentHashMap中
- 支持多线程环境和并发请求
3.2. 定义 Kafka 消息监听器
创建监听请求主题并发送响应的组件:
@Component
class NotificationDispatchListener {
@SendTo
@KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
// ... 处理逻辑
UUID notificationId = UUID.randomUUID();
return new NotificationDispatchResponse(notificationId);
}
}
使用 @KafkaListener 监听请求主题,方法返回 NotificationDispatchResponse。
关键注解 @SendTo:
- 从消息头提取关联 ID 和响应主题
- 自动将返回值发送到响应主题并附加相同关联 ID
- 使 NotificationDispatchService 中的 ReplyingKafkaTemplate 能通过原始关联 ID 获取正确的 RequestReplyFuture
4. 总结
本文探讨了在 Spring Boot 应用中使用 Apache Kafka 实现组件间同步通信的方法。
通过配置和模拟通知分发系统,展示了: ✅ 使用 ReplyingKafkaTemplate 将 Kafka 异步特性转换为同步请求-响应模式 ✅ 核心配置包括主题定义、超时设置和 Bean 配置 ✅ 监听器通过 @SendTo 注解实现自动响应
⚠️ 生产建议:
- 此方案非常规,需谨慎评估是否符合项目架构
- 避免在生产环境启用主题自动创建
- 合理设置超时时间防止阻塞
所有代码示例可在 GitHub 获取。