2. 依赖项

在使用 Aeron 前,我们需要将 最新版本(本文编写时为 1.44.1)添加到构建文件中。

如果使用 Maven,在 pom.xml 中添加:

<dependency>
    <groupId>io.aeron</groupId>
    <artifactId>aeron-all</artifactId>
    <version>1.44.1</version>
</dependency>

如果使用 Gradle,在 build.gradle 中添加:

implementation("io.aeron:aeron-all:1.44.1")

现在就可以在应用中使用 Aeron 了。

⚠️ 注意:目前 Aeron 的部分功能在 Java 16+ 上无法直接使用,这是由于 JPMS 模块系统阻止了特定反射操作

3. 媒体驱动器

Aeron 通过媒体驱动器(Media Driver)实现应用与传输层之间的解耦。媒体驱动器作为应用与传输媒介的桥梁,所有 Aeron 进程都通过它进行交互(本地或远程)。交互通过文件系统实现,需确保所有应用指向同一磁盘目录。⚠️ 同一目录同时只能运行一个媒体驱动器,否则会启动失败。

简单场景下可直接在应用中内嵌启动:

MediaDriver mediaDriver = MediaDriver.launch();

这将使用默认配置启动驱动器(包括默认目录)。另一种内嵌方式是:

MediaDriver mediaDriver = MediaDriver.launchEmbedded();

此方法会自动生成随机目录,避免多实例冲突。两种方式都可通过 MediaDriver.Context 进一步配置:

MediaDriver.Context context = new MediaDriver.Context();
context.threadingMode(ThreadingMode.SHARED);

MediaDriver mediaDriver = MediaDriver.launch(context);

使用完毕后需关闭驱动器(实现 AutoCloseable,推荐 try-with-resources)。

也可作为外部进程运行,使用依赖中的 JAR 文件:

$ java -cp aeron-all-1.44.1.jar io.aeron.driver.MediaDriver

效果与 MediaDriver.launch() 完全相同。

4. Aeron API 客户端

所有 Aeron API 操作都通过 Aeron 类完成。创建实例时需指定媒体驱动器目录:

Aeron aeron = Aeron.connect();

默认指向标准目录(等同于 *MediaDriver.launch()*)。如需自定义目录(如使用 *launchEmbedded()*),需提供 Aeron.Context

Aeron.Context ctx = new Aeron.Context();
ctx.aeronDirectoryName(mediaDriver.aeronDirectoryName());

Aeron aeron = Aeron.connect(ctx);

⚠️ 若目录无运行中的驱动器,Aeron.connect() 会阻塞直到驱动器启动。

同一媒体驱动器可连接多个 Aeron 客户端(通常来自不同应用,但同应用也可)。每次连接需使用新的 Aeron.Context

Aeron.Context ctx1 = new Aeron.Context();
ctx1.aeronDirectoryName(mediaDriver.aeronDirectoryName());
aeron1 = Aeron.connect(ctx1);
System.out.println("Aeron 1 connected: " + aeron1);

Aeron.Context ctx2 = new Aeron.Context();
ctx2.aeronDirectoryName(mediaDriver.aeronDirectoryName());
aeron2 = Aeron.connect(ctx2);
System.out.println("Aeron 2 connected: " + aeron2);

Aeron 实例同样实现 AutoCloseable,推荐使用 try-with-resources 管理。

5. 消息发送与接收

5.1. 缓冲区

Aeron 使用 DirectBuffer 表示所有消息(发送/接收)。本质是字节数组,但提供了标准数据类型的操作方法。

发送消息时需用 UnsafeBuffer 构造缓冲区(底层使用 sun.misc.Unsafe)。创建时需字节数组或 ByteBuffer,推荐使用 BufferUtil.allocateDirectAligned() 优化:

UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));

通过 getXyz()putXyz() 方法操作数据:

// 从索引 0 写入字符串
int length = buffer.putStringWithoutLengthUtf8(0, message); 

// 从指定偏移量读取指定长度的字符串
String message = buffer.getStringWithoutLengthUtf8(offset, length); 

需手动管理缓冲区偏移量:写入时返回数据长度用于计算下一偏移量;读取时需预知数据长度。

5.2. 通道与流

Aeron 通过特定通道上的标识流传输数据

  • 通道:URI 格式字符串,指定传输方式(如 UDP)
  • :简单数字标识符,通信双方需使用相同 ID

最简单的通道是 aeron:ipc(通过驱动器共享内存传输),仅限同驱动器内的进程通信。

更常用的是 aeron:udp 通道,支持跨网络通信:

UDP 通道示意图

UDP 通道需指定主机和端口:

  • 接收方:监听地址
  • 发送方:目标地址

示例:aeron:udp?endpoint=localhost:20121 表示在 localhost:20121 上收发 UDP 消息。

5.3. 订阅

通过订阅(Subscription)接收消息。订阅创建后,驱动器即准备接收数据:

Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:20121", 1001);

使用完毕需关闭(AutoCloseable,推荐 try-with-resources)。

消息接收采用轮询机制,提供 FragmentHandler 处理消息:

FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {
    String data = buffer.getStringWithoutLengthUtf8(offset, length);

    System.out.printf("来自会话 %d 的消息 (%d字节@偏移量%d) <<%s>>%n",
            header.sessionId(), length, offset, data);
};

轮询消息时调用 *Subscription.poll()*:

int fragmentsRead = subscription.poll(fragmentHandler, 10);

参数:

  • fragmentHandler:消息处理器
  • 10:单次轮询最大处理片段数

⚠️ 即使有多个可用消息,每次轮询最多处理一个消息。无消息时立即返回;超大消息可能只接收部分片段。

5.4. 发布

通过发布(Publication)发送消息。需等待订阅方就绪:

ConcurrentPublication publication = aeron.addPublication("aeron:udp?endpoint=localhost:20121", 1001);
while (!publication.isConnected()) {
    TimeUnit.MILLISECONDS.sleep(100);
}

❌ 若无连接的订阅方,发送会立即失败(非阻塞)。使用完毕需关闭(AutoCloseable)。

连接成功后发送消息

UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
buffer.putStringWithoutLengthUtf8(0, message);

long result = publication.offer(buffer, 0, message.length());

返回值:

  • 成功:返回发送字节数(可能小于请求长度)
  • 失败:返回负数错误码:
    • Publication.NOT_CONNECTED:未连接订阅方
    • Publication.BACK_PRESSURED:订阅方背压,暂无法发送
    • Publication.ADMIN_ACTION:管理操作(如日志轮转)导致失败,通常可立即重试
    • Publication.CLOSEDPublication 已关闭
    • Publication.MAX_POSITION_EXCEEDED:驱动器缓冲区满,需关闭并重建 Publication

6. 总结

我们快速了解了 Aeron 的核心概念、配置方法及消息收发流程。该库功能远不止于此,建议动手实践探索更多可能性。

所有示例代码可在 GitHub 获取。


原始标题:UDP Messaging Using Aeron | Baeldung