1. 概述

Spring Cloud Stream 是构建在 Spring Boot 和 Spring Integration 之上的框架,专门用于开发事件驱动或消息驱动的微服务

本文将通过简单示例介绍 Spring Cloud Stream 的核心概念和构建方式,帮你快速上手这个强大的消息框架。

2. Maven 依赖

首先在 pom.xml 中添加消息中间件依赖(以 RabbitMQ 为例):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>3.1.3</version>
</dependency>

再添加测试支持依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>3.1.3</version>
    <scope>test</scope>
</dependency>

3. 核心概念

微服务架构遵循"智能接口、哑管道"原则。服务间通过 RabbitMQ 或 Kafka 等消息中间件通信,服务通过发布领域事件到接口/通道实现交互

3.1. 核心组件

看一个简单的 Spring Cloud Stream 服务示例:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

@EnableBinding 注解配置了 Processor 接口中定义的 INPUTOUTPUT 通道。这两个通道都是绑定器,可配置连接具体消息中间件

核心概念解析:

  • 绑定(Bindings):声明式标识输入/输出通道的接口集合
  • 绑定器(Binder):消息中间件实现(如 Kafka/RabbitMQ)
  • 通道(Channel):应用与消息中间件间的通信管道
  • 流监听器(StreamListeners):自动处理通道消息的方法,MessageConverter 负责消息转换
  • 消息模式(Message Schemas):消息序列化/反序列化方案,支持静态或动态加载

3.2. 通信模式

消息通过发布-订阅模式投递到目标。发布者按主题分类消息,订阅者订阅感兴趣的主题,中间件负责过滤分发。

订阅者可分组:消费者组(Consumer Group) 是由 group id 标识的订阅者集合,组内消息按负载均衡方式投递。

4. 编程模型

4.1. 功能测试

测试支持提供了与通道交互和消息检查的能力。测试前面的 enrichLogMessage 服务:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
          .build());

        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

4.2. 自定义通道

Spring Cloud 提供的 Processor 只有一个输入/输出通道。若需要一进两出,可自定义处理器:

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

Spring 会自动实现该接口。通道名可通过注解指定(如 @Output("myOutput")),否则使用方法名。现在有三个通道:myInputmyOutputanotherOutput

按值路由消息(<10 走 anOutput,≥10 走 anotherOutput):

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}

4.3. 条件分发

使用 @StreamListener 和 SpEL 表达式过滤消费者接收的消息。条件分发示例:

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

⚠️ 限制:这些方法必须无返回值。

5. 环境搭建

5.1. 绑定器配置

通过 META-INF/spring.binders 配置默认绑定器:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

或添加 RabbitMQ 绑定器依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

未提供绑定器时,Spring 使用通道间直接消息传递

5.2. RabbitMQ 配置

src/main/resources/application.yml 中配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /

input 绑定使用 queue.log.messages 交换器,output 使用 queue.pretty.log.messages 交换器,均通过 local_rabbit 绑定器。

无需预先创建 RabbitMQ 交换器/队列,应用启动时会自动创建。

测试时可通过 RabbitMQ 管理界面发布 JSON 格式消息。

5.3. 自定义消息转换

Spring Cloud Stream 支持按内容类型进行消息转换。将前面的 JSON 示例改为纯文本:

通过 MessageConverter 自定义 LogMessage 转换

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    //...

    @Bean
    public MessageConverter providesTextPlainMessageConverter() {
        return new TextPlainMessageConverter();
    }

    //...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, 
        Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String 
          ? (String) payload 
          : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

修改后,在发布消息时设置 contentTypes 头为 text/plain,载荷为 Hello World 即可正常工作。

5.4. 消费者组

运行多个应用实例时,输入通道的每条新消息会通知所有订阅者。通常需要消息只被处理一次,Spring Cloud Stream 通过消费者组实现此特性。

通过 spring.cloud.stream.bindings.<CHANNEL>.group 指定组名:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
          ...

6. 消息驱动微服务

6.1. 扩容

运行多个应用实例时,需确保数据在消费者间正确分配。Spring Cloud Stream 提供两个属性:

  • spring.cloud.stream.instanceCount:运行的应用实例数
  • spring.cloud.stream.instanceIndex:当前应用的索引

例如部署两个 MyLoggerServiceApplication 实例:

  • 两个实例的 instanceCount 都设为 2
  • instanceIndex 分别设为 0 和 1

使用 Spring Data Flow 部署时这些属性会自动设置。

6.2. 分区

领域事件可设为分区消息,有助于扩展存储并提升性能。领域事件通常包含分区键,确保相关消息落入同一分区。

假设按消息首字母分区(A-M 和 N-Z 两个分区):

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression:分区表达式
  • spring.cloud.stream.bindings.output.producer.partitionCount:分区数量

当分区表达式过于复杂时,可通过 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 实现自定义分区策略。

6.3. 健康检查

微服务环境中需要检测服务是否宕机或故障。通过 management.health.binders.enabled=true 启用绑定器健康检查。

访问 http://<host>:<port>/health 查看健康状态。

7. 总结

本文介绍了 Spring Cloud Stream 的核心概念,并通过 RabbitMQ 示例展示了基本用法。想深入了解可查阅官方文档

本文源码可在 GitHub 获取。