1. 概述

JGroups 是一个用于可靠消息交换的 Java API。它提供简洁的接口,具备以下核心特性:

  • 灵活的协议栈(支持 TCP 和 UDP)
  • 大消息的分片与重组
  • 可靠的单播和多播
  • 故障检测
  • 流量控制

以及许多其他功能。

本教程将创建一个简单应用,实现节点间的 String 消息交换,并在新节点加入网络时同步共享状态。

2. 环境搭建

2.1 Maven 依赖

只需在 pom.xml 中添加单个依赖:

<dependency>
    <groupId>org.jgroups</groupId>
    <artifactId>jgroups</artifactId>
    <version>4.0.10.Final</version>
</dependency>

最新版本可在 Maven Central 查询。

2.2 网络配置

JGroups 默认使用 IPv6。根据系统配置,这可能导致节点间通信失败。

为避免此问题,启动应用时需设置 JVM 参数:

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger

3. JChannel 详解

JGroups 网络的连接入口是 JChannel。它负责加入集群、收发消息以及维护网络状态信息。

3.1 创建 Channel

通过配置文件路径创建 JChannel。若省略文件名,默认查找当前目录下的 udp.xml

JChannel channel = new JChannel("src/main/resources/udp.xml");

⚠️ JGroups 配置可能很复杂,但默认的 UDP/TCP 配置已满足多数需求。本教程使用内置的 UDP 配置文件。

更多传输协议配置参考:JGroups 手册

3.2 连接集群

创建 Channel 后需加入集群:集群是交换消息的节点集合

加入集群需指定集群名称:

channel.connect("Baeldung");

首个加入的节点会自动创建集群(若不存在)。

3.3 命名节点

节点通过名称标识,便于对等节点发送定向消息和接收成员变更通知。可手动设置名称:

channel.name("user1");

后续将用此名称跟踪集群成员的加入/退出。

3.4 关闭 Channel

及时关闭 Channel 至关重要,否则对等节点无法及时感知节点退出。

通过 close() 方法关闭:

channel.close()

4. 集群视图变更

创建 JChannel 后,即可监控集群节点状态并交换消息。

JGroups 通过 View 类维护集群状态。每个 Channel 持有唯一的网络视图。视图变更时通过 viewAccepted() 回调通知。

推荐继承 ReceiverAdaptor 实现回调(空实现所有接口方法):

public void viewAccepted(View newView) {

    private View lastView;

    if (lastView == null) {
        System.out.println("Received initial view:");
        newView.forEach(System.out::println);
    } else {
        System.out.println("Received new view.");

        List<Address> newMembers = View.newMembers(lastView, newView);
        System.out.println("New members: ");
        newMembers.forEach(System.out::println);

        List<Address> exMembers = View.leftMembers(lastView, newView);
        System.out.println("Exited members:");
        exMembers.forEach(System.out::println);
    }
    lastView = newView;
}

每个 View 包含集群成员的 Address 列表。JGroups 提供视图比较工具方法,用于检测新增/退出成员。

5. 发送消息

JGroups 消息处理简单直接。Message 包含字节数组及发送方/接收方的 Address

本教程使用命令行输入的 String,实际应用可扩展为其他数据类型。

5.1 广播消息

创建 Message 需指定目标和字节数组;JChannel 自动设置发送方。目标为 null 时消息广播至整个集群

System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);

运行多个实例并发送消息(需实现下文的 receive()),所有节点(包括发送方)都会收到。

5.2 屏蔽自身消息

若不想接收自己发送的消息,设置属性:

channel.setDiscardOwnMessages(true);

再次测试,发送方将不再收到广播消息。

5.3 定向消息

发送定向消息需有效的 Address。若通过名称引用节点,需通过 View 查找地址:

private Optional<Address> getAddress(String name) { 
    View view = channel.view(); 
    return view.getMembers().stream()
      .filter(address -> name.equals(address.toString()))
      .findAny(); 
}

Address 名称通过 toString() 获取,只需在成员列表中搜索目标名称。

从控制台接收名称并发送定向消息:

Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
  .orElseThrow(() -> new Exception("Destination not found"); 
Message message = new Message(destination, "Hi there!"); 
channel.send(message);

6. 接收消息

重写 ReceiverAdaptorreceive() 方法:

public void receive(Message message) {
    String line = "Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);
}

已知消息为 String 类型,可直接调用 getObject() 输出。

7. 状态同步

新节点加入网络时可能需要获取集群状态。JGroups 提供状态传输机制。

节点加入集群后调用 getState(),状态通常从最老成员(协调者)获取。

为应用添加广播消息计数器:

private Integer messageCount = 0;

public void receive(Message message) {
    String line = "Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);

    if (message.getDest() == null) {
        messageCount++;
        System.out.println("Message count: " + messageCount);
    }
}

✅ 检查 null 目标:若统计定向消息,各节点计数将不一致。

重写 ReceiverAdaptor 的两个方法:

public void setState(InputStream input) {
    try {
        messageCount = Util.objectFromStream(new DataInputStream(input));
    } catch (Exception e) {
        System.out.println("Error deserialing state!");
    }
    System.out.println(messageCount + " is the current messagecount.");
}

public void getState(OutputStream output) throws Exception {
    Util.objectToStream(messageCount, new DataOutputStream(output));
}

与消息类似,JGroups 以字节数组传输状态。协调者通过 InputStream 写入状态,新节点通过 OutputStream 读取。API 提供序列化/反序列化工具类。

⚠️ 生产环境中状态访问需保证线程安全。

最后在启动代码中加入状态获取:

channel.connect(clusterName);
channel.getState(null, 0);

getState() 参数:

  • 目标地址:null 表示协调者
  • 超时时间(毫秒):0 表示永不超时

运行测试:

  1. 启动两个节点并交换广播消息,观察计数器递增
  2. 新增第三个节点或重启现有节点,新节点将打印正确的消息计数

8. 总结

本教程使用 JGroups 实现了消息交换应用,通过 API 监控节点加入/退出,并在新节点加入时同步集群状态。

完整代码示例见 GitHub 仓库


原始标题:Reliable Messaging with JGroups