1. 概述
本文将详细介绍如何使用 AWS Java SDK 来操作 Amazon 的 SQS(Simple Queue Service,简单队列服务)。
对于熟悉消息队列的开发者来说,SQS 是一个高可用、可伸缩的托管消息服务。我们重点聚焦在实际开发中常用的 API 操作,避免理论堆砌,直接上干货 ✅。
2. 前置条件
使用 AWS SDK for Java 操作 SQS 所需的依赖、账户配置和客户端初始化方式,与操作 S3 类似(可参考相关 AWS 文章)。
假设你已经配置好 AWS 凭证(如 ~/.aws/credentials
),我们可以通过如下方式创建 SQS 客户端:
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.credentialsProvider(ProfileCredentialsProvider.create())
.build();
⚠️ 注意:生产环境建议使用 IAM Role 或临时凭证,避免硬编码密钥。
3. 创建队列
创建队列是使用 SQS 的第一步。SDK 提供了简洁的 Builder 模式 API,非常直观。
3.1 创建标准队列(Standard Queue)
标准队列适用于大多数异步解耦场景,消息最多交付一次(at-least-once),但不保证顺序。
CreateQueueRequest createStandardQueueRequest = CreateQueueRequest.builder()
.queueName("my-standard-queue")
.build();
sqsClient.createQueue(createStandardQueueRequest);
✅ 简单粗暴,就一行代码搞定。
3.2 创建 FIFO 队列(FIFO Queue)
FIFO 队列适用于需要严格消息顺序和去重的场景,比如订单处理、状态同步等。
创建时需设置两个关键属性:
FifoQueue
: 必须设为true
ContentBasedDeduplication
: 启用内容自动去重
Map<QueueAttributeName, String> queueAttributes = new HashMap<>();
queueAttributes.put(QueueAttributeName.FIFO_QUEUE, "true");
queueAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true");
CreateQueueRequest createFifoQueueRequest = CreateQueueRequest.builder()
.queueName("my-fifo-queue.fifo") // ⚠️ FIFO 队列名必须以 .fifo 结尾
.attributes(queueAttributes)
.build();
sqsClient.createQueue(createFifoQueueRequest);
⚠️ 踩坑提醒:FIFO 队列名必须带 .fifo
后缀,否则会创建失败!
4. 发送消息到队列
消息发送是核心操作之一。SDK 提供了灵活的参数控制。
4.1 向标准队列发送消息
使用 SendMessageRequest
构建消息,支持消息体、延迟投递、自定义属性等。
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder()
.stringValue("This is an attribute")
.dataType("String")
.build();
messageAttributes.put("AttributeOne", messageAttributeValue);
SendMessageRequest sendMessageStandardQueue = SendMessageRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
.messageBody("A simple message.")
.delaySeconds(30) // 延迟 30 秒后才可见
.messageAttributes(messageAttributes)
.build();
sqsClient.sendMessage(sendMessageStandardQueue);
delaySeconds(30)
:消息将在 30 秒后变为可消费状态,适合定时任务场景。
4.2 向 FIFO 队列发送消息
FIFO 队列必须指定 messageGroupId
,用于保证组内消息有序。
SendMessageRequest sendMessageFifoQueue = SendMessageRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-fifo-queue.fifo")
.messageBody("FIFO Queue")
.messageGroupId("baeldung-group-1") // 必填:消息分组 ID
.messageAttributes(messageAttributes)
.build();
sqsClient.sendMessage(sendMessageFifoQueue);
✅ 同一个 messageGroupId
的消息会被顺序处理。
4.3 批量发送消息
批量发送可以显著提升吞吐量,减少请求次数。
List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
SendMessageBatchRequestEntry messageBatchRequestEntry1 = SendMessageBatchRequestEntry.builder()
.id("id-1")
.messageBody("batch-1")
.messageGroupId("baeldung-group-1")
.build();
SendMessageBatchRequestEntry messageBatchRequestEntry2 = SendMessageBatchRequestEntry.builder()
.id("id-2")
.messageBody("batch-2")
.messageGroupId("baeldung-group-1")
.build();
messageEntries.add(messageBatchRequestEntry1);
messageEntries.add(messageBatchRequestEntry2);
SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-fifo-queue.fifo")
.entries(messageEntries)
.build();
sqsClient.sendMessageBatch(sendMessageBatchRequest);
⚠️ 注意:
- 批量最多支持 10 条消息
- 每条消息必须有唯一
id
,用于失败时定位
5. 从队列读取消息
使用 receiveMessage
接口拉取消息,支持长轮询和批量获取。
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
.waitTimeSeconds(10) // 启用长轮询,最多等待 10 秒
.maxNumberOfMessages(10) // 最多取 10 条
.build();
List<Message> sqsMessages = sqsClient.receiveMessage(receiveMessageRequest)
.messages();
关键参数说明:
参数 | 说明 |
---|---|
maxNumberOfMessages |
最大 10 条,不能超过 |
waitTimeSeconds |
启用长轮询,减少空请求,推荐设置 5~20 秒 |
获取消息内容:
if (!sqsMessages.isEmpty()) {
Message message = sqsMessages.get(0);
System.out.println("Body: " + message.body());
System.out.println("Attributes: " + message.messageAttributes());
}
✅ 长轮询是必须开启的优化手段,否则会频繁触发空请求,增加成本。
6. 删除消息
消息被消费后必须显式删除,否则会在可见性超时后重新入队。
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
.receiptHandle(sqsMessages.get(0).receiptHandle()) // 必须使用服务返回的 receiptHandle
.build();
sqsClient.deleteMessage(deleteMessageRequest);
⚠️ 踩坑提醒:receiptHandle
是临时凭证,会过期,不能缓存复用!
7. 死信队列(Dead Letter Queue)
当消息反复消费失败时,应将其转移到死信队列(DLQ)以便后续排查。
步骤一:创建死信队列
CreateQueueRequest createDeadLetterQueueRequest = CreateQueueRequest.builder()
.queueName("my-dead-letter-queue")
.build();
String deadLetterQueueUrl = sqsClient.createQueue(createDeadLetterQueueRequest).queueUrl();
步骤二:获取死信队列的 ARN
GetQueueAttributesRequest getQueueAttributesRequest = GetQueueAttributesRequest.builder()
.queueUrl(deadLetterQueueUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build();
GetQueueAttributesResponse deadLetterQueueAttributes = sqsClient.getQueueAttributes(getQueueAttributesRequest);
String deadLetterQueueARN = deadLetterQueueAttributes.attributes().get(QueueAttributeName.QUEUE_ARN);
步骤三:绑定死信策略
Map<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.REDRIVE_POLICY,
String.format("{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"%s\"}", deadLetterQueueARN));
SetQueueAttributesRequest queueAttributesRequest = SetQueueAttributesRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
.attributes(attributes)
.build();
sqsClient.setQueueAttributes(queueAttributesRequest);
✅ maxReceiveCount
设置为 5,表示消息被接收 5 次仍未删除,则自动转入 DLQ。
⚠️ 要求:死信队列必须与源队列类型一致(标准 or FIFO)。
8. 监控队列状态
通过 SDK 可以快速获取队列的基本指标:
GetQueueAttributesRequest getQueueAttributesRequestForMonitoring = GetQueueAttributesRequest.builder()
.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
.build();
GetQueueAttributesResponse attributesResponse = sqsClient.getQueueAttributes(getQueueAttributesRequestForMonitoring);
System.out.println("队列中待处理消息数: " + attributesResponse.attributes().get("ApproximateNumberOfMessages"));
System.out.println("正在被处理的消息数(In Flight): " + attributesResponse.attributes().get("ApproximateNumberOfMessagesNotVisible"));
这些指标可用于:
- 判断是否需要扩容消费者
- 检测消费积压
- 触发告警
更详细的监控建议接入 Amazon CloudWatch,设置告警规则。
9. 总结
本文通过代码示例,系统性地展示了如何使用 AWS Java SDK 管理 SQS 队列,涵盖:
- 队列创建(标准 / FIFO)
- 消息发送(单条 / 批量)
- 消息消费与删除
- 死信队列配置
- 队列监控
所有示例代码均已验证可用,完整源码可在 GitHub 获取:
👉 https://github.com/baeldung/tutorials/tree/master/aws-modules/aws-miscellaneous
掌握这些基础操作后,你可以轻松将 SQS 集成到微服务、事件驱动架构中,实现系统解耦与削峰填谷。