1. 引言

本文将深入探讨如何使用 Spring AMQP 和 RabbitMQ 实现 fanouttopic 两种交换机模式。核心区别在于:

  • Fanout 交换机:向所有绑定队列广播相同消息,忽略路由键
  • Topic 交换机:通过路由键将消息精准投递到特定队列

建议读者先掌握 Spring AMQP 基础 再阅读本文。

2. 配置 Fanout 交换机

我们创建一个 fanout 交换机并绑定两个队列。消息发送到此交换机时,所有绑定队列都会收到相同消息,路由键会被直接忽略

Spring AMQP 提供了 Declarables 对象统一管理队列、交换机和绑定关系:

@Bean
public Declarables fanoutBindings() {
    Queue fanoutQueue1 = new Queue("fanout.queue1", false);
    Queue fanoutQueue2 = new Queue("fanout.queue2", false);
    FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");

    return new Declarables(
      fanoutQueue1,
      fanoutQueue2,
      fanoutExchange,
      BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
      BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}

3. 配置 Topic 交换机

现在配置 topic 交换机,绑定两个不同模式的队列:

@Bean
public Declarables topicBindings() {
    Queue topicQueue1 = new Queue(topicQueue1Name, false);
    Queue topicQueue2 = new Queue(topicQueue2Name, false);

    TopicExchange topicExchange = new TopicExchange(topicExchangeName);

    return new Declarables(
      topicQueue1,
      topicQueue2,
      topicExchange,
      BindingBuilder
        .bind(topicQueue1)
        .to(topicExchange).with("*.important.*"),
      BindingBuilder
        .bind(topicQueue2)
        .to(topicExchange).with("#.error"));
}

Topic 交换机的核心优势在于灵活的路由键匹配

  • 使用 * 匹配单个单词(如 user.important.error
  • 使用 # 匹配零或多个单词(如 blog.post.save.error

⚠️ 关键规则

  • 当消息路由键匹配队列绑定时才会投递
  • 同一队列多个绑定匹配时,仅投递一份消息副本(避免重复)

绑定模式示例:

  • topicQueue1 接收三段式路由键且中间词为 important 的消息(如 user.important.error
  • topicQueue2 接收以 error 结尾的路由键(如 errorblog.post.save.error

4. 消息生产者配置

使用 RabbitTemplateconvertAndSend 方法发送消息:

    String message = " payload is broadcast";
    return args -> {
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, 
            "topic important warn" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, 
            "topic important error" + message);
    };

发送要点

  • Fanout 交换机:路由键参数会被忽略
  • Topic 交换机:必须提供有效的路由键

RabbitTemplate 提供了多种重载的 convertAndSend() 方法适配不同场景。

5. 消费者配置

通过 @RabbitListener 注解配置四个消费者(每个队列一个):

    @RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
    public void receiveMessageFromFanout1(String message) {
        System.out.println("Received fanout 1 message: " + message);
    }

    @RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
    public void receiveMessageFromFanout2(String message) {
        System.out.println("Received fanout 2 message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
    public void receiveMessageFromTopic1(String message) {
        System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
    public void receiveMessageFromTopic2(String message) {
        System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
    }

消费者注意事项

  • 只需指定队列名称,无需关心交换机或路由键
  • 消费者与生产者完全解耦

6. 运行示例

本项目基于 Spring Boot,启动时会自动:

  1. 建立 RabbitMQ 连接
  2. 初始化所有队列、交换机和绑定

默认连接本地 5672 端口的 RabbitMQ,可在 application.yaml 中修改配置。

项目暴露了 HTTP 接口 /broadcast,接收 POST 请求的消息体。发送请求体为 "Test" 时,典型输出如下:

Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast

⚠️ 重要提醒:消息接收顺序不保证,这是 RabbitMQ 的正常行为。

7. 总结

本文通过实战演示了 Spring AMQP 中 fanout 和 topic 交换机的核心用法:

  • Fanout:简单粗暴的广播模式
  • Topic:灵活精准的路由匹配

完整源码请参考 GitHub 仓库


原始标题:RabbitMQ Message Dispatching with Spring AMQP