1. 概述

本教程将学习如何通过发布者确认确保消息成功投递到RabbitMQ代理服务器,以及如何通过消费者确认告知代理服务器消息已被成功处理。

2. 应用场景

在简单应用中,我们使用RabbitMQ时常常忽略显式确认机制,仅依赖基础的消息发布和自动确认。但即使RabbitMQ架构健壮,错误仍可能发生,这就需要一种机制来双重验证消息是否送达代理服务器,以及确认消息是否被成功处理。发布者确认和消费者确认正是为此提供的安全保障。

3. 等待发布者确认

即使应用本身没有错误,已发布的消息仍可能丢失。例如,网络故障可能导致消息在传输过程中丢失。为避免这种情况,AMQP提供了事务语义来保证消息不丢失。但代价显著:事务开销大,消息处理时间会大幅增加,尤其在高吞吐量场景下。

更优方案是采用确认模式。虽然会引入一定开销,但比事务快得多。该模式指示客户端与代理服务器启动消息计数机制,客户端随后通过代理服务器返回的投递标签验证计数。这确保了消息被安全存储,等待后续分发给消费者。

启用确认模式需在通道上调用一次:

channel.confirmSelect();

确认过程可能耗时,尤其对持久化队列(存在IO延迟)。因此RabbitMQ异步等待确认,但提供同步方法供应用使用:

  • Channel.waitForConfirms() — 阻塞执行,直到自上次调用后所有消息被代理服务器ACK(确认)或NACK(拒绝)
  • Channel.waitForConfirms(timeout) — 同上,但可设置毫秒级超时,否则抛出TimeoutException
  • Channel.waitForConfirmsOrDie() — 若自上次调用后有消息被NACK则抛出异常(适用于零容忍场景)
  • Channel.waitForConfirmsOrDie(timeout) — 同上,带超时设置

3.1. 发布者设置

创建常规消息发布类,仅接收通道队列参数:

class UuidPublisher {
    private Channel channel;
    private String queue;

    public UuidPublisher(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }
}

添加发布String消息的方法:

public void send(String message) throws IOException {
    channel.basicPublish("", queue, null, message.getBytes());
}

此方式存在传输丢失风险,需增加代码确保代理服务器安全接收消息。

3.2. 启用通道确认模式

修改构造函数,在末尾调用confirmSelect()

public UuidPublisher(Channel channel, String queue) throws IOException {
    // ...

    this.channel.confirmSelect();
}

**未启用确认模式就调用等待方法会抛出IllegalStateException**。选择同步wait()方法,在send()后调用。使用带超时的版本避免无限等待:

public boolean send(String message) throws Exception {
    channel.basicPublish("", queue, null, message.getBytes());
    return channel.waitForConfirms(1000);
}

返回true表示代理服务器成功接收消息。适用于少量消息发送场景。

3.3. 批量确认发布消息

确认过程耗时,不应每条消息都等待确认。应批量发送后统一确认。修改方法接收消息列表:

public void sendAllOrDie(List<String> messages) throws Exception {
    for (String message : messages) {
        channel.basicPublish("", queue, null, message.getBytes());
    }

    channel.waitForConfirmsOrDie(1000);
}

使用waitForConfirmsOrDie()因为waitForConfirms()返回false时无法确定NACK的消息数量。此方法确保任何消息被NACK时抛出异常,但无法定位具体失败消息。

4. 利用确认模式保障批量发布

确认模式下,可在通道上注册ConfirmListener。该监听器包含两个回调:成功投递处理和代理服务器失败处理。实现机制确保无消息遗漏。添加监听器方法:

private void createConfirmListener() {
    this.channel.addConfirmListener(
      (tag, multiple) -> {
        // ...
      }, 
      (tag, multiple) -> {
        // ...
      }
    );
}

回调中tag是消息的顺序投递标签,multiple指示是否批量确认。若multipletruetag指向最新确认的标签;若为NACK,则所有大于该标签的消息也被确认。

协调回调需用ConcurrentSkipListMap存储未确认消息

private ConcurrentNavigableMap<Long, PendingMessage> pendingDelivery = new ConcurrentSkipListMap<>();

成功回调会移除所有小于等于tag的消息:

(tag, multiple) -> {
    ConcurrentNavigableMap<Long, PendingMessage> confirmed = pendingDelivery.headMap(tag, true);
    confirmed.clear();
}

headMap()multiplefalse时返回单个元素,否则返回多个。无需显式检查批量确认场景。

4.1. 实现拒绝消息重试机制

为拒绝回调实现重试机制,并设置最大重试次数避免无限重试。创建消息尝试计数类:

public class PendingMessage {
    private int tries;
    private String body;

    public PendingMessage(String body) {
        this.body = body;
    }

    public int incrementTries() {
        return ++this.tries;
    }

    // 标准getter方法
}

实现拒绝回调。**先获取失败消息视图,移除超过最大重试次数的消息**:

(tag, multiple) -> {
    ConcurrentNavigableMap<Long, PendingMessage> failed = pendingDelivery.headMap(tag, true);

    failed.values().removeIf(pending -> {
        return pending.incrementTries() >= MAX_TRIES;
    });

    // ...
}

