1. 概述
Spring Cloud Stream 是构建在 Spring Boot 和 Spring Integration 之上的框架,专门用于开发事件驱动或消息驱动的微服务。
本文将通过简单示例介绍 Spring Cloud Stream 的核心概念和构建方式,帮你快速上手这个强大的消息框架。
2. Maven 依赖
首先在 pom.xml
中添加消息中间件依赖(以 RabbitMQ 为例):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.1.3</version>
</dependency>
再添加测试支持依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>3.1.3</version>
<scope>test</scope>
</dependency>
3. 核心概念
微服务架构遵循"智能接口、哑管道"原则。服务间通过 RabbitMQ 或 Kafka 等消息中间件通信,服务通过发布领域事件到接口/通道实现交互。
3.1. 核心组件
看一个简单的 Spring Cloud Stream 服务示例:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}
@EnableBinding
注解配置了 Processor
接口中定义的 INPUT
和 OUTPUT
通道。这两个通道都是绑定器,可配置连接具体消息中间件。
核心概念解析:
- 绑定(Bindings):声明式标识输入/输出通道的接口集合
- 绑定器(Binder):消息中间件实现(如 Kafka/RabbitMQ)
- 通道(Channel):应用与消息中间件间的通信管道
- 流监听器(StreamListeners):自动处理通道消息的方法,
MessageConverter
负责消息转换 - 消息模式(Message Schemas):消息序列化/反序列化方案,支持静态或动态加载
3.2. 通信模式
消息通过发布-订阅模式投递到目标。发布者按主题分类消息,订阅者订阅感兴趣的主题,中间件负责过滤分发。
订阅者可分组:消费者组(Consumer Group) 是由 group id 标识的订阅者集合,组内消息按负载均衡方式投递。
4. 编程模型
4.1. 功能测试
测试支持提供了与通道交互和消息检查的能力。测试前面的 enrichLogMessage
服务:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {
@Autowired
private Processor pipe;
@Autowired
private MessageCollector messageCollector;
@Test
public void whenSendMessage_thenResponseShouldUpdateText() {
pipe.input()
.send(MessageBuilder.withPayload(new LogMessage("This is my message"))
.build());
Object payload = messageCollector.forChannel(pipe.output())
.poll()
.getPayload();
assertEquals("[1]: This is my message", payload.toString());
}
}
4.2. 自定义通道
Spring Cloud 提供的 Processor
只有一个输入/输出通道。若需要一进两出,可自定义处理器:
public interface MyProcessor {
String INPUT = "myInput";
@Input
SubscribableChannel myInput();
@Output("myOutput")
MessageChannel anOutput();
@Output
MessageChannel anotherOutput();
}
Spring 会自动实现该接口。通道名可通过注解指定(如 @Output("myOutput")
),否则使用方法名。现在有三个通道:myInput
、myOutput
和 anotherOutput
。
按值路由消息(<10 走 anOutput
,≥10 走 anotherOutput
):
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
if (val < 10) {
processor.anOutput().send(message(val));
} else {
processor.anotherOutput().send(message(val));
}
}
private static final <T> Message<T> message(T val) {
return MessageBuilder.withPayload(val).build();
}
4.3. 条件分发
使用 @StreamListener
和 SpEL 表达式过滤消费者接收的消息。条件分发示例:
@Autowired
private MyProcessor processor;
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
processor.anOutput().send(message(val));
}
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
processor.anotherOutput().send(message(val));
}
⚠️ 限制:这些方法必须无返回值。
5. 环境搭建
5.1. 绑定器配置
通过 META-INF/spring.binders
配置默认绑定器:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration
或添加 RabbitMQ 绑定器依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
未提供绑定器时,Spring 使用通道间直接消息传递。
5.2. RabbitMQ 配置
在 src/main/resources/application.yml
中配置:
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
output:
destination: queue.pretty.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: 5672
username: <username>
password: <password>
virtual-host: /
input
绑定使用 queue.log.messages
交换器,output
使用 queue.pretty.log.messages
交换器,均通过 local_rabbit
绑定器。
✅ 无需预先创建 RabbitMQ 交换器/队列,应用启动时会自动创建。
测试时可通过 RabbitMQ 管理界面发布 JSON 格式消息。
5.3. 自定义消息转换
Spring Cloud Stream 支持按内容类型进行消息转换。将前面的 JSON 示例改为纯文本:
通过 MessageConverter
自定义 LogMessage
转换:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
//...
@Bean
public MessageConverter providesTextPlainMessageConverter() {
return new TextPlainMessageConverter();
}
//...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {
public TextPlainMessageConverter() {
super(new MimeType("text", "plain"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (LogMessage.class == clazz);
}
@Override
protected Object convertFromInternal(Message<?> message,
Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
String text = payload instanceof String
? (String) payload
: new String((byte[]) payload);
return new LogMessage(text);
}
}
修改后,在发布消息时设置 contentTypes
头为 text/plain
,载荷为 Hello World
即可正常工作。
5.4. 消费者组
运行多个应用实例时,输入通道的每条新消息会通知所有订阅者。通常需要消息只被处理一次,Spring Cloud Stream 通过消费者组实现此特性。
通过 spring.cloud.stream.bindings.<CHANNEL>.group
指定组名:
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
group: logMessageConsumers
...
6. 消息驱动微服务
6.1. 扩容
运行多个应用实例时,需确保数据在消费者间正确分配。Spring Cloud Stream 提供两个属性:
spring.cloud.stream.instanceCount
:运行的应用实例数spring.cloud.stream.instanceIndex
:当前应用的索引
例如部署两个 MyLoggerServiceApplication
实例:
- 两个实例的
instanceCount
都设为 2 instanceIndex
分别设为 0 和 1
使用 Spring Data Flow 部署时这些属性会自动设置。
6.2. 分区
领域事件可设为分区消息,有助于扩展存储并提升性能。领域事件通常包含分区键,确保相关消息落入同一分区。
假设按消息首字母分区(A-M 和 N-Z 两个分区):
spring.cloud.stream.bindings.output.producer.partitionKeyExpression
:分区表达式spring.cloud.stream.bindings.output.producer.partitionCount
:分区数量
当分区表达式过于复杂时,可通过 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass
实现自定义分区策略。
6.3. 健康检查
微服务环境中需要检测服务是否宕机或故障。通过 management.health.binders.enabled=true
启用绑定器健康检查。
访问 http://<host>:<port>/health
查看健康状态。
7. 总结
本文介绍了 Spring Cloud Stream 的核心概念,并通过 RabbitMQ 示例展示了基本用法。想深入了解可查阅官方文档。
本文源码可在 GitHub 获取。