2. 依赖项
在使用 Aeron 前,我们需要将 最新版本(本文编写时为 1.44.1)添加到构建文件中。
如果使用 Maven,在 pom.xml 中添加:
<dependency>
<groupId>io.aeron</groupId>
<artifactId>aeron-all</artifactId>
<version>1.44.1</version>
</dependency>
如果使用 Gradle,在 build.gradle 中添加:
implementation("io.aeron:aeron-all:1.44.1")
现在就可以在应用中使用 Aeron 了。
⚠️ 注意:目前 Aeron 的部分功能在 Java 16+ 上无法直接使用,这是由于 JPMS 模块系统阻止了特定反射操作。
3. 媒体驱动器
Aeron 通过媒体驱动器(Media Driver)实现应用与传输层之间的解耦。媒体驱动器作为应用与传输媒介的桥梁,所有 Aeron 进程都通过它进行交互(本地或远程)。交互通过文件系统实现,需确保所有应用指向同一磁盘目录。⚠️ 同一目录同时只能运行一个媒体驱动器,否则会启动失败。
简单场景下可直接在应用中内嵌启动:
MediaDriver mediaDriver = MediaDriver.launch();
这将使用默认配置启动驱动器(包括默认目录)。另一种内嵌方式是:
MediaDriver mediaDriver = MediaDriver.launchEmbedded();
此方法会自动生成随机目录,避免多实例冲突。两种方式都可通过 MediaDriver.Context 进一步配置:
MediaDriver.Context context = new MediaDriver.Context();
context.threadingMode(ThreadingMode.SHARED);
MediaDriver mediaDriver = MediaDriver.launch(context);
使用完毕后需关闭驱动器(实现 AutoCloseable,推荐 try-with-resources)。
也可作为外部进程运行,使用依赖中的 JAR 文件:
$ java -cp aeron-all-1.44.1.jar io.aeron.driver.MediaDriver
效果与 MediaDriver.launch() 完全相同。
4. Aeron API 客户端
所有 Aeron API 操作都通过 Aeron 类完成。创建实例时需指定媒体驱动器目录:
Aeron aeron = Aeron.connect();
默认指向标准目录(等同于 *MediaDriver.launch()*)。如需自定义目录(如使用 *launchEmbedded()*),需提供 Aeron.Context:
Aeron.Context ctx = new Aeron.Context();
ctx.aeronDirectoryName(mediaDriver.aeronDirectoryName());
Aeron aeron = Aeron.connect(ctx);
⚠️ 若目录无运行中的驱动器,Aeron.connect() 会阻塞直到驱动器启动。
同一媒体驱动器可连接多个 Aeron 客户端(通常来自不同应用,但同应用也可)。每次连接需使用新的 Aeron.Context:
Aeron.Context ctx1 = new Aeron.Context();
ctx1.aeronDirectoryName(mediaDriver.aeronDirectoryName());
aeron1 = Aeron.connect(ctx1);
System.out.println("Aeron 1 connected: " + aeron1);
Aeron.Context ctx2 = new Aeron.Context();
ctx2.aeronDirectoryName(mediaDriver.aeronDirectoryName());
aeron2 = Aeron.connect(ctx2);
System.out.println("Aeron 2 connected: " + aeron2);
Aeron 实例同样实现 AutoCloseable,推荐使用 try-with-resources 管理。
5. 消息发送与接收
5.1. 缓冲区
Aeron 使用 DirectBuffer 表示所有消息(发送/接收)。本质是字节数组,但提供了标准数据类型的操作方法。
发送消息时需用 UnsafeBuffer 构造缓冲区(底层使用 sun.misc.Unsafe)。创建时需字节数组或 ByteBuffer,推荐使用 BufferUtil.allocateDirectAligned() 优化:
UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
通过 getXyz() 和 putXyz() 方法操作数据:
// 从索引 0 写入字符串
int length = buffer.putStringWithoutLengthUtf8(0, message);
// 从指定偏移量读取指定长度的字符串
String message = buffer.getStringWithoutLengthUtf8(offset, length);
✅ 需手动管理缓冲区偏移量:写入时返回数据长度用于计算下一偏移量;读取时需预知数据长度。
5.2. 通道与流
Aeron 通过特定通道上的标识流传输数据。
- 通道:URI 格式字符串,指定传输方式(如 UDP)
- 流:简单数字标识符,通信双方需使用相同 ID
最简单的通道是 aeron:ipc(通过驱动器共享内存传输),仅限同驱动器内的进程通信。
更常用的是 aeron:udp 通道,支持跨网络通信:
UDP 通道需指定主机和端口:
- 接收方:监听地址
- 发送方:目标地址
示例:aeron:udp?endpoint=localhost:20121
表示在 localhost:20121
上收发 UDP 消息。
5.3. 订阅
通过订阅(Subscription)接收消息。订阅创建后,驱动器即准备接收数据:
Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:20121", 1001);
使用完毕需关闭(AutoCloseable,推荐 try-with-resources)。
消息接收采用轮询机制,提供 FragmentHandler 处理消息:
FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {
String data = buffer.getStringWithoutLengthUtf8(offset, length);
System.out.printf("来自会话 %d 的消息 (%d字节@偏移量%d) <<%s>>%n",
header.sessionId(), length, offset, data);
};
轮询消息时调用 *Subscription.poll()*:
int fragmentsRead = subscription.poll(fragmentHandler, 10);
参数:
- fragmentHandler:消息处理器
- 10:单次轮询最大处理片段数
⚠️ 即使有多个可用消息,每次轮询最多处理一个消息。无消息时立即返回;超大消息可能只接收部分片段。
5.4. 发布
通过发布(Publication)发送消息。需等待订阅方就绪:
ConcurrentPublication publication = aeron.addPublication("aeron:udp?endpoint=localhost:20121", 1001);
while (!publication.isConnected()) {
TimeUnit.MILLISECONDS.sleep(100);
}
❌ 若无连接的订阅方,发送会立即失败(非阻塞)。使用完毕需关闭(AutoCloseable)。
连接成功后发送消息:
UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
buffer.putStringWithoutLengthUtf8(0, message);
long result = publication.offer(buffer, 0, message.length());
返回值:
- ✅ 成功:返回发送字节数(可能小于请求长度)
- ❌ 失败:返回负数错误码:
- Publication.NOT_CONNECTED:未连接订阅方
- Publication.BACK_PRESSURED:订阅方背压,暂无法发送
- Publication.ADMIN_ACTION:管理操作(如日志轮转)导致失败,通常可立即重试
- Publication.CLOSED:Publication 已关闭
- Publication.MAX_POSITION_EXCEEDED:驱动器缓冲区满,需关闭并重建 Publication
6. 总结
我们快速了解了 Aeron 的核心概念、配置方法及消息收发流程。该库功能远不止于此,建议动手实践探索更多可能性。
所有示例代码可在 GitHub 获取。