1. 引言

在 Spring AMQP 的默认行为中,当一条消息消费失败时,它会被重新放回队列,等待下一次消费。这看似合理,但如果处理不当,很容易陷入无限重试的死循环,不仅浪费系统资源,还可能导致服务雪崩。

虽然使用 死信队列(Dead Letter Queue) 是处理失败消息的标准做法,但在很多场景下我们更希望先尝试几次自动重试,让系统有机会自我恢复,而不是直接丢进死信队列。

本文将介绍两种实现 指数退避(Exponential Backoff) 重试策略的方式:

✅ 一种是基于 Spring Retry 的同步阻塞式重试
✅ 另一种是利用 TTL + 死信队列的异步非阻塞式重试

这两种方式各有优劣,适用于不同场景,我们一一来看。


2. 前置准备

本文使用 RabbitMQ 作为 AMQP 的实现。如果你还没搭好环境,可以用下面这条 Docker 命令快速启动一个带管理界面的实例:

docker run -p 5672:5672 -p 15672:15672 --name rabbitmq-server rabbitmq:3-management

访问 http://localhost:15672(默认账号密码均为 guest)即可查看管理后台。

项目依赖只需引入 spring-boot-starter-amqp

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.4.RELEASE</version>
    </dependency>
</dependencies>

后续示例基于 Spring Boot 构建,配置类和测试类均省略部分样板代码,只保留核心逻辑。


3. 阻塞式重试(Blocking Retry)

这种方式利用 Spring Retry 模块,在消费者内部进行重试。每次失败后按指数增长的间隔等待,直到达到最大重试次数。

3.1 队列定义

先定义一个简单的临时队列:

@Bean
public Queue blockingQueue() {
    return QueueBuilder.nonDurable("blocking-queue").build();
}

3.2 配置指数退避策略

通过 RetryInterceptorBuilder 设置重试参数:

  • 初始间隔:1000ms
  • 指数因子:3.0
  • 最大等待时间:10000ms
  • 最大尝试次数:5 次
@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
      .backOffOptions(1000, 3.0, 10000) // initial, multiplier, max
      .maxAttempts(5)
      .recoverer(observableRecoverer()) // 达到最大重试次数后的兜底处理
      .build();
}

然后将该拦截器注入到自定义的 Listener 容器工厂中:

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
  ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(new Advice[]{retryInterceptor});
    return factory;
}

3.3 消费者实现

消费者故意抛出异常模拟失败:

@RabbitListener(
    queues = "blocking-queue", 
    containerFactory = "retryContainerFactory"
)
public void consumeBlocking(String payload) throws Exception {
    logger.info("Processing message from blocking-queue: {}", payload);
    throw new Exception("exception occured!");
}

3.4 测试验证

发送两条消息并观察日志:

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
    int nb = 2;
    CountDownLatch latch = new CountDownLatch(nb);
    observableRecoverer.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
    }

    latch.await();
}

日志输出如下:

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875  ERROR : java.lang.Exception: exception occured!
...

可以看到重试间隔为:1s → 3s → 9s → 超过 10s 后按上限 10s 执行,符合指数增长规律。

3.5 踩坑提醒 ⚠️

这种方式虽然简单粗暴,但有一个致命问题:重试期间消费者线程被完全阻塞

即使你设置了 concurrency = "2",每个重试中的消息仍会独占一个消费者线程。在高并发场景下极易造成消费者资源耗尽,导致其他正常消息延迟处理。

✅ 改进建议:仅适用于低频、非关键业务场景,或作为兜底方案。


4. 非阻塞式重试(Non-blocking Retry)

为了解决阻塞问题,我们可以换一种思路:利用消息的 TTL(Time-To-Live)和死信机制来实现异步重试

核心思想是:

  • 每次失败不立即重试,而是把消息发到一个带有 TTL 的“重试队列”
  • 当消息过期后,自动进入死信队列
  • 死信队列的消费者再把消息扔回原始队列,形成一次“重试”

这样整个过程是异步的,不会阻塞原始消费者。

4.1 死信队列定义

先定义一个统一的“重试结束”死信队列:

