1. 概述

本教程将介绍如何使用 Java JMS(Java Message Service)从 IBM MQ 队列中读取和写入消息。

2. 环境配置

为了避免手动安装和配置的复杂性,我们可以在 Docker 容器中运行 IBM MQ。使用以下命令启动一个基本配置的容器:

docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq

接下来,我们需要在 pom.xml 文件中添加 IBM MQ 客户端 依赖:

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.4.0.0</version>
</dependency>

3. 配置 JMS 连接

首先,我们需要使用 QueueConnectionFactory 设置 JMS 连接,该工厂用于创建到队列管理器的连接:

public class JMSSetup {
    public QueueConnectionFactory createConnectionFactory() throws JMSException {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName("localhost");
        factory.setPort(1414);
        factory.setQueueManager("QM1");
        factory.setChannel("SYSTEM.DEF.SVRCONN"); 
        
        return factory;
    }
}

我们首先创建 MQQueueConnectionFactory 实例,用于配置和创建到 IBM MQ 服务器的连接。*我们将 hostname 设置为 localhost,因为 MQ 服务器在 Docker 容器内本地运行。* 端口 1414 从 Docker 容器映射到主机。

然后我们使用默认通道 SYSTEM.DEF.SVRCONN这是 IBM MQ 客户端连接的常用通道。

4. 向 IBM MQ 队列写入消息

本节将介绍向 IBM MQ 队列发送消息的过程。

4.1. 建立 JMS 连接

首先,我们创建 MessageSender 类。该类负责设置到 IBM MQ 服务器的连接、管理会话和处理消息发送操作。我们声明 QueueConnectionFactoryQueueConnectionQueueSessionQueueSender 的实例变量,用于与 IBM MQ 服务器交互。

以下是 IBM MQ 连接设置、会话创建和消息发送的示例实现:

public class MessageSender {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueSender sender;

    public MessageSender() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        sender = session.createSender(queue);
        connection.start();
    }

    // ...
}

接下来,在 MessageSender 的构造函数中,我们使用 JMSSetup 类初始化 QueueConnectionFactory。*然后使用这个 factory 创建 QueueConnection* 此连接允许我们与 IBM MQ 服务器交互。

连接建立后,我们使用 createQueueSession() 创建 QueueSession。该会话允许我们与队列通信。这里我们传递 false 表示会话是非事务性的,Session.AUTO_ACKNOWLEDGE 表示消息接收时自动确认。

之后,我们定义特定队列"QUEUE1"并创建 QueueSender 来处理消息发送。最后,我们启动连接以确保会话处于活动状态并准备传输消息。

4.2. 写入文本消息

既然已经建立了连接、创建了会话、定义了队列并创建了消息生产者,我们就可以向队列发送文本消息了:

public void sendMessage(String messageText) {
    try {
        TextMessage message = session.createTextMessage();
        message.setText(messageText);
        sender.send(message);
    } catch (JMSException e) {
        // 处理异常
    } finally {
        // 关闭资源
    }
}

首先,我们创建一个 sendMessage() 方法,该方法接受 messageText 参数。sendMessage() 方法负责向队列发送文本消息。* 它创建 TextMessage 对象并使用 setText() 方法设置消息内容。

接下来,我们使用 QueueSender 对象的 send() 方法将消息发送到定义的队列。这种设计允许高效的消息传输,因为连接和会话在 MessageSender 对象存在期间保持打开状态。

4.3. 消息类型

除了 TextMessage,IBM MQ 还支持多种其他消息类型,以满足不同的使用场景。例如,我们可以发送以下类型的消息:

  • BytesMessage:包含以字节形式存储的原始二进制数据的消息。
  • ObjectMessage:携带序列化 Java 对象的消息。
  • MapMessage:包含键值对的消息。
  • StreamMessage:包含原始数据类型流的消息。

5. 从 IBM MQ 队列读取消息

