1. 引言
本文将深入探讨如何使用 Spring AMQP 和 RabbitMQ 实现 fanout 和 topic 两种交换机模式。核心区别在于:
- Fanout 交换机:向所有绑定队列广播相同消息,忽略路由键
- Topic 交换机:通过路由键将消息精准投递到特定队列
建议读者先掌握 Spring AMQP 基础 再阅读本文。
2. 配置 Fanout 交换机
我们创建一个 fanout 交换机并绑定两个队列。消息发送到此交换机时,所有绑定队列都会收到相同消息,路由键会被直接忽略。
Spring AMQP 提供了 Declarables
对象统一管理队列、交换机和绑定关系:
@Bean
public Declarables fanoutBindings() {
Queue fanoutQueue1 = new Queue("fanout.queue1", false);
Queue fanoutQueue2 = new Queue("fanout.queue2", false);
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
return new Declarables(
fanoutQueue1,
fanoutQueue2,
fanoutExchange,
BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}
3. 配置 Topic 交换机
现在配置 topic 交换机,绑定两个不同模式的队列:
@Bean
public Declarables topicBindings() {
Queue topicQueue1 = new Queue(topicQueue1Name, false);
Queue topicQueue2 = new Queue(topicQueue2Name, false);
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
return new Declarables(
topicQueue1,
topicQueue2,
topicExchange,
BindingBuilder
.bind(topicQueue1)
.to(topicExchange).with("*.important.*"),
BindingBuilder
.bind(topicQueue2)
.to(topicExchange).with("#.error"));
}
Topic 交换机的核心优势在于灵活的路由键匹配:
- 使用
*
匹配单个单词(如user.important.error
) - 使用
#
匹配零或多个单词(如blog.post.save.error
)
⚠️ 关键规则:
- 当消息路由键匹配队列绑定时才会投递
- 同一队列多个绑定匹配时,仅投递一份消息副本(避免重复)
绑定模式示例:
topicQueue1
接收三段式路由键且中间词为important
的消息(如user.important.error
)topicQueue2
接收以error
结尾的路由键(如error
或blog.post.save.error
)
4. 消息生产者配置
使用 RabbitTemplate
的 convertAndSend
方法发送消息:
String message = " payload is broadcast";
return args -> {
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN,
"topic important warn" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR,
"topic important error" + message);
};
✅ 发送要点:
- Fanout 交换机:路由键参数会被忽略
- Topic 交换机:必须提供有效的路由键
RabbitTemplate
提供了多种重载的 convertAndSend()
方法适配不同场景。
5. 消费者配置
通过 @RabbitListener
注解配置四个消费者(每个队列一个):
@RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
public void receiveMessageFromFanout1(String message) {
System.out.println("Received fanout 1 message: " + message);
}
@RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
public void receiveMessageFromFanout2(String message) {
System.out.println("Received fanout 2 message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
public void receiveMessageFromTopic1(String message) {
System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
public void receiveMessageFromTopic2(String message) {
System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
}
❌ 消费者注意事项:
- 只需指定队列名称,无需关心交换机或路由键
- 消费者与生产者完全解耦
6. 运行示例
本项目基于 Spring Boot,启动时会自动:
- 建立 RabbitMQ 连接
- 初始化所有队列、交换机和绑定
默认连接本地 5672 端口的 RabbitMQ,可在 application.yaml
中修改配置。
项目暴露了 HTTP 接口 /broadcast
,接收 POST 请求的消息体。发送请求体为 "Test" 时,典型输出如下:
Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast
⚠️ 重要提醒:消息接收顺序不保证,这是 RabbitMQ 的正常行为。
7. 总结
本文通过实战演示了 Spring AMQP 中 fanout 和 topic 交换机的核心用法:
- Fanout:简单粗暴的广播模式
- Topic:灵活精准的路由匹配
完整源码请参考 GitHub 仓库。