1. 概述

本文介绍 LMAX Disruptor 框架,探讨它如何实现低延迟的软件并发。我们还将通过一个基础示例展示 Disruptor 库的用法。

2. 什么是 Disruptor?

Disruptor 是 LMAX 开源的高性能 Java 并发编程框架,专为处理海量事务场景设计。它能在低延迟下运行,且避免了传统并发代码的复杂性。其性能优化核心在于:充分利用硬件特性的软件设计。

2.1. 机械同理心

核心概念 机械同理心 要求开发者理解底层硬件工作原理,并编写与之匹配的代码。

以 CPU 和内存组织为例:CPU 与主内存之间存在多层缓存(L1/L2/L3)。执行操作时,CPU 依次从 L1 → L2 → L3 → 主内存查找数据,路径越长延迟越高。

如果同一数据被频繁访问(如循环计数器),应将其加载到离 CPU 最近的位置。缓存缺失的典型延迟如下:

访问目标 CPU 周期数 时间
主内存 多个周期 ~60-80 ns
L3 缓存 ~40-45 周期 ~15 ns
L2 缓存 ~10 周期 ~3 ns
L1 缓存 ~3-4 周期 ~1 ns
寄存器 1 周期 极快

2.2. 为什么不用队列?

传统队列实现存在三大问题:

写竞争:队列的 head/tail/size 变量常引发写冲突
⚠️ 锁开销:为解决竞争引入锁,导致内核态切换,清空 CPU 缓存
伪共享:即使写入不同变量,缓存行(Cache Line)失效导致线程间互相干扰

队列的"单写原则"被破坏:多个生产者写入不同位置时,缓存行同步引发性能雪崩。生产者和消费者速度不匹配时,队列常处于"满"或"空"状态,极少平衡运行。

2.3. Disruptor 如何工作

环形缓冲区结构及 API

Disruptor 核心是基于数组的环形缓冲区(Ring Buffer),关键特性:

  • 预分配事件对象,避免 GC 压力
  • 生产者/消费者通过序列号(sequence)协调读写位置
  • 无锁设计:每个线程独立操作自己的序列号,通过读取他人序列号判断可用槽位
  • 多播机制:事件广播给所有消费者,支持并行处理
  • 依赖图:协调消费者间的处理顺序

3. 使用 Disruptor 库

3.1. Maven 依赖

pom.xml 添加依赖:

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

最新版本可查看 Maven 中央仓库

3.2. 定义事件

创建事件载体类:

public static class ValueEvent {
    private int value;
    public final static EventFactory EVENT_FACTORY 
      = () -> new ValueEvent();

    // 标准 getter/setter
}

EventFactory 让 Disruptor 预分配事件对象,避免运行时创建开销。

3.3. 消费者

消费者从环形缓冲区读取数据。定义事件处理器:

public class SingleEventPrintConsumer {
    ...

    public EventHandler<ValueEvent>[] getEventHandler() {
        EventHandler<ValueEvent> eventHandler 
          = (event, sequence, endOfBatch) 
            -> print(event.getValue(), sequence);
        return new EventHandler[] { eventHandler };
    }
 
    private void print(int id, long sequenceId) {
        logger.info("Id is " + id 
          + " sequence id that was used is " + sequenceId);
    }
}

本例中消费者仅打印日志,实际场景可替换为业务逻辑。

3.4. 构建 Disruptor

初始化 Disruptor 实例:

ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;

WaitStrategy waitStrategy = new BusySpinWaitStrategy();
Disruptor<ValueEvent> disruptor 
  = new Disruptor<>(
    ValueEvent.EVENT_FACTORY, 
    16, 
    threadFactory, 
    ProducerType.SINGLE, 
    waitStrategy);

构造参数说明:

参数 作用
Event Factory 预生成环形缓冲区中的事件对象
Ring Buffer 大小 必须是 2 的幂(本例 16),便于位运算优化
Thread Factory 为事件处理器创建线程
Producer Type 生产者模式(单生产者 SINGLE / 多生产者 MULTI
Wait Strategy 消费者等待策略(本例 BusySpinWaitStrategy 适用于低延迟场景)

绑定消费者处理器:

disruptor.handleEventsWith(getEventHandler());

支持链式调用添加多个消费者,构建处理流水线。

3.5. 启动 Disruptor

启动 Disruptor 并获取环形缓冲区引用:

RingBuffer<ValueEvent> ringBuffer = disruptor.start();

3.6. 生产与发布事件

生产者通过序列号写入数据:

for (int eventCount = 0; eventCount < 32; eventCount++) {
    long sequenceId = ringBuffer.next();
    ValueEvent valueEvent = ringBuffer.get(sequenceId);
    valueEvent.setValue(eventCount);
    ringBuffer.publish(sequenceId);
}

关键流程:

  1. next() 获取下一个可用序列号
  2. get(sequenceId) 获取事件对象
  3. 设置事件值
  4. publish(sequenceId) 发布事件(类似两阶段提交)

⚠️ 注意:序列号必须连续递增,否则会破坏数据一致性。

4. 总结

本文介绍了 Disruptor 的核心设计理念:

  • 通过机械同理心优化硬件利用率
  • 环形缓冲区替代传统队列解决伪共享问题
  • 无锁设计实现低延迟并发

示例代码可在 GitHub 项目 获取(Maven 项目,可直接导入运行)。对于追求极致性能的系统,Disruptor 是值得考虑的利器。


原始标题:Concurrency with LMAX Disruptor - An Introduction

« 上一篇: Guava Multimap使用
» 下一篇: Java周报,165