1. 引言
在 Spring AMQP 的默认行为中,当一条消息消费失败时,它会被重新放回队列,等待下一次消费。这看似合理,但如果处理不当,很容易陷入无限重试的死循环,不仅浪费系统资源,还可能导致服务雪崩。
虽然使用 死信队列(Dead Letter Queue) 是处理失败消息的标准做法,但在很多场景下我们更希望先尝试几次自动重试,让系统有机会自我恢复,而不是直接丢进死信队列。
本文将介绍两种实现 指数退避(Exponential Backoff) 重试策略的方式:
✅ 一种是基于 Spring Retry 的同步阻塞式重试
✅ 另一种是利用 TTL + 死信队列的异步非阻塞式重试
这两种方式各有优劣,适用于不同场景,我们一一来看。
2. 前置准备
本文使用 RabbitMQ 作为 AMQP 的实现。如果你还没搭好环境,可以用下面这条 Docker 命令快速启动一个带管理界面的实例:
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq-server rabbitmq:3-management
访问 http://localhost:15672
(默认账号密码均为 guest
)即可查看管理后台。
项目依赖只需引入 spring-boot-starter-amqp
:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
</dependencies>
后续示例基于 Spring Boot 构建,配置类和测试类均省略部分样板代码,只保留核心逻辑。
3. 阻塞式重试(Blocking Retry)
这种方式利用 Spring Retry 模块,在消费者内部进行重试。每次失败后按指数增长的间隔等待,直到达到最大重试次数。
3.1 队列定义
先定义一个简单的临时队列:
@Bean
public Queue blockingQueue() {
return QueueBuilder.nonDurable("blocking-queue").build();
}
3.2 配置指数退避策略
通过 RetryInterceptorBuilder
设置重试参数:
- 初始间隔:1000ms
- 指数因子:3.0
- 最大等待时间:10000ms
- 最大尝试次数:5 次
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000) // initial, multiplier, max
.maxAttempts(5)
.recoverer(observableRecoverer()) // 达到最大重试次数后的兜底处理
.build();
}
然后将该拦截器注入到自定义的 Listener 容器工厂中:
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(new Advice[]{retryInterceptor});
return factory;
}
3.3 消费者实现
消费者故意抛出异常模拟失败:
@RabbitListener(
queues = "blocking-queue",
containerFactory = "retryContainerFactory"
)
public void consumeBlocking(String payload) throws Exception {
logger.info("Processing message from blocking-queue: {}", payload);
throw new Exception("exception occured!");
}
3.4 测试验证
发送两条消息并观察日志:
@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
observableRecoverer.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
}
latch.await();
}
日志输出如下:
2020-02-18 21:17:55.638 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875 ERROR : java.lang.Exception: exception occured!
...
可以看到重试间隔为:1s → 3s → 9s → 超过 10s 后按上限 10s 执行,符合指数增长规律。
3.5 踩坑提醒 ⚠️
这种方式虽然简单粗暴,但有一个致命问题:重试期间消费者线程被完全阻塞。
即使你设置了 concurrency = "2"
,每个重试中的消息仍会独占一个消费者线程。在高并发场景下极易造成消费者资源耗尽,导致其他正常消息延迟处理。
✅ 改进建议:仅适用于低频、非关键业务场景,或作为兜底方案。
4. 非阻塞式重试(Non-blocking Retry)
为了解决阻塞问题,我们可以换一种思路:利用消息的 TTL(Time-To-Live)和死信机制来实现异步重试。
核心思想是:
- 每次失败不立即重试,而是把消息发到一个带有 TTL 的“重试队列”
- 当消息过期后,自动进入死信队列
- 死信队列的消费者再把消息扔回原始队列,形成一次“重试”
这样整个过程是异步的,不会阻塞原始消费者。
4.1 死信队列定义
先定义一个统一的“重试结束”死信队列:
@Bean
public Queue retryWaitEndedQueue() {
return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}
4.2 死信消费者:负责“重投”
这个消费者只有一个职责:把过期的消息重新发回原始队列。
@RabbitListener(
queues = "retry-wait-ended-queue",
containerFactory = "defaultContainerFactory"
)
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception {
MessageProperties props = message.getMessageProperties();
// 从 header 中取出原始 exchange 和 routing key
String originalExchange = props.getHeader("x-original-exchange");
String originalRoutingKey = props.getHeader("x-original-routing-key");
rabbitTemplate.convertAndSend(originalExchange, originalRoutingKey, message);
}
4.3 定义重试队列
创建多个重试队列(这里以 3 次为例),每个队列没有持久化,且死信指向上面的 retry-wait-ended-queue
:
@Bean
public Queue retryQueue1() {
return QueueBuilder.nonDurable("retry-queue-1")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue2() {
return QueueBuilder.nonDurable("retry-queue-2")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue3() {
return QueueBuilder.nonDurable("retry-queue-3")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
4.4 封装指数退避逻辑
用一个 RetryQueues
类管理重试配置:
public class RetryQueues {
private Queue[] queues;
private long initialInterval;
private double factor;
private long maxWait;
// 构造函数、getter/setter 略
}
注册 Bean 时传入参数:
@Bean
public RetryQueues retryQueues() {
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}
4.5 自定义拦截器实现重试跳转
关键逻辑在 RetryQueuesInterceptor
中:
public class RetryQueuesInterceptor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
try {
int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
sendToNextRetryQueue(messageAndChannel, retryCount);
} catch (Throwable t) {
throw new RuntimeException(t);
}
});
}
}
失败时调用 sendToNextRetryQueue
,将消息发往下一个重试队列,并设置 TTL:
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
String retryQueueName = retryQueues.getQueueName(retryCount);
rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
MessageProperties props = m.getMessageProperties();
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
props.setHeader("x-original-exchange", props.getReceivedExchange());
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
return m;
});
mac.channel.basicReject(mac.message.getMessageProperties().getDeliveryTag(), false);
}
4.6 主队列与消费者
主队列无需特殊配置:
@Bean
public Queue nonBlockingQueue() {
return QueueBuilder.nonDurable("non-blocking-queue").build();
}
消费者仍抛异常模拟失败,注意开启手动确认模式:
@RabbitListener(
queues = "non-blocking-queue",
containerFactory = "retryQueuesContainerFactory",
ackMode = "MANUAL"
)
public void consumeNonBlocking(String payload) throws Exception {
logger.info("Processing message from non-blocking-queue: {}", payload);
throw new Exception("Error occured!");
}
4.7 测试验证
发送两条消息:
@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
retryQueues.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
}
latch.await();
}
日志输出:
2020-02-19 10:31:40.640 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415 INFO : Processing message from non-blocking-queue: non blocking message 1
...
可以看到两条消息的重试是交错并发执行的,完全没有阻塞。
4.8 踩坑提醒 ⚠️
这种方案虽然高效,但有一个经典坑点:
❗ RabbitMQ 只有当消息到达队列头部时才会检查是否过期。如果一个长 TTL 的消息卡在队首,后面所有消息都无法被投递,即使它们早已过期。
✅ 解决方案:每个重试队列只能存放相同 TTL 的消息。也就是说,你应该为每一级重试间隔创建独立的队列(如 retry-1s、retry-3s、retry-10s),而不是混用。
5. 总结
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
✅ 阻塞式(Spring Retry) | 配置简单,逻辑清晰 | 消费者线程阻塞,高并发下易导致延迟 | 低频、非核心业务 |
✅ 非阻塞式(TTL + DLQ) | 完全异步,不阻塞消费者 | 实现复杂,需注意 TTL 队列隔离 | 高吞吐、核心链路 |
选择哪种方案,取决于你的系统对延迟、吞吐量和复杂度的权衡。
💡 小建议:生产环境推荐使用非阻塞方案,若觉得手动管理多个队列麻烦,也可以考虑集成 RabbitMQ Delayed Message Plugin,用一个延迟队列搞定所有重试。
完整代码已托管至 GitHub:https://github.com/eugenp/tutorials/tree/master/messaging-modules/spring-amqp