1. 概述

Spring Cloud AWS 是一个简化与 AWS 服务交互的项目。Simple Queue Service (SQS) 是 AWS 提供的可扩展异步消息收发解决方案。

本文将重新介绍 Spring Cloud AWS SQS 集成,该模块在 Spring Cloud AWS 3.0 中已完全重写。框架提供了熟悉的 Spring 抽象层来处理 SQS 队列,例如 SqsTemplate@SqsListener 注解。

我们将通过事件驱动场景演示消息收发的示例,并展示如何使用 Testcontainers(管理临时 Docker 容器的工具)和 LocalStack(本地模拟 AWS 环境的工具)设置集成测试

2. 依赖项

Spring Cloud AWS BOM 确保项目间版本兼容,它声明了包括 Spring Boot 在内的多个依赖版本,应替代 Spring Boot 自身 BOM 使用

pom.xml 中导入方式:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

核心依赖是 SQS Starter,包含所有 SQS 相关类。SQS 集成不依赖 Spring Boot,可在标准 Java 应用中独立使用:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

对于 Spring Boot 应用(如本教程),还需添加 Core Starter,以启用 SQS 自动配置和 AWS 凭证/区域配置:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

3. 搭建本地测试环境

本节介绍使用 LocalStack 和 Testcontainers 搭建本地测试环境。注意:本教程示例也可直接对接 AWS

3.1. 依赖项

运行 LocalStack 和 TestContainers 需要:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

添加 awaitility 辅助异步消息消费断言:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

3.2. 配置

创建容器管理基类 BaseSqsIntegrationTest,供测试套件继承。每个继承类将启动新容器,确保测试套件间数据隔离

@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
   // 测试配置将在此添加
}

声明 LocalStackContainer@Container 注解管理容器生命周期:

private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

使用 @DynamicPropertySource 绑定 Spring Cloud AWS 配置到 LocalStack:

@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
    registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
    registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
    registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
      .toString());
    // 其他 AWS 服务接口可在此添加
}

⚠️ 运行测试前确保 Docker 引擎已启动

4. 配置队列名称

通过 Spring Boot 的 application.yml 属性机制配置队列名称:

events:
  queues:
    user-created-by-name-queue: user_created_by_name_queue
    user-created-record-queue: user_created_record_queue
    user-created-event-type-queue: user_created_event_type_queue

创建属性 POJO:

@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {

    private String userCreatedByNameQueue;
    private String userCreatedRecordQueue;
    private String userCreatedEventTypeQueue;

    // getters and setters
}

在主类或配置类添加 @EnableConfigurationProperties

@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudAwsApplication.class, args);
    }
}

默认行为:Spring Cloud AWS SQS 会自动创建不存在的队列(便于开发环境搭建)。
生产环境:应用应无队列创建权限,队列不存在时启动失败。框架可配置为显式失败。

5. 发送和接收消息

Spring Cloud AWS 提供多种 SQS 消息收发方式,最常用的是 SqsTemplate 发送消息和 @SqsListener 接收消息

5.1. 场景设计

模拟事件驱动应用:响应 UserCreatedEvent 事件,将用户信息保存到本地仓库。

创建 User 实体:

public record User(String id, String name, String email) {
}

内存 UserRepository

@Repository
public class UserRepository {

    private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();

    public void save(User userToSave) {
        persistedUsers.put(userToSave.id(), userToSave);
    }

    public Optional<User> findById(String userId) {
        return Optional.ofNullable(persistedUsers.get(userId));
    }

    public Optional<User> findByName(String name) {
        return persistedUsers.values().stream()
          .filter(user -> user.name().equals(name))
          .findFirst();
    }
}

UserCreatedEvent 事件类:

public record UserCreatedEvent(String id, String username, String email) {
}

5.2. 测试准备

创建测试类 SpringCloudAwsSQSLiveTest 继承 BaseSqsIntegrationTest,注入依赖:

public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private EventQueuesProperties eventQueuesProperties;

   // ...
}

创建监听器组件 UserEventListeners

@Component
public class UserEventListeners {

    private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);

    public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";

    private final UserRepository userRepository;

    public UserEventListeners(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // 监听器将在此添加 
}

5.3. 字符串消息

发送字符串消息并验证持久化:

@Test
void givenAStringPayload_whenSend_shouldReceive() {
    // given
    var userName = "Albert";

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
      .payload(userName));
    logger.info("Message sent with payload {}", userName);

    // then
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findByName(userName)
        .isPresent());
}

日志输出示例:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert

❌ 测试失败(缺少监听器),添加监听器:

@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
    logger.info("Received message: {}", username);
    userRepository.save(new User(UUID.randomUUID()
      .toString(), username, null));
}

✅ 测试通过,监听日志:

INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert

5.4. POJO 和 Record 消息

发送 UserCreatedEvent 记录:

@Test
void givenARecordPayload_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "john.doe@example.com");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
      .payload(payload));

    // then
    logger.info("Message sent with payload: {}", payload);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

发送日志:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, john.doe@example.com]

添加监听器:

@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
    logger.info("Received message: {}", event);
    userRepository.save(new User(event.id(), event.username(), event.email()));
}

✅ 监听日志:

INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, john.doe@example.com]

框架自动使用 Spring Context 中的 ObjectMapper 处理序列化/反序列化,可自定义配置(本文不展开)。

5.5. Spring 消息与消息头

发送带自定义头的 Record,接收 Spring Message 实例:

@Test
void givenCustomHeaders_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "john.doe@example.com");
    var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
      .payload(payload)
      .headers(headers));

    // then
    logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

发送日志:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest  : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, john.doe@example.com] and custom headers: {eventType=UserCreatedEvent}

添加监听器:

@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
    @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
    logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
    UserCreatedEvent payload = message.getPayload();
    userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}

✅ 监听日志:

INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, john.doe@example.com], headers=...

框架自动将 SQS 消息属性转换为消息头,建议使用 SqsHeader 常量获取标准 SQS 头。

6. 总结

本文通过事件驱动场景,演示了 Spring Cloud AWS SQS 3.0 的多种消息收发方式。我们使用 LocalStack 和 Testcontainers 搭建了本地测试环境,并配置了框架以适配集成测试。

完整代码示例可在 GitHub 获取。


原始标题:Introduction to Spring Cloud AWS 3.0 – SQS Integration | Baeldung