1. 引言

Java 9 引入了 Flow API,作为 Reactive Streams 规范 的官方实现。

本文将先带你理解 Reactive Streams 的核心思想,再对比它与 RxJava 和 Java 9 Flow API 的关系。重点不是讲概念,而是通过实际代码看清两者的定位差异,帮你避免在项目中“踩坑”。

2. 什么是 Reactive Streams?

Reactive Streams 规范 的目标是为异步流处理提供标准,核心是支持 非阻塞背压(backpressure)。所谓背压,就是消费者可以主动控制数据流速,防止生产者太快导致内存溢出。

规范定义了四个核心接口,构成响应式流的基石:

  • Publisher<T>:数据发布者,按需向订阅者推送数据
  • Subscriber<T>:数据消费者,订阅后接收数据
  • Subscription:连接发布者和订阅者的“通道”,用于请求数据或取消
  • Processor<T,R>:既是订阅者又是发布者,常用于中间转换(如过滤、映射)

⚠️ 注意:这些接口在 org.reactivestreams 包中,是规范定义,不是 JDK 原生。Java 9 的 Flow API 是对它的官方实现。

RxJava 虽然早于该规范,但从 2.0 开始已完全兼容 Reactive Streams。而 Flow API 是 JDK 原生支持,无需引入第三方依赖。

3. 使用场景

我们以一个直播视频服务为例。

直播和点播不同:服务器以固定速率推送视频帧,播放器必须自己适应节奏。如果处理不过来,就得通过背压机制通知服务器“慢点发”。

模型很简单:

  • 发布者:VideoStreamServer,持续生成视频帧
  • 订阅者:VideoPlayer,接收并播放帧

先定义数据实体 VideoFrame

public class VideoFrame {
    private long number;
    // 其他字段如编码、时间戳等

    public VideoFrame(long number) {
        this.number = number;
    }

    public long getNumber() {
        return number;
    }

    // getter/setter 省略
}

接下来分别用 Flow API 和 RxJava 实现。

4. 使用 Flow API 实现

Java 9 的 Flow API 位于 java.util.concurrent.Flow,是 Reactive Streams 规范的官方实现。它的特点是极简,只提供基础接口,适合做标准桥梁。

4.1 发布者:VideoStreamServer

我们直接继承 SubmissionPublisher,它是 JDK 提供的线程安全发布者实现,自带缓冲和背压管理。

public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
  
    public VideoStreamServer() {
        // 单线程推送,缓冲区最大5个帧
        super(Executors.newSingleThreadExecutor(), 5);
    }
}

SubmissionPublisher 内部使用 BufferedSubscription 管理订阅,自动处理背压。当缓冲区满时,会根据策略拒绝新数据。

4.2 订阅者:VideoPlayer

实现 Flow.Subscriber<VideoFrame> 接口:

public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
   
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 先要1帧
    }

    @Override
    public void onNext(VideoFrame item) {
        log.info("播放帧 #{}", item.getNumber());
        subscription.request(1); // 处理完再要1帧
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("视频流异常: {}", throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log.info("视频播放结束");
    }
}

⚠️ 注意:必须在 onSubscribe 中调用 request(),否则收不到任何数据。这是背压的核心机制。

4.3 组装运行

VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());

// 模拟每1ms生成一帧
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();

executor.scheduleWithFixedDelay(() -> {
    boolean dropped = streamServer.offer(
        new VideoFrame(frameNumber.getAndIncrement()),
        (subscriber, videoFrame) -> {
            subscriber.onError(new RuntimeException("帧#" + videoFrame.getNumber() + "因背压被丢弃"));
            return true;
        }
    );
}, 0, 1, TimeUnit.MILLISECONDS);

// 运行1秒后停止
Thread.sleep(1000);
executor.shutdown();

offer() 是非阻塞提交,如果缓冲区满,会触发回调丢弃帧并报错。

