1. 概述

本文将探讨如何使用Spring AMQP框架实现基于AMQP协议的消息通信。首先介绍消息通信的核心概念,然后通过实际案例演示具体实现。

2. 基于消息的通信

消息通信是一种应用间交互技术,它采用异步消息传递机制,而非同步的请求-响应架构。消息生产者和消费者通过中间的消息代理层解耦。消息代理提供消息持久化、消息过滤和转换等特性。

在Java应用间通信时,通常使用JMS(Java消息服务)API。但跨平台和跨厂商互操作时,JMS无法胜任,这时AMQP就派上用场了

3. AMQP——高级消息队列协议

AMQP是一个开放标准的二进制线级协议规范,用于异步消息通信,它定义了消息的构造方式。

3.1. AMQP与JMS的不同

AMQP作为平台无关的二进制协议标准,支持多语言库实现和跨环境运行。避免了JMS厂商锁定问题,迁移时无需重写客户端。常用AMQP代理包括RabbitMQ、OpenAMQ和StormMQ。更多细节可参考JMS vs AMQPUnderstanding AMQP

3.2. AMQP实体

AMQP核心由交换器(Exchange)、队列(Queue)和绑定(Binding)组成:

  • 交换器:类似邮局,客户端将消息发布到交换器。内置四种类型:
    • Direct Exchange - 通过完整匹配路由键分发消息
    • Fanout Exchange - 广播到所有绑定队列
    • Topic Exchange - 通过模式匹配路由键分发到多个队列
    • Headers Exchange - 基于消息头路由
  • 队列:通过路由键绑定到交换器
  • 消息:携带路由键发送到交换器,交换器将消息副本分发到匹配队列

深入理解可参考AMQP ConceptsRouting Topologies

3.3. Spring AMQP

Spring AMQP包含两个模块:spring-amqpspring-rabbit,提供以下抽象:

  • AMQP实体:通过MessageQueueBindingExchange类创建
  • 连接管理:使用CachingConnectionFactory连接RabbitMQ
  • 消息发布:通过RabbitTemplate发送消息
  • 消息消费:使用@RabbitListener注解监听队列

4. 设置RabbitMQ代理

使用Docker快速部署RabbitMQ代理:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

5. 创建Spring AMQP应用

下面实现一个发送"Hello, world!"消息的简单应用。

5.1. Maven依赖

pom.xml中添加spring-boot-starter-amqp依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.2.RELEASE</version>
    </dependency>
</dependencies>

最新版本可在Maven Central获取。

5.2. 连接RabbitMQ代理

**利用Spring Boot自动配置创建ConnectionFactoryRabbitTemplateRabbitAdmin**。默认使用guest账号连接本地5672端口:

@SpringBootApplication
public class HelloWorldMessageApp {
   // ...
}

5.3. 创建队列

只需定义一个Queue类型的BeanRabbitAdmin会自动将其绑定到默认交换器(路由键为队列名):

@Bean
public Queue myQueue() {
    return new Queue("myQueue", false);
}

⚠️ 队列设置为非持久化,RabbitMQ停止时队列和消息会被清除。但重启应用不影响队列。

5.4. 发送消息

使用RabbitTemplate发送消息:

rabbitTemplate.convertAndSend("myQueue", "Hello, world!");

5.5. 消费消息

通过@RabbitListener注解实现消息消费:

@RabbitListener(queues = "myQueue")
public void listen(String in) {
    System.out.println("Message read from myQueue : " + in);
}

6. 运行应用

  1. 启动RabbitMQ代理:

    docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
    
  2. 运行Spring Boot应用:

    mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.simple.HelloWorldMessageApp
    

运行时会发生:

  1. 应用发送消息到默认交换器(路由键为"myQueue")
  2. 队列"myQueue"接收消息
  3. listen方法消费消息并打印到控制台

也可通过管理界面http://localhost:15672验证消息发送和消费情况。

7. 总结

本文介绍了使用Spring AMQP通过AMQP协议实现应用间消息通信的架构。完整源码可在GitHub项目获取。