1. 概述
本教程将介绍如何使用 Java JMS(Java Message Service)从 IBM MQ 队列中读取和写入消息。
2. 环境配置
为了避免手动安装和配置的复杂性,我们可以在 Docker 容器中运行 IBM MQ。使用以下命令启动一个基本配置的容器:
docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq
接下来,我们需要在 pom.xml 文件中添加 IBM MQ 客户端 依赖:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.4.0.0</version>
</dependency>
3. 配置 JMS 连接
首先,我们需要使用 QueueConnectionFactory 设置 JMS 连接,该工厂用于创建到队列管理器的连接:
public class JMSSetup {
public QueueConnectionFactory createConnectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName("localhost");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("SYSTEM.DEF.SVRCONN");
return factory;
}
}
我们首先创建 MQQueueConnectionFactory 实例,用于配置和创建到 IBM MQ 服务器的连接。*我们将 hostname 设置为 localhost,因为 MQ 服务器在 Docker 容器内本地运行。* 端口 1414 从 Docker 容器映射到主机。
然后我们使用默认通道 SYSTEM.DEF.SVRCONN。这是 IBM MQ 客户端连接的常用通道。
4. 向 IBM MQ 队列写入消息
本节将介绍向 IBM MQ 队列发送消息的过程。
4.1. 建立 JMS 连接
首先,我们创建 MessageSender 类。该类负责设置到 IBM MQ 服务器的连接、管理会话和处理消息发送操作。我们声明 QueueConnectionFactory、QueueConnection、QueueSession 和 QueueSender 的实例变量,用于与 IBM MQ 服务器交互。
以下是 IBM MQ 连接设置、会话创建和消息发送的示例实现:
public class MessageSender {
private QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
private QueueSender sender;
public MessageSender() throws JMSException {
factory = new JMSSetup().createConnectionFactory();
connection = factory.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("QUEUE1");
sender = session.createSender(queue);
connection.start();
}
// ...
}
接下来,在 MessageSender 的构造函数中,我们使用 JMSSetup 类初始化 QueueConnectionFactory。*然后使用这个 factory 创建 QueueConnection。* 此连接允许我们与 IBM MQ 服务器交互。
连接建立后,我们使用 createQueueSession() 创建 QueueSession。该会话允许我们与队列通信。这里我们传递 false 表示会话是非事务性的,Session.AUTO_ACKNOWLEDGE 表示消息接收时自动确认。
之后,我们定义特定队列"QUEUE1"并创建 QueueSender 来处理消息发送。最后,我们启动连接以确保会话处于活动状态并准备传输消息。
4.2. 写入文本消息
既然已经建立了连接、创建了会话、定义了队列并创建了消息生产者,我们就可以向队列发送文本消息了:
public void sendMessage(String messageText) {
try {
TextMessage message = session.createTextMessage();
message.setText(messageText);
sender.send(message);
} catch (JMSException e) {
// 处理异常
} finally {
// 关闭资源
}
}
首先,我们创建一个 sendMessage() 方法,该方法接受 messageText 参数。sendMessage() 方法负责向队列发送文本消息。* 它创建 TextMessage 对象并使用 setText() 方法设置消息内容。
接下来,我们使用 QueueSender 对象的 send() 方法将消息发送到定义的队列。这种设计允许高效的消息传输,因为连接和会话在 MessageSender 对象存在期间保持打开状态。
4.3. 消息类型
除了 TextMessage,IBM MQ 还支持多种其他消息类型,以满足不同的使用场景。例如,我们可以发送以下类型的消息:
- BytesMessage:包含以字节形式存储的原始二进制数据的消息。
- ObjectMessage:携带序列化 Java 对象的消息。
- MapMessage:包含键值对的消息。
- StreamMessage:包含原始数据类型流的消息。
5. 从 IBM MQ 队列读取消息
既然我们已经向队列发送了消息,现在来探讨如何从队列中读取消息。
5.1. 建立 JMS 连接并创建会话
首先,我们需要建立连接并创建会话,这与发送消息时的操作类似。我们首先创建 MessageReceiver 类。该类处理到 IBM MQ 服务器的连接并设置消息消费所需的组件:
public class MessageReceiver {
private QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
private QueueReceiver receiver;
public MessageReceiver() throws JMSException {
factory = new JMSSetup().createConnectionFactory();
connection = factory.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("QUEUE1");
receiver = session.createReceiver(queue);
connection.start();
}
// ...
}
在这个类中,我们首先创建 QueueConnectionFactory 来设置到 IBM MQ 服务器的连接。*然后使用此连接创建 QueueSession,这允许我们与队列交互。*
最后,我们定义特定队列"QUEUE1"并创建 QueueReceiver 来处理来自队列的传入消息。
5.2. 读取文本消息
连接、会话和接收器设置完成后,我们就可以开始从队列接收消息了。我们使用 QueueReceiver 的 receive() 方法从指定队列中拉取消息:
public void receiveMessage() {
try {
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
} else {
// ...
}
} catch (JMSException e) {
// 处理异常
} finally {
// 关闭资源
}
}
在 receiveMessage() 方法中,我们使用 receive() 函数等待来自队列的消息,超时时间为 1000 毫秒。接收到消息后,我们检查它是否为 TextMessage 类型。
如果是,我们可以使用 getText() 方法检索实际的消息内容,该方法以字符串形式返回文本内容。
6. 消息属性和头信息
本节将讨论一些在发送或接收消息时常用的消息属性和头信息。
6.1. 消息属性
消息属性可用于存储和检索消息正文之外的附加信息。这对于过滤消息或向消息添加上下文数据特别有用。 以下是在发送消息时设置自定义属性的方法:
TextMessage message = session.createTextMessage();
message.setText(messageText);
message.setStringProperty("OrderID", "12345");
接下来,我们可以在接收消息时检索该属性:
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String orderID = message.getStringProperty("OrderID");
}
6.2. 消息头信息
消息头信息提供包含消息元数据的预定义字段。 一些常用的消息头信息包括:
- JMSMessageID:由 JMS 提供者为每条消息分配的唯一标识符。我们可以使用此 ID 来跟踪和记录消息。
- JMSExpiration:定义消息过期时间(毫秒)。如果消息在此时间内未送达,将被丢弃。
- JMSTimestamp:消息发送的时间。
- JMSPriority:消息的优先级。
让我们看看如何在接收消息时检索消息头信息:
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String messageId = message.getJMSMessageID();
long timestamp = message.getJMSTimestamp();
long expiration = message.getJMSExpiration();
int priority = message.getJMSPriority();
}
7. 使用 Mockito 进行模拟测试
本节将使用 Mockito 来模拟依赖项并验证 MessageSender 和 MessageReceiver 类的交互。我们首先使用 @Mock 注解创建依赖项的模拟实例。*
接下来,我们验证 sendMessage() 方法正确地与模拟的 QueueSender 交互。我们模拟 QueueSender 的 send() 方法,并验证 TextMessage 是否正确创建:
String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));
messageSender.sendMessage(messageText);
verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);
最后,我们验证 receiveMessage() 方法正确地与模拟的 QueueReceiver 交互。我们模拟 receive() 方法以返回预定义的 TextMessage,并按预期检索消息文本:
when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");
messageReceiver.receiveMessage();
verify(textMessage).getText();
8. 总结
本文探讨了设置 JMS 连接、会话和消息生产者/接收器以与 IBM MQ 队列交互的过程。我们还介绍了 IBM MQ 支持的几种消息类型。此外,我们强调了自定义属性和头信息如何增强消息处理。