5. 使用 RxJava 实现

RxJava 是 ReactiveX 在 Java 的实现,融合了观察者模式 + 迭代器模式 + 函数式编程。它比 Flow API 提供了更丰富的操作符和更流畅的 API。

当前主流版本是 RxJava 3.x。从 2.x 开始,Flowable 类完全兼容 Reactive Streams 规范。

5.1 生成视频流

方式一:基于 Stream 创建(适合有限流)

Stream<VideoFrame> videoStream = Stream.iterate(
    new VideoFrame(0),
    vf -> new VideoFrame(vf.getNumber() + 1)
);

方式二:使用 Flowable.create(更灵活,推荐)

Flowable<VideoFrame> videoFlowable = Flowable.create(
    (FlowableEmitter<VideoFrame> emitter) -> {
        AtomicLong frame = new AtomicLong();
        while (!emitter.isCancelled()) {
            emitter.onNext(new VideoFrame(frame.incrementAndGet()));
            Thread.sleep(1); // 模拟生成延迟
        }
    },
    BackpressureStrategy.BUFFER // 背压策略
);

5.2 订阅与播放

videoFlowable
    .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // 生产线程
    .onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR) // 缓冲5帧,满则报错
    .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // 消费线程
    .subscribe(
        item -> {
            log.info("播放帧 #{}", item.getNumber());
            Thread.sleep(30); // 模拟播放延迟
        },
        error -> log.error("播放异常", error)
    );

✅ RxJava 的链式调用非常清晰,背压策略也一目了然。

6. RxJava 与 Flow API 对比

特性 Flow API RxJava
定位 JDK 标准接口,轻量 功能完整的响应式框架
API 丰富度 ❌ 极简,需手动实现 ✅ 丰富操作符(map、flatMap、combineLatest 等)
背压支持 ✅ 基础支持 ✅ 多种策略(DROP、ERROR、BUFFER 等)
线程调度 ❌ 无内置支持 subscribeOn / observeOn
错误处理 ✅ 基础 onError onErrorResumeNextretry
组合操作 ❌ 需自定义 Processor zipcombineLatestmerge

举个实际例子:

假设播放器需要先解码才能播放。用 RxJava 只需一个 map

videoFlowable
    .map(this::decodeFrame) // 解码
    .subscribe(player::play);

而 Flow API 需要实现一个 Processor,代码量翻倍,还容易出错。

再比如合并音视频流:

  • RxJava:Flowable.combineLatest(video, audio, VideoWithAudio::new)
  • Flow API:手动实现多订阅合并,复杂且易出 bug。

7. 为什么要有 Flow API?

你可能会问:既然 RxJava 这么强大,为啥还要 Flow API?

答案是:标准化

Flow API 的存在意义是:

  • ✅ 作为 JDK 原生的 Reactive Streams 接口,避免对第三方库的依赖
  • ✅ 为其他 Java 库(如 HTTP 客户端)提供标准响应式支持
  • ✅ 实现跨库互操作(interoperability)

例如:

  • java.net.http.HttpClient 的响应体就是 Flow.Publisher
  • Reactor、RxJava 都提供了与 Flow API 的互转工具类(如 FlowAdapters

🔧 简单粗暴地说:Flow API 是“标准”,RxJava 是“工具”。标准不需要功能多,但必须稳定、通用。

8. 总结

  • Reactive Streams 是规范,定义了 Publisher/Subscriber 等接口
  • Flow API 是 JDK 对该规范的实现,轻量、标准,适合做桥梁
  • RxJava 是功能强大的响应式框架,适合业务开发,API 丰富
  • ⚠️ 日常开发推荐使用 RxJava 或 Project Reactor
  • ✅ Flow API 更适合底层库、中间件或需要与 JDK 原生组件集成的场景

本文完整代码见 GitHub 仓库(mock 地址)


原始标题:The Difference between RxJava API and the Java 9 Flow API