1. 概述
本文将探讨RxJava库如何帮助我们处理背压问题。
简单来说,RxJava通过引入Observable(被观察者)和Observer(观察者)的概念实现响应式流。处理可能无限的数据流极具挑战性,因为我们必须面对背压问题。
当Observable发射数据的速度超过Observer的消费速度时,很容易出现问题。我们将探讨解决未消费数据缓冲区增长的不同方案。
2. 热Observable与冷Observable
首先创建一个简单的消费者函数,用于后续Observable的数据消费:
public class ComputeFunction {
public static void compute(Integer v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
compute()
函数仅打印参数。关键点在于Thread.sleep(1000)
的调用——我们用它模拟耗时操作,这将导致Observable的数据发射速度远超Observer的消费速度。
Observable分为热(Hot)和冷(Cold)两种类型,它们在背压处理上完全不同。
2.1. 冷Observable
冷Observable按特定序列发射数据,但只有在Observer准备好时才开始发射,且按Observer要求的速率进行,不会破坏序列完整性。冷Observable采用懒加载方式提供数据。
Observer仅在准备好处理时才获取元素,由于采用拉取模式,Observable无需缓冲数据。例如,基于静态范围(1到100万)创建的Observable,无论观察频率如何,都会发射相同序列:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute);
程序启动后,Observer会懒加载地计算元素,通过拉取模式请求数据。Schedulers.computation()
表示在RxJava的计算线程池中运行Observer。
输出将逐个显示compute()
方法处理的结果:
compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...
冷Observable无需处理背压,因为采用拉取模式。典型场景包括数据库查询、文件读取或网络请求。
2.2. 热Observable
热Observable创建后立即开始生成并发射数据,与冷Observable的拉取模式相反。热Observable按自身速率发射数据,观察者必须跟上这个节奏。
当Observer无法按Observable的生产速度消费数据时,必须进行缓冲或其他处理,否则会填满内存,最终导致OutOfMemoryException
。
考虑热Observable向消费者发射100万个元素的例子。当Observer的compute()
方法处理耗时较长时,Observable会因数据堆积导致程序崩溃:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
IntStream.range(1, 1_000_000).forEach(source::onNext);
运行该程序会抛出MissingBackpressureException
,因为我们未定义处理过速Observable的方案。典型场景包括鼠标/键盘事件、系统事件或股票价格。
3. 缓冲过速Observable
处理过速Observable的第一种方式是为未消费元素定义缓冲区。可通过调用buffer()
方法实现:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
定义1024大小的缓冲区可为Observer追赶过速数据源提供时间。缓冲区会存储未处理的元素。
虽然可增大缓冲区容量,但这通常只是临时方案,若数据源超出预测容量仍会导致溢出。
4. 批量处理发射元素
可将过速元素按N个一组进行分批处理。当Observable发射速度超过Observer处理速度时,通过分组发送批量数据来缓解压力:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.window(500)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
使用window(500)
方法会指示Observable将元素分组为500个一批。当Observer处理批量数据的速度优于逐个处理时,此技术可减轻过速问题。
5. 跳过元素
若Observable的部分值可安全忽略,可采用采样或节流操作符。sample()
和throttleFirst()
方法接收时间参数:
sample()
:定期检查序列,发射指定时间段内的最后一个元素throttleFirst()
:发射指定时间段后的第一个元素
时间参数决定了从序列中选取元素的间隔。可通过跳过元素定义背压处理策略:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.sample(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
指定采样策略为100毫秒间隔。该时间段内的元素将被发射给Observer。
⚠️ 注意:这些操作符仅降低下游Observer的接收速率,仍可能导致MissingBackpressureException
。
6. 处理缓冲区溢出
当采样或批处理策略无法阻止缓冲区填满时,需实现溢出处理方案。需使用onBackpressureBuffer()
方法防止BufferOverflowException
。
该方法接收三个参数:缓冲区容量、缓冲区填满时的回调函数、丢弃元素的策略(位于BackpressureOverflow
类)。溢出处理策略有四种:
ON_OVERFLOW_ERROR
:默认行为,缓冲区满时抛出BufferOverflowException
ON_OVERFLOW_DEFAULT
:当前等同于ON_OVERFLOW_ERROR
ON_OVERFLOW_DROP_LATEST
:溢出时忽略当前值,仅保留旧值ON_OVERFLOW_DROP_OLDEST
:丢弃最旧元素,添加当前值
示例策略配置:
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> {}, Throwable::printStackTrace);
此配置在缓冲区溢出时丢弃最旧元素,添加Observable的最新值。
✅ 后两种策略会导致数据流不连续(因丢弃元素),但不会抛出BufferOverflowException
。
7. 丢弃所有过速元素
当下游Observer未准备好接收元素时,可使用onBackpressureDrop()
方法直接丢弃该元素。可将其视为容量为零的onBackpressureBuffer()
,并采用ON_OVERFLOW_DROP_LATEST
策略。
当可安全忽略源数据(如鼠标移动或GPS位置信号)时,此操作符特别有用——因为后续会有更新数据:
Observable.range(1, 1_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.doOnNext(ComputeFunction::compute)
.subscribe(v -> {}, Throwable::printStackTrace);
onBackpressureDrop()
能解决过速问题,但需谨慎使用。
8. 总结
本文探讨了过速Observable问题及背压处理方案。当Observer无法跟上Observable的数据生产速度时,我们分析了缓冲、批处理和跳过元素等策略。
所有示例代码片段可在GitHub项目中找到——这是一个Maven项目,可直接导入运行。