1. 简介

本文将介绍 JeroMQ——一个纯 Java 实现的 ZeroMQ 库。 我们将探讨它的核心功能以及如何在应用中使用它解决实际问题。

2. 什么是 ZeroMQ?

ZeroMQ 是一个轻量级消息中间件,无需搭建独立的消息代理服务。 与 ActiveMQ 或 Kafka 不同,它不需要额外的 Broker 进程。所有消息功能都通过嵌入应用的 ZeroMQ 库直接实现。

✅ 支持标准消息模式:

  • 请求/响应(Request/Response)
  • 发布/订阅(Publish/Subscribe)
  • 同步/异步通信
  • 其他高级模式

2.1. Socket 机制

ZeroMQ 的核心是 Socket 概念,类似于传统网络编程中的 Socket。 每个 Socket 都有特定类型,通过监听(bind)或连接(connect)建立通信链路。⚠️ 注意:只有特定组合的 Socket 才能协同工作,取决于通信需求。

JeroMQ 支持多种传输协议:

  • tcp://<host>:<port>
    基于标准 TCP/IP,支持跨主机通信,但需处理网络可靠性问题
  • ipc://<endpoint>
    进程间通信,仅限同一主机,受系统权限限制
  • inproc://<name>
    同进程内通信,必须在同一 JeroMQ 上下文中使用

选择传输协议时需权衡场景需求。不同协议和 Socket 类型组合可与其他语言的 ZeroMQ 实现互操作。

3. 快速上手

JeroMQ 作为 ZeroMQ 的纯 Java 实现,集成非常简单粗暴。

3.1. 依赖配置

添加 Maven 依赖:

<dependency>
    <groupId>org.zeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.5.3</version>
</dependency>

最新版本可在 Maven 中央仓库 查询。

3.2. 上下文管理

所有操作前必须初始化 ZContext: 它是 JeroMQ 的核心资源管理器。

// 创建上下文
ZContext context = new ZContext();

// 使用后必须关闭
context.close();

⚠️ 踩坑提示:上下文生命周期必须覆盖所有 Socket 操作!常见实践:

  • 标准Java应用:使用 try-with-resources
  • Spring应用:配置为带 destroy-method 的 Bean

3.3. Socket 创建

通过上下文创建 Socket: 这是所有消息通信的基础。

// 创建指定类型的 Socket
ZMQ.Socket socket = context.createSocket(SocketType.REP);

// 监听连接(服务端)
socket.bind("tcp://*:5555");

// 或发起连接(客户端)
socket.connect("tcp://localhost:5555");

// 发送/接收消息
socket.send("data".getBytes());
byte[] reply = socket.recv();

❌ 线程安全警告:Socket 非线程安全!多线程访问需加锁或使用线程池隔离。

4. 请求/响应模式

最基础的 REQ/REP 模式实现:

服务端代码:

try (ZContext context = new ZContext()) {
    ZMQ.Socket socket = context.createSocket(SocketType.REP);
    socket.bind("tcp://*:5555");

    while (!Thread.currentThread().isInterrupted()) {
        byte[] request = socket.recv();
        // 处理请求
        
        String response = "world";
        socket.send(response.getBytes(ZMQ.CHARSET));
    }
}

客户端代码:

try (ZContext context = new ZContext()) {
    ZMQ.Socket socket = context.createSocket(SocketType.REQ);
    socket.connect("tcp://localhost:5555");

    socket.send("Hello".getBytes(ZMQ.CHARSET));
    byte[] reply = socket.recv();
}

✅ 关键特性:

  • REQ 端主动发起请求
  • REP 端必须收到请求后才能响应
  • 严格的消息顺序保证

4.1. 多客户端支持

JeroMQ 天然支持多客户端连接: 无需修改服务端代码,自动处理并发连接。

⚠️ 性能瓶颈:由于响应必须与请求严格对应,服务端处理必须串行执行。高并发场景会成为性能瓶颈。

4.2. 异步处理方案

使用 ROUTER Socket 实现异步响应: 允许乱序回复。

服务端改造:

try (ZContext context = new ZContext()) {
    ZMQ.Socket router = context.createSocket(SocketType.ROUTER);
    router.bind("tcp://*:5555");

    while (true) {
        // 接收三帧数据:客户端ID + 分隔符 + 消息
        String clientId = router.recvStr();
        router.recv(); // 丢弃分隔符
        String message = router.recvStr();
        
        // 异步处理(示例:线程池)
        executor.submit(() -> {
            // 处理业务逻辑
            
            // 发送响应:客户端ID + 分隔符 + 响应数据
            router.sendMore(clientId);
            router.sendMore("");
            router.send("Response");
        });
    }
}

客户端改造:

try (ZContext context = new ZContext()) {
    ZMQ.Socket req = context.createSocket(SocketType.REQ);
    // 设置唯一标识(关键!)
    req.setIdentity("client-123".getBytes(ZMQ.CHARSET));
    
    req.connect("tcp://localhost:5555");
    req.send("Request".getBytes(ZMQ.CHARSET));
    
    byte[] reply = req.recv();
}

✅ 核心优势:

  • 服务端可并发处理请求
  • 响应可乱序返回
  • 需显式管理客户端标识

5. 发布/订阅模式

实现广播式消息分发:

发布端代码:

try (ZContext context = new ZContext()) {
    ZMQ.Socket pub = context.createSocket(SocketType.PUB);
    pub.bind("tcp://*:5555");

    while (true) {
        // 发布消息
        pub.send("News update");
        Thread.sleep(1000);
    }
}

订阅端代码:

try (ZContext context = new ZContext()) {
    ZMQ.Socket sub = context.createSocket(SocketType.SUB);
    sub.connect("tcp://localhost:5555");
    
    // 订阅所有消息(空前缀)
    sub.subscribe("".getBytes());
    
    while (true) {
        String msg = sub.recvStr();
        System.out.println("Received: " + msg);
    }
}

⚠️ 重要特性:

  • 订阅前发布的消息会丢失
  • 支持前缀过滤(如 sub.subscribe("sport".getBytes())

5.1. 多订阅者支持

天然支持多订阅者: 每个订阅者都会收到所有匹配的消息,类似 JMS Topic 的广播机制。

✅ 灵活订阅:

  • 不同订阅者可设置不同过滤前缀
  • 服务端无需感知订阅者存在

5.2. 非阻塞接收

避免 recv() 阻塞线程: 使用 ZMQ.DONTWAIT 标志实现轮询。

try (ZContext context = new ZContext()) {
    ZMQ.Socket sub = context.createSocket(SocketType.SUB);
    sub.connect("tcp://*:5555");
    sub.subscribe("".getBytes());

    while (!Thread.currentThread().isInterrupted()) {
        // 非阻塞接收
        String msg = sub.recvStr(ZMQ.DONTWAIT);
        if (msg != null) {
            System.out.println("Got: " + msg);
        } else {
            // 执行其他任务
            doOtherWork();
            Thread.sleep(100);
        }
    }
}

✅ 适用场景:

  • 需同时监听多个 Socket
  • 消息处理间隔执行其他任务

6. 总结

JeroMQ 提供了轻量级、高性能的消息通信能力,特别适合:

  • 微服务间通信
  • 高并发消息处理
  • 分布式系统事件总线

相比传统 MQ,它无需部署 Broker,但需要开发者自行管理连接和消息路由。对于熟悉 Socket 编程的开发者来说,这是一个简单粗暴的解决方案。

完整示例代码请访问 GitHub 仓库


原始标题:Introduction to JeroMQ