1. 概述
本文将探讨如何使用Spring AMQP框架实现基于AMQP协议的消息通信。首先介绍消息通信的核心概念,然后通过实际案例演示具体实现。
2. 基于消息的通信
消息通信是一种应用间交互技术,它采用异步消息传递机制,而非同步的请求-响应架构。消息生产者和消费者通过中间的消息代理层解耦。消息代理提供消息持久化、消息过滤和转换等特性。
在Java应用间通信时,通常使用JMS(Java消息服务)API。但跨平台和跨厂商互操作时,JMS无法胜任,这时AMQP就派上用场了。
3. AMQP——高级消息队列协议
AMQP是一个开放标准的二进制线级协议规范,用于异步消息通信,它定义了消息的构造方式。
3.1. AMQP与JMS的不同
AMQP作为平台无关的二进制协议标准,支持多语言库实现和跨环境运行。避免了JMS厂商锁定问题,迁移时无需重写客户端。常用AMQP代理包括RabbitMQ、OpenAMQ和StormMQ。更多细节可参考JMS vs AMQP和Understanding AMQP。
3.2. AMQP实体
AMQP核心由交换器(Exchange)、队列(Queue)和绑定(Binding)组成:
- 交换器:类似邮局,客户端将消息发布到交换器。内置四种类型:
- Direct Exchange - 通过完整匹配路由键分发消息
- Fanout Exchange - 广播到所有绑定队列
- Topic Exchange - 通过模式匹配路由键分发到多个队列
- Headers Exchange - 基于消息头路由
- 队列:通过路由键绑定到交换器
- 消息:携带路由键发送到交换器,交换器将消息副本分发到匹配队列
深入理解可参考AMQP Concepts和Routing Topologies。
3.3. Spring AMQP
Spring AMQP包含两个模块:spring-amqp
和spring-rabbit
,提供以下抽象:
- AMQP实体:通过
Message
、Queue
、Binding
、Exchange
类创建 - 连接管理:使用
CachingConnectionFactory
连接RabbitMQ - 消息发布:通过
RabbitTemplate
发送消息 - 消息消费:使用
@RabbitListener
注解监听队列
4. 设置RabbitMQ代理
使用Docker快速部署RabbitMQ代理:
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
- 端口5672:应用连接端口
- 端口15672:管理界面访问地址 http://localhost:15672 或HTTP API http://localhost:15672/api/index.html
5. 创建Spring AMQP应用
下面实现一个发送"Hello, world!"消息的简单应用。
5.1. Maven依赖
在pom.xml
中添加spring-boot-starter-amqp
依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
</dependencies>
最新版本可在Maven Central获取。
5.2. 连接RabbitMQ代理
**利用Spring Boot自动配置创建ConnectionFactory
、RabbitTemplate
和RabbitAdmin
**。默认使用guest账号连接本地5672端口:
@SpringBootApplication
public class HelloWorldMessageApp {
// ...
}
5.3. 创建队列
只需定义一个Queue
类型的Bean,RabbitAdmin
会自动将其绑定到默认交换器(路由键为队列名):
@Bean
public Queue myQueue() {
return new Queue("myQueue", false);
}
⚠️ 队列设置为非持久化,RabbitMQ停止时队列和消息会被清除。但重启应用不影响队列。
5.4. 发送消息
使用RabbitTemplate
发送消息:
rabbitTemplate.convertAndSend("myQueue", "Hello, world!");
5.5. 消费消息
通过@RabbitListener
注解实现消息消费:
@RabbitListener(queues = "myQueue")
public void listen(String in) {
System.out.println("Message read from myQueue : " + in);
}
6. 运行应用
启动RabbitMQ代理:
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
运行Spring Boot应用:
mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.simple.HelloWorldMessageApp
运行时会发生:
- 应用发送消息到默认交换器(路由键为"myQueue")
- 队列"myQueue"接收消息
listen
方法消费消息并打印到控制台
也可通过管理界面http://localhost:15672验证消息发送和消费情况。
7. 总结
本文介绍了使用Spring AMQP通过AMQP协议实现应用间消息通信的架构。完整源码可在GitHub项目获取。