1. 概述
Spring Cloud AWS 是一个简化与 AWS 服务交互的项目。Simple Queue Service (SQS) 是 AWS 提供的可扩展异步消息收发解决方案。
本文将重新介绍 Spring Cloud AWS SQS 集成,该模块在 Spring Cloud AWS 3.0 中已完全重写。框架提供了熟悉的 Spring 抽象层来处理 SQS 队列,例如 SqsTemplate
和 @SqsListener
注解。
我们将通过事件驱动场景演示消息收发的示例,并展示如何使用 Testcontainers(管理临时 Docker 容器的工具)和 LocalStack(本地模拟 AWS 环境的工具)设置集成测试。
2. 依赖项
Spring Cloud AWS BOM 确保项目间版本兼容,它声明了包括 Spring Boot 在内的多个依赖版本,应替代 Spring Boot 自身 BOM 使用。
在 pom.xml
中导入方式:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
核心依赖是 SQS Starter,包含所有 SQS 相关类。SQS 集成不依赖 Spring Boot,可在标准 Java 应用中独立使用:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
对于 Spring Boot 应用(如本教程),还需添加 Core Starter,以启用 SQS 自动配置和 AWS 凭证/区域配置:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
3. 搭建本地测试环境
本节介绍使用 LocalStack 和 Testcontainers 搭建本地测试环境。注意:本教程示例也可直接对接 AWS。
3.1. 依赖项
运行 LocalStack 和 TestContainers 需要:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
添加 awaitility 辅助异步消息消费断言:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3.2. 配置
创建容器管理基类 BaseSqsIntegrationTest
,供测试套件继承。每个继承类将启动新容器,确保测试套件间数据隔离。
@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
// 测试配置将在此添加
}
声明 LocalStackContainer
,@Container
注解管理容器生命周期:
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
使用 @DynamicPropertySource
绑定 Spring Cloud AWS 配置到 LocalStack:
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
// 其他 AWS 服务接口可在此添加
}
⚠️ 运行测试前确保 Docker 引擎已启动。
4. 配置队列名称
通过 Spring Boot 的 application.yml
属性机制配置队列名称:
events:
queues:
user-created-by-name-queue: user_created_by_name_queue
user-created-record-queue: user_created_record_queue
user-created-event-type-queue: user_created_event_type_queue
创建属性 POJO:
@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {
private String userCreatedByNameQueue;
private String userCreatedRecordQueue;
private String userCreatedEventTypeQueue;
// getters and setters
}
在主类或配置类添加 @EnableConfigurationProperties
:
@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudAwsApplication.class, args);
}
}
✅ 默认行为:Spring Cloud AWS SQS 会自动创建不存在的队列(便于开发环境搭建)。
❌ 生产环境:应用应无队列创建权限,队列不存在时启动失败。框架可配置为显式失败。
5. 发送和接收消息
Spring Cloud AWS 提供多种 SQS 消息收发方式,最常用的是 SqsTemplate
发送消息和 @SqsListener
接收消息。
5.1. 场景设计
模拟事件驱动应用:响应 UserCreatedEvent
事件,将用户信息保存到本地仓库。
创建 User
实体:
public record User(String id, String name, String email) {
}
内存 UserRepository
:
@Repository
public class UserRepository {
private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();
public void save(User userToSave) {
persistedUsers.put(userToSave.id(), userToSave);
}
public Optional<User> findById(String userId) {
return Optional.ofNullable(persistedUsers.get(userId));
}
public Optional<User> findByName(String name) {
return persistedUsers.values().stream()
.filter(user -> user.name().equals(name))
.findFirst();
}
}
UserCreatedEvent
事件类:
public record UserCreatedEvent(String id, String username, String email) {
}
5.2. 测试准备
创建测试类 SpringCloudAwsSQSLiveTest
继承 BaseSqsIntegrationTest
,注入依赖:
public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private EventQueuesProperties eventQueuesProperties;
// ...
}
创建监听器组件 UserEventListeners
:
@Component
public class UserEventListeners {
private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);
public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";
private final UserRepository userRepository;
public UserEventListeners(UserRepository userRepository) {
this.userRepository = userRepository;
}
// 监听器将在此添加
}
5.3. 字符串消息
发送字符串消息并验证持久化:
@Test
void givenAStringPayload_whenSend_shouldReceive() {
// given
var userName = "Albert";
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
.payload(userName));
logger.info("Message sent with payload {}", userName);
// then
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findByName(userName)
.isPresent());
}
日志输出示例:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert
❌ 测试失败(缺少监听器),添加监听器:
@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
logger.info("Received message: {}", username);
userRepository.save(new User(UUID.randomUUID()
.toString(), username, null));
}
✅ 测试通过,监听日志:
INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert
5.4. POJO 和 Record 消息
发送 UserCreatedEvent
记录:
@Test
void givenARecordPayload_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "john.doe@example.com");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
.payload(payload));
// then
logger.info("Message sent with payload: {}", payload);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
发送日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, john.doe@example.com]
添加监听器:
@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
logger.info("Received message: {}", event);
userRepository.save(new User(event.id(), event.username(), event.email()));
}
✅ 监听日志:
INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, john.doe@example.com]
框架自动使用 Spring Context 中的 ObjectMapper
处理序列化/反序列化,可自定义配置(本文不展开)。
5.5. Spring 消息与消息头
发送带自定义头的 Record,接收 Spring Message
实例:
@Test
void givenCustomHeaders_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "john.doe@example.com");
var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
.payload(payload)
.headers(headers));
// then
logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
发送日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, john.doe@example.com] and custom headers: {eventType=UserCreatedEvent}
添加监听器:
@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
@Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
UserCreatedEvent payload = message.getPayload();
userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}
✅ 监听日志:
INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, john.doe@example.com], headers=...
框架自动将 SQS 消息属性转换为消息头,建议使用 SqsHeader
常量获取标准 SQS 头。
6. 总结
本文通过事件驱动场景,演示了 Spring Cloud AWS SQS 3.0 的多种消息收发方式。我们使用 LocalStack 和 Testcontainers 搭建了本地测试环境,并配置了框架以适配集成测试。
完整代码示例可在 GitHub 获取。