1. 引言
Java 9 引入了 Flow API
,作为 Reactive Streams 规范 的官方实现。
本文将先带你理解 Reactive Streams 的核心思想,再对比它与 RxJava 和 Java 9 Flow API 的关系。重点不是讲概念,而是通过实际代码看清两者的定位差异,帮你避免在项目中“踩坑”。
2. 什么是 Reactive Streams?
Reactive Streams 规范 的目标是为异步流处理提供标准,核心是支持 非阻塞背压(backpressure)。所谓背压,就是消费者可以主动控制数据流速,防止生产者太快导致内存溢出。
规范定义了四个核心接口,构成响应式流的基石:
- ✅
Publisher<T>
:数据发布者,按需向订阅者推送数据 - ✅
Subscriber<T>
:数据消费者,订阅后接收数据 - ✅
Subscription
:连接发布者和订阅者的“通道”,用于请求数据或取消 - ✅
Processor<T,R>
:既是订阅者又是发布者,常用于中间转换(如过滤、映射)
⚠️ 注意:这些接口在
org.reactivestreams
包中,是规范定义,不是 JDK 原生。Java 9 的Flow API
是对它的官方实现。
RxJava 虽然早于该规范,但从 2.0 开始已完全兼容 Reactive Streams。而 Flow API
是 JDK 原生支持,无需引入第三方依赖。
3. 使用场景
我们以一个直播视频服务为例。
直播和点播不同:服务器以固定速率推送视频帧,播放器必须自己适应节奏。如果处理不过来,就得通过背压机制通知服务器“慢点发”。
模型很简单:
- 发布者:
VideoStreamServer
,持续生成视频帧 - 订阅者:
VideoPlayer
,接收并播放帧
先定义数据实体 VideoFrame
:
public class VideoFrame {
private long number;
// 其他字段如编码、时间戳等
public VideoFrame(long number) {
this.number = number;
}
public long getNumber() {
return number;
}
// getter/setter 省略
}
接下来分别用 Flow API 和 RxJava 实现。
4. 使用 Flow API 实现
Java 9 的 Flow API
位于 java.util.concurrent.Flow
,是 Reactive Streams 规范的官方实现。它的特点是极简,只提供基础接口,适合做标准桥梁。
4.1 发布者:VideoStreamServer
我们直接继承 SubmissionPublisher
,它是 JDK 提供的线程安全发布者实现,自带缓冲和背压管理。
public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
public VideoStreamServer() {
// 单线程推送,缓冲区最大5个帧
super(Executors.newSingleThreadExecutor(), 5);
}
}
✅ SubmissionPublisher
内部使用 BufferedSubscription
管理订阅,自动处理背压。当缓冲区满时,会根据策略拒绝新数据。
4.2 订阅者:VideoPlayer
实现 Flow.Subscriber<VideoFrame>
接口:
public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 先要1帧
}
@Override
public void onNext(VideoFrame item) {
log.info("播放帧 #{}", item.getNumber());
subscription.request(1); // 处理完再要1帧
}
@Override
public void onError(Throwable throwable) {
log.error("视频流异常: {}", throwable.getMessage());
}
@Override
public void onComplete() {
log.info("视频播放结束");
}
}
⚠️ 注意:必须在 onSubscribe
中调用 request()
,否则收不到任何数据。这是背压的核心机制。
4.3 组装运行
VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());
// 模拟每1ms生成一帧
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
boolean dropped = streamServer.offer(
new VideoFrame(frameNumber.getAndIncrement()),
(subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("帧#" + videoFrame.getNumber() + "因背压被丢弃"));
return true;
}
);
}, 0, 1, TimeUnit.MILLISECONDS);
// 运行1秒后停止
Thread.sleep(1000);
executor.shutdown();
✅ offer()
是非阻塞提交,如果缓冲区满,会触发回调丢弃帧并报错。
5. 使用 RxJava 实现
RxJava 是 ReactiveX 在 Java 的实现,融合了观察者模式 + 迭代器模式 + 函数式编程。它比 Flow API 提供了更丰富的操作符和更流畅的 API。
当前主流版本是 RxJava 3.x。从 2.x 开始,Flowable
类完全兼容 Reactive Streams 规范。
5.1 生成视频流
方式一:基于 Stream
创建(适合有限流)
Stream<VideoFrame> videoStream = Stream.iterate(
new VideoFrame(0),
vf -> new VideoFrame(vf.getNumber() + 1)
);
方式二:使用 Flowable.create
(更灵活,推荐)
Flowable<VideoFrame> videoFlowable = Flowable.create(
(FlowableEmitter<VideoFrame> emitter) -> {
AtomicLong frame = new AtomicLong();
while (!emitter.isCancelled()) {
emitter.onNext(new VideoFrame(frame.incrementAndGet()));
Thread.sleep(1); // 模拟生成延迟
}
},
BackpressureStrategy.BUFFER // 背压策略
);
5.2 订阅与播放
videoFlowable
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // 生产线程
.onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR) // 缓冲5帧,满则报错
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // 消费线程
.subscribe(
item -> {
log.info("播放帧 #{}", item.getNumber());
Thread.sleep(30); // 模拟播放延迟
},
error -> log.error("播放异常", error)
);
✅ RxJava 的链式调用非常清晰,背压策略也一目了然。
6. RxJava 与 Flow API 对比
特性 | Flow API | RxJava |
---|---|---|
定位 | JDK 标准接口,轻量 | 功能完整的响应式框架 |
API 丰富度 | ❌ 极简,需手动实现 | ✅ 丰富操作符(map、flatMap、combineLatest 等) |
背压支持 | ✅ 基础支持 | ✅ 多种策略(DROP、ERROR、BUFFER 等) |
线程调度 | ❌ 无内置支持 | ✅ subscribeOn / observeOn |
错误处理 | ✅ 基础 onError | ✅ onErrorResumeNext 、retry 等 |
组合操作 | ❌ 需自定义 Processor | ✅ zip 、combineLatest 、merge |
举个实际例子:
假设播放器需要先解码才能播放。用 RxJava 只需一个 map
:
videoFlowable
.map(this::decodeFrame) // 解码
.subscribe(player::play);
而 Flow API 需要实现一个 Processor
,代码量翻倍,还容易出错。
再比如合并音视频流:
- RxJava:
Flowable.combineLatest(video, audio, VideoWithAudio::new)
- Flow API:手动实现多订阅合并,复杂且易出 bug。
7. 为什么要有 Flow API?
你可能会问:既然 RxJava 这么强大,为啥还要 Flow API?
答案是:标准化。
Flow API 的存在意义是:
- ✅ 作为 JDK 原生的 Reactive Streams 接口,避免对第三方库的依赖
- ✅ 为其他 Java 库(如 HTTP 客户端)提供标准响应式支持
- ✅ 实现跨库互操作(interoperability)
例如:
java.net.http.HttpClient
的响应体就是Flow.Publisher
- Reactor、RxJava 都提供了与 Flow API 的互转工具类(如
FlowAdapters
)
🔧 简单粗暴地说:Flow API 是“标准”,RxJava 是“工具”。标准不需要功能多,但必须稳定、通用。
8. 总结
- ✅ Reactive Streams 是规范,定义了
Publisher
/Subscriber
等接口 - ✅ Flow API 是 JDK 对该规范的实现,轻量、标准,适合做桥梁
- ✅ RxJava 是功能强大的响应式框架,适合业务开发,API 丰富
- ⚠️ 日常开发推荐使用 RxJava 或 Project Reactor
- ✅ Flow API 更适合底层库、中间件或需要与 JDK 原生组件集成的场景
本文完整代码见 GitHub 仓库(mock 地址)