1. 概述
JGroups 是一个用于可靠消息交换的 Java API。它提供简洁的接口,具备以下核心特性:
- 灵活的协议栈(支持 TCP 和 UDP)
- 大消息的分片与重组
- 可靠的单播和多播
- 故障检测
- 流量控制
以及许多其他功能。
本教程将创建一个简单应用,实现节点间的 String
消息交换,并在新节点加入网络时同步共享状态。
2. 环境搭建
2.1 Maven 依赖
只需在 pom.xml
中添加单个依赖:
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>4.0.10.Final</version>
</dependency>
最新版本可在 Maven Central 查询。
2.2 网络配置
JGroups 默认使用 IPv6。根据系统配置,这可能导致节点间通信失败。
为避免此问题,启动应用时需设置 JVM 参数:
java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger
3. JChannel 详解
JGroups 网络的连接入口是 JChannel
。它负责加入集群、收发消息以及维护网络状态信息。
3.1 创建 Channel
通过配置文件路径创建 JChannel
。若省略文件名,默认查找当前目录下的 udp.xml
:
JChannel channel = new JChannel("src/main/resources/udp.xml");
⚠️ JGroups 配置可能很复杂,但默认的 UDP/TCP 配置已满足多数需求。本教程使用内置的 UDP 配置文件。
更多传输协议配置参考:JGroups 手册
3.2 连接集群
创建 Channel 后需加入集群:集群是交换消息的节点集合。
加入集群需指定集群名称:
channel.connect("Baeldung");
首个加入的节点会自动创建集群(若不存在)。
3.3 命名节点
节点通过名称标识,便于对等节点发送定向消息和接收成员变更通知。可手动设置名称:
channel.name("user1");
后续将用此名称跟踪集群成员的加入/退出。
3.4 关闭 Channel
及时关闭 Channel 至关重要,否则对等节点无法及时感知节点退出。
通过 close()
方法关闭:
channel.close()
4. 集群视图变更
创建 JChannel
后,即可监控集群节点状态并交换消息。
JGroups 通过 View
类维护集群状态。每个 Channel 持有唯一的网络视图。视图变更时通过 viewAccepted()
回调通知。
推荐继承 ReceiverAdaptor
实现回调(空实现所有接口方法):
public void viewAccepted(View newView) {
private View lastView;
if (lastView == null) {
System.out.println("Received initial view:");
newView.forEach(System.out::println);
} else {
System.out.println("Received new view.");
List<Address> newMembers = View.newMembers(lastView, newView);
System.out.println("New members: ");
newMembers.forEach(System.out::println);
List<Address> exMembers = View.leftMembers(lastView, newView);
System.out.println("Exited members:");
exMembers.forEach(System.out::println);
}
lastView = newView;
}
每个 View
包含集群成员的 Address
列表。JGroups 提供视图比较工具方法,用于检测新增/退出成员。
5. 发送消息
JGroups 消息处理简单直接。Message
包含字节数组及发送方/接收方的 Address
。
本教程使用命令行输入的 String
,实际应用可扩展为其他数据类型。
5.1 广播消息
创建 Message
需指定目标和字节数组;JChannel
自动设置发送方。目标为 null
时消息广播至整个集群:
System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);
运行多个实例并发送消息(需实现下文的 receive()
),所有节点(包括发送方)都会收到。
5.2 屏蔽自身消息
若不想接收自己发送的消息,设置属性:
channel.setDiscardOwnMessages(true);
再次测试,发送方将不再收到广播消息。
5.3 定向消息
发送定向消息需有效的 Address
。若通过名称引用节点,需通过 View
查找地址:
private Optional<Address> getAddress(String name) {
View view = channel.view();
return view.getMembers().stream()
.filter(address -> name.equals(address.toString()))
.findAny();
}
Address
名称通过 toString()
获取,只需在成员列表中搜索目标名称。
从控制台接收名称并发送定向消息:
Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
.orElseThrow(() -> new Exception("Destination not found");
Message message = new Message(destination, "Hi there!");
channel.send(message);
6. 接收消息
重写 ReceiverAdaptor
的 receive()
方法:
public void receive(Message message) {
String line = "Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
}
已知消息为 String
类型,可直接调用 getObject()
输出。
7. 状态同步
新节点加入网络时可能需要获取集群状态。JGroups 提供状态传输机制。
节点加入集群后调用 getState()
,状态通常从最老成员(协调者)获取。
为应用添加广播消息计数器:
private Integer messageCount = 0;
public void receive(Message message) {
String line = "Message received from: "
+ message.getSrc()
+ " to: " + message.getDest()
+ " -> " + message.getObject();
System.out.println(line);
if (message.getDest() == null) {
messageCount++;
System.out.println("Message count: " + messageCount);
}
}
✅ 检查 null
目标:若统计定向消息,各节点计数将不一致。
重写 ReceiverAdaptor
的两个方法:
public void setState(InputStream input) {
try {
messageCount = Util.objectFromStream(new DataInputStream(input));
} catch (Exception e) {
System.out.println("Error deserialing state!");
}
System.out.println(messageCount + " is the current messagecount.");
}
public void getState(OutputStream output) throws Exception {
Util.objectToStream(messageCount, new DataOutputStream(output));
}
与消息类似,JGroups 以字节数组传输状态。协调者通过 InputStream
写入状态,新节点通过 OutputStream
读取。API 提供序列化/反序列化工具类。
⚠️ 生产环境中状态访问需保证线程安全。
最后在启动代码中加入状态获取:
channel.connect(clusterName);
channel.getState(null, 0);
getState()
参数:
- 目标地址:
null
表示协调者 - 超时时间(毫秒):0 表示永不超时
运行测试:
- 启动两个节点并交换广播消息,观察计数器递增
- 新增第三个节点或重启现有节点,新节点将打印正确的消息计数
8. 总结
本教程使用 JGroups 实现了消息交换应用,通过 API 监控节点加入/退出,并在新节点加入时同步集群状态。
完整代码示例见 GitHub 仓库。