既然我们已经向队列发送了消息,现在来探讨如何从队列中读取消息。

5.1. 建立 JMS 连接并创建会话

首先,我们需要建立连接并创建会话,这与发送消息时的操作类似。我们首先创建 MessageReceiver 类。该类处理到 IBM MQ 服务器的连接并设置消息消费所需的组件

public class MessageReceiver {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueReceiver receiver;

    public MessageReceiver() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        receiver = session.createReceiver(queue);
        connection.start();
    }

    // ...
}

在这个类中,我们首先创建 QueueConnectionFactory 来设置到 IBM MQ 服务器的连接。*然后使用此连接创建 QueueSession,这允许我们与队列交互。*

最后,我们定义特定队列"QUEUE1"并创建 QueueReceiver 来处理来自队列的传入消息。

5.2. 读取文本消息

连接、会话和接收器设置完成后,我们就可以开始从队列接收消息了。我们使用 QueueReceiverreceive() 方法从指定队列中拉取消息

public void receiveMessage() {
    try {
        Message message = receiver.receive(1000);
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
        } else {
            // ...
        }
    } catch (JMSException e) {
        // 处理异常
    } finally {
        // 关闭资源
    }
}

receiveMessage() 方法中,我们使用 receive() 函数等待来自队列的消息,超时时间为 1000 毫秒。接收到消息后,我们检查它是否为 TextMessage 类型。

如果是,我们可以使用 getText() 方法检索实际的消息内容,该方法以字符串形式返回文本内容。

6. 消息属性和头信息

本节将讨论一些在发送或接收消息时常用的消息属性和头信息

6.1. 消息属性

消息属性可用于存储和检索消息正文之外的附加信息。这对于过滤消息或向消息添加上下文数据特别有用。 以下是在发送消息时设置自定义属性的方法:

TextMessage message = session.createTextMessage();
message.setText(messageText);

message.setStringProperty("OrderID", "12345");

接下来,我们可以在接收消息时检索该属性:

Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    String orderID = message.getStringProperty("OrderID");
} 

6.2. 消息头信息

消息头信息提供包含消息元数据的预定义字段。 一些常用的消息头信息包括:

  • JMSMessageID:由 JMS 提供者为每条消息分配的唯一标识符。我们可以使用此 ID 来跟踪和记录消息。
  • JMSExpiration:定义消息过期时间(毫秒)。如果消息在此时间内未送达,将被丢弃。
  • JMSTimestamp:消息发送的时间。
  • JMSPriority:消息的优先级。

让我们看看如何在接收消息时检索消息头信息:

Message message = receiver.receive(1000);

if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    String messageId = message.getJMSMessageID();
    long timestamp = message.getJMSTimestamp();
    long expiration = message.getJMSExpiration();
    int priority = message.getJMSPriority();
}

7. 使用 Mockito 进行模拟测试

本节将使用 Mockito 来模拟依赖项并验证 MessageSenderMessageReceiver 类的交互。我们首先使用 @Mock 注解创建依赖项的模拟实例。*

接下来,我们验证 sendMessage() 方法正确地与模拟的 QueueSender 交互。我们模拟 QueueSendersend() 方法,并验证 TextMessage 是否正确创建:

String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));

messageSender.sendMessage(messageText);

verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);

最后,我们验证 receiveMessage() 方法正确地与模拟的 QueueReceiver 交互。我们模拟 receive() 方法以返回预定义的 TextMessage,并按预期检索消息文本:

when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");

messageReceiver.receiveMessage();
verify(textMessage).getText();

8. 总结

本文探讨了设置 JMS 连接、会话和消息生产者/接收器以与 IBM MQ 队列交互的过程。我们还介绍了 IBM MQ 支持的几种消息类型。此外,我们强调了自定义属性和头信息如何增强消息处理。


原始标题:Read and Write to IBM MQ Queue Using Java JMS | Baeldung