1. 概述

本文将详细介绍如何使用 AWS Java SDK 来操作 Amazon 的 SQS(Simple Queue Service,简单队列服务)。

对于熟悉消息队列的开发者来说,SQS 是一个高可用、可伸缩的托管消息服务。我们重点聚焦在实际开发中常用的 API 操作,避免理论堆砌,直接上干货 ✅。


2. 前置条件

使用 AWS SDK for Java 操作 SQS 所需的依赖、账户配置和客户端初始化方式,与操作 S3 类似(可参考相关 AWS 文章)。

假设你已经配置好 AWS 凭证(如 ~/.aws/credentials),我们可以通过如下方式创建 SQS 客户端:

SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(ProfileCredentialsProvider.create())
    .build();

⚠️ 注意:生产环境建议使用 IAM Role 或临时凭证,避免硬编码密钥。


3. 创建队列

创建队列是使用 SQS 的第一步。SDK 提供了简洁的 Builder 模式 API,非常直观。

3.1 创建标准队列(Standard Queue)

标准队列适用于大多数异步解耦场景,消息最多交付一次(at-least-once),但不保证顺序。

CreateQueueRequest createStandardQueueRequest = CreateQueueRequest.builder()
    .queueName("my-standard-queue")
    .build();

sqsClient.createQueue(createStandardQueueRequest);

✅ 简单粗暴,就一行代码搞定。

3.2 创建 FIFO 队列(FIFO Queue)

FIFO 队列适用于需要严格消息顺序和去重的场景,比如订单处理、状态同步等。

创建时需设置两个关键属性:

  • FifoQueue: 必须设为 true
  • ContentBasedDeduplication: 启用内容自动去重
Map<QueueAttributeName, String> queueAttributes = new HashMap<>();
queueAttributes.put(QueueAttributeName.FIFO_QUEUE, "true");
queueAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true");

CreateQueueRequest createFifoQueueRequest = CreateQueueRequest.builder()
    .queueName("my-fifo-queue.fifo") // ⚠️ FIFO 队列名必须以 .fifo 结尾
    .attributes(queueAttributes)
    .build();

sqsClient.createQueue(createFifoQueueRequest);

⚠️ 踩坑提醒:FIFO 队列名必须带 .fifo 后缀,否则会创建失败!


4. 发送消息到队列

消息发送是核心操作之一。SDK 提供了灵活的参数控制。

4.1 向标准队列发送消息

使用 SendMessageRequest 构建消息,支持消息体、延迟投递、自定义属性等。

Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder()
    .stringValue("This is an attribute")
    .dataType("String")
    .build();

messageAttributes.put("AttributeOne", messageAttributeValue);

SendMessageRequest sendMessageStandardQueue = SendMessageRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
    .messageBody("A simple message.")
    .delaySeconds(30) // 延迟 30 秒后才可见
    .messageAttributes(messageAttributes)
    .build();

sqsClient.sendMessage(sendMessageStandardQueue);
  • delaySeconds(30):消息将在 30 秒后变为可消费状态,适合定时任务场景。

4.2 向 FIFO 队列发送消息

FIFO 队列必须指定 messageGroupId,用于保证组内消息有序。

SendMessageRequest sendMessageFifoQueue = SendMessageRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-fifo-queue.fifo")
    .messageBody("FIFO Queue")
    .messageGroupId("baeldung-group-1") // 必填:消息分组 ID
    .messageAttributes(messageAttributes)
    .build();

sqsClient.sendMessage(sendMessageFifoQueue);

✅ 同一个 messageGroupId 的消息会被顺序处理。

4.3 批量发送消息

批量发送可以显著提升吞吐量,减少请求次数。

List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
SendMessageBatchRequestEntry messageBatchRequestEntry1 = SendMessageBatchRequestEntry.builder()
    .id("id-1")
    .messageBody("batch-1")
    .messageGroupId("baeldung-group-1")
    .build();

SendMessageBatchRequestEntry messageBatchRequestEntry2 = SendMessageBatchRequestEntry.builder()
    .id("id-2")
    .messageBody("batch-2")
    .messageGroupId("baeldung-group-1")
    .build();

messageEntries.add(messageBatchRequestEntry1);
messageEntries.add(messageBatchRequestEntry2);

SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-fifo-queue.fifo")
    .entries(messageEntries)
    .build();

sqsClient.sendMessageBatch(sendMessageBatchRequest);