若仍有待处理消息,重新发送。应用异常时移除消息:

if (!pendingDelivery.isEmpty()) {
    pendingDelivery.values().removeIf(message -> {
        try {
            channel.basicPublish("", queue, null, message.getBody().getBytes());
            return false;
        } catch (IOException e) {
            return true;
        }
    });
}

4.2. 整合实现

创建批量发送方法,能检测拒绝消息并重试。需调用getNextPublishSeqNo()获取消息标签

public void sendOrRetry(List<String> messages) throws IOException {
    createConfirmListener();

    for (String message : messages) {
        long tag = channel.getNextPublishSeqNo();
        pendingDelivery.put(tag, new PendingMessage(message));

        channel.basicPublish("", queue, null, message.getBytes());
    }
}

发布消息前创建监听器,否则无法接收确认。这将形成回调循环,直到所有消息成功发送或重试完毕。

5. 发送消费者投递确认

先看无手动确认的示例。自动确认模式下,消息一旦投递给消费者即视为成功传递:

public class UuidConsumer {
    private String queue;
    private Channel channel;

    // 全参构造函数

    public void consume() throws IOException {
        channel.basicConsume(queue, true, (consumerTag, delivery) -> {
            // 处理逻辑...
        }, cancelledTag -> {
            // 日志记录...
        });
    }
}

通过basicConsume()autoAck参数传true启用自动确认。虽简单高效但不安全,代理服务器在消息处理前就将其丢弃。更安全的方案是禁用自动确认,处理完成后通过basicAck()手动确认:

channel.basicConsume(queue, false, (consumerTag, delivery) -> {
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();

    // 处理逻辑...

    channel.basicAck(deliveryTag, false);
}, cancelledTag -> {
    // 日志记录...
});

最简形式是逐条确认。使用接收到的投递标签确认消费。关键点:basicAck()需传false表示单条确认。性能较差,需优化。

5.1. 设置通道基础QoS

RabbitMQ默认在消息可用时立即推送。通过设置基础服务质量避免此问题。构造函数添加batchSize参数并调用basicQos()

public class UuidConsumer {
    // ...
    private int batchSize;

    public UuidConsumer(Channel channel, String queue, int batchSize) throws IOException {
        // ...

        this.batchSize = batchSize;
        channel.basicQos(batchSize);
    }
}

这确保其他消费者在处理期间仍能获取消息

5.2. 定义确认策略

改为批量确认提升性能。添加简单处理方法:将消息解析为UUID则视为处理成功:

private boolean process(String message) {
    try {
        UUID.fromString(message);
        return true;
    } catch (IllegalArgumentException e) {
        return false;
    }
}

修改consume()方法实现批量确认框架:

channel.basicConsume(queue, false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();

    if (!process(message)) {
        // ...
    } else if (deliveryTag % batchSize == 0) {
        // ...
    } else {
        // ...
    }
}

处理失败则NACK;达到批量大小则ACK;否则暂存标签。用类变量存储待确认标签:

private AtomicLong pendingTag = new AtomicLong();

5.3. 拒绝消息

无法处理的消息需拒绝,可选择重新入队。重入队适用于过载场景,让其他消费者处理。提供两种方法:

  • channel.basicReject(deliveryTag, requeue) — 拒绝单条消息,可重入队或丢弃
  • channel.basicNack(deliveryTag, multiple, requeue) — 同上,支持批量拒绝multipletrue时拒绝自上次ACK后的所有消息)

因单条拒绝,使用第一种方法。若有待确认ACK则先发送并重置变量,再拒绝消息:

if (!process(message, deliveryTag)) {
    if (pendingTag.get() != 0) {
        channel.basicAck(pendingTag.get(), true);
        pendingTag.set(0);
    }

    channel.basicReject(deliveryTag, false);
}

5.4. 批量确认消息

投递标签顺序递增,可用取模运算检测是否达到批量大小。达到则发送ACK并重置pendingTag关键:multiple参数传true告知代理服务器当前标签及之前所有消息已处理

else if (deliveryTag % batchSize == 0) {
    channel.basicAck(deliveryTag, true);
    pendingTag.set(0);
} else {
    pendingTag.set(deliveryTag);
}

否则仅更新pendingTag。重复确认同一标签会触发RabbitMQ的PRECONDITION_FAILED – unknown delivery tag错误。

⚠️ 注意:使用multiple标志时,需考虑无更多消息导致无法达到批量大小的情况。解决方案是添加监控线程定期检查待确认ACK。

6. 总结

本文探讨了RabbitMQ中发布者确认和消费者确认的核心功能,这些机制对分布式系统的数据安全性和健壮性至关重要。

发布者确认可验证消息成功传输到RabbitMQ代理服务器,降低消息丢失风险;消费者确认通过确认消息消费实现可控且弹性的消息处理。

通过实际代码示例,我们展示了如何有效实现这些功能,为构建可靠的消息系统奠定基础。

源代码可在GitHub获取。


原始标题:Consumer Acknowledgments and Publisher Confirms with RabbitMQ | Baeldung