@Bean
public Queue retryWaitEndedQueue() {
    return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

4.2 死信消费者:负责“重投”

这个消费者只有一个职责:把过期的消息重新发回原始队列。

@RabbitListener(
    queues = "retry-wait-ended-queue", 
    containerFactory = "defaultContainerFactory"
)
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception {
    MessageProperties props = message.getMessageProperties();
    
    // 从 header 中取出原始 exchange 和 routing key
    String originalExchange = props.getHeader("x-original-exchange");
    String originalRoutingKey = props.getHeader("x-original-routing-key");
    
    rabbitTemplate.convertAndSend(originalExchange, originalRoutingKey, message);
}

4.3 定义重试队列

创建多个重试队列(这里以 3 次为例),每个队列没有持久化,且死信指向上面的 retry-wait-ended-queue

@Bean
public Queue retryQueue1() {
    return QueueBuilder.nonDurable("retry-queue-1")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue2() {
    return QueueBuilder.nonDurable("retry-queue-2")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue3() {
    return QueueBuilder.nonDurable("retry-queue-3")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

4.4 封装指数退避逻辑

用一个 RetryQueues 类管理重试配置:

public class RetryQueues {
    private Queue[] queues;
    private long initialInterval;
    private double factor;
    private long maxWait;

    // 构造函数、getter/setter 略
}

注册 Bean 时传入参数:

@Bean
public RetryQueues retryQueues() {
    return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}

4.5 自定义拦截器实现重试跳转

关键逻辑在 RetryQueuesInterceptor 中:

public class RetryQueuesInterceptor implements MethodInterceptor {

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
            try {
                int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
                sendToNextRetryQueue(messageAndChannel, retryCount);
            } catch (Throwable t) {
                throw new RuntimeException(t);
            }
        });
    }
}

失败时调用 sendToNextRetryQueue,将消息发往下一个重试队列,并设置 TTL:

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
    String retryQueueName = retryQueues.getQueueName(retryCount);

    rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
        MessageProperties props = m.getMessageProperties();
        props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
        props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
        props.setHeader("x-original-exchange", props.getReceivedExchange());
        props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
        return m;
    });

    mac.channel.basicReject(mac.message.getMessageProperties().getDeliveryTag(), false);
}

4.6 主队列与消费者

主队列无需特殊配置:

@Bean
public Queue nonBlockingQueue() {
    return QueueBuilder.nonDurable("non-blocking-queue").build();
}

消费者仍抛异常模拟失败,注意开启手动确认模式:

@RabbitListener(
    queues = "non-blocking-queue", 
    containerFactory = "retryQueuesContainerFactory", 
    ackMode = "MANUAL"
)
public void consumeNonBlocking(String payload) throws Exception {
    logger.info("Processing message from non-blocking-queue: {}", payload);
    throw new Exception("Error occured!");
}

4.7 测试验证

发送两条消息:

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
    int nb = 2;
    CountDownLatch latch = new CountDownLatch(nb);
    retryQueues.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
    }

    latch.await();
}

日志输出:

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415  INFO : Processing message from non-blocking-queue: non blocking message 1
...

可以看到两条消息的重试是交错并发执行的,完全没有阻塞。

4.8 踩坑提醒 ⚠️

这种方案虽然高效,但有一个经典坑点:

❗ RabbitMQ 只有当消息到达队列头部时才会检查是否过期。如果一个长 TTL 的消息卡在队首,后面所有消息都无法被投递,即使它们早已过期。

✅ 解决方案:每个重试队列只能存放相同 TTL 的消息。也就是说,你应该为每一级重试间隔创建独立的队列(如 retry-1s、retry-3s、retry-10s),而不是混用。


5. 总结

方案 优点 缺点 适用场景
✅ 阻塞式(Spring Retry) 配置简单,逻辑清晰 消费者线程阻塞,高并发下易导致延迟 低频、非核心业务
✅ 非阻塞式(TTL + DLQ) 完全异步,不阻塞消费者 实现复杂,需注意 TTL 队列隔离 高吞吐、核心链路

选择哪种方案,取决于你的系统对延迟、吞吐量和复杂度的权衡。

💡 小建议:生产环境推荐使用非阻塞方案,若觉得手动管理多个队列麻烦,也可以考虑集成 RabbitMQ Delayed Message Plugin,用一个延迟队列搞定所有重试。

完整代码已托管至 GitHub:https://github.com/eugenp/tutorials/tree/master/messaging-modules/spring-amqp


原始标题:Exponential Backoff With Spring AMQP