⚠️ 注意:

  • 批量最多支持 10 条消息
  • 每条消息必须有唯一 id,用于失败时定位

5. 从队列读取消息

使用 receiveMessage 接口拉取消息,支持长轮询和批量获取。

ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
    .waitTimeSeconds(10)           // 启用长轮询,最多等待 10 秒
    .maxNumberOfMessages(10)       // 最多取 10 条
    .build();

List<Message> sqsMessages = sqsClient.receiveMessage(receiveMessageRequest)
    .messages();

关键参数说明:

参数 说明
maxNumberOfMessages 最大 10 条,不能超过
waitTimeSeconds 启用长轮询,减少空请求,推荐设置 5~20 秒

获取消息内容:

if (!sqsMessages.isEmpty()) {
    Message message = sqsMessages.get(0);
    System.out.println("Body: " + message.body());
    System.out.println("Attributes: " + message.messageAttributes());
}

✅ 长轮询是必须开启的优化手段,否则会频繁触发空请求,增加成本。


6. 删除消息

消息被消费后必须显式删除,否则会在可见性超时后重新入队。

DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
    .receiptHandle(sqsMessages.get(0).receiptHandle()) // 必须使用服务返回的 receiptHandle
    .build();

sqsClient.deleteMessage(deleteMessageRequest);

⚠️ 踩坑提醒:receiptHandle 是临时凭证,会过期,不能缓存复用!


7. 死信队列(Dead Letter Queue)

当消息反复消费失败时,应将其转移到死信队列(DLQ)以便后续排查。

步骤一:创建死信队列

CreateQueueRequest createDeadLetterQueueRequest = CreateQueueRequest.builder()
    .queueName("my-dead-letter-queue")
    .build();

String deadLetterQueueUrl = sqsClient.createQueue(createDeadLetterQueueRequest).queueUrl();

步骤二:获取死信队列的 ARN

GetQueueAttributesRequest getQueueAttributesRequest = GetQueueAttributesRequest.builder()
    .queueUrl(deadLetterQueueUrl)
    .attributeNames(QueueAttributeName.QUEUE_ARN)
    .build();

GetQueueAttributesResponse deadLetterQueueAttributes = sqsClient.getQueueAttributes(getQueueAttributesRequest);
String deadLetterQueueARN = deadLetterQueueAttributes.attributes().get(QueueAttributeName.QUEUE_ARN);

步骤三:绑定死信策略

Map<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.REDRIVE_POLICY, 
    String.format("{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"%s\"}", deadLetterQueueARN));

SetQueueAttributesRequest queueAttributesRequest = SetQueueAttributesRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
    .attributes(attributes)
    .build();

sqsClient.setQueueAttributes(queueAttributesRequest);

maxReceiveCount 设置为 5,表示消息被接收 5 次仍未删除,则自动转入 DLQ。

⚠️ 要求:死信队列必须与源队列类型一致(标准 or FIFO)。


8. 监控队列状态

通过 SDK 可以快速获取队列的基本指标:

GetQueueAttributesRequest getQueueAttributesRequestForMonitoring = GetQueueAttributesRequest.builder()
    .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-standard-queue")
    .build();

GetQueueAttributesResponse attributesResponse = sqsClient.getQueueAttributes(getQueueAttributesRequestForMonitoring);

System.out.println("队列中待处理消息数: " + attributesResponse.attributes().get("ApproximateNumberOfMessages"));
System.out.println("正在被处理的消息数(In Flight): " + attributesResponse.attributes().get("ApproximateNumberOfMessagesNotVisible"));

这些指标可用于:

  • 判断是否需要扩容消费者
  • 检测消费积压
  • 触发告警

更详细的监控建议接入 Amazon CloudWatch,设置告警规则。


9. 总结

本文通过代码示例,系统性地展示了如何使用 AWS Java SDK 管理 SQS 队列,涵盖:

  • 队列创建(标准 / FIFO)
  • 消息发送(单条 / 批量)
  • 消息消费与删除
  • 死信队列配置
  • 队列监控

所有示例代码均已验证可用,完整源码可在 GitHub 获取:

👉 https://github.com/baeldung/tutorials/tree/master/aws-modules/aws-miscellaneous

掌握这些基础操作后,你可以轻松将 SQS 集成到微服务、事件驱动架构中,实现系统解耦与削峰填谷。


原始标题:Managing Amazon SQS Queues in Java