1. 概述

本文将探讨RxJava库如何帮助我们处理背压问题。

简单来说,RxJava通过引入Observable(被观察者)和Observer(观察者)的概念实现响应式流。处理可能无限的数据流极具挑战性,因为我们必须面对背压问题。

Observable发射数据的速度超过Observer的消费速度时,很容易出现问题。我们将探讨解决未消费数据缓冲区增长的不同方案。

2. 热Observable与冷Observable

首先创建一个简单的消费者函数,用于后续Observable的数据消费:

public class ComputeFunction {
    public static void compute(Integer v) {
        try {
            System.out.println("compute integer v: " + v);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

compute()函数仅打印参数。关键点在于Thread.sleep(1000)的调用——我们用它模拟耗时操作,这将导致Observable的数据发射速度远超Observer的消费速度。

Observable分为热(Hot)和冷(Cold)两种类型,它们在背压处理上完全不同。

2.1. 冷Observable

Observable按特定序列发射数据,但只有在Observer准备好时才开始发射,且按Observer要求的速率进行,不会破坏序列完整性。Observable采用懒加载方式提供数据。

Observer仅在准备好处理时才获取元素,由于采用拉取模式,Observable无需缓冲数据。例如,基于静态范围(1到100万)创建的Observable,无论观察频率如何,都会发射相同序列:

Observable.range(1, 1_000_000)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute);

程序启动后,Observer会懒加载地计算元素,通过拉取模式请求数据。Schedulers.computation()表示在RxJava的计算线程池中运行Observer

输出将逐个显示compute()方法处理的结果:

compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...

Observable无需处理背压,因为采用拉取模式。典型场景包括数据库查询、文件读取或网络请求。

2.2. 热Observable

Observable创建后立即开始生成并发射数据,与冷Observable的拉取模式相反。Observable按自身速率发射数据,观察者必须跟上这个节奏。

Observer无法按Observable的生产速度消费数据时,必须进行缓冲或其他处理,否则会填满内存,最终导致OutOfMemoryException

考虑热Observable向消费者发射100万个元素的例子。当Observercompute()方法处理耗时较长时,Observable会因数据堆积导致程序崩溃:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

IntStream.range(1, 1_000_000).forEach(source::onNext);

运行该程序会抛出MissingBackpressureException,因为我们未定义处理过速Observable的方案。典型场景包括鼠标/键盘事件、系统事件或股票价格。

3. 缓冲过速Observable

处理过速Observable的第一种方式是为未消费元素定义缓冲区。可通过调用buffer()方法实现:

PublishSubject<Integer> source = PublishSubject.<Integer>create();
        
source.buffer(1024)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

定义1024大小的缓冲区可为Observer追赶过速数据源提供时间。缓冲区会存储未处理的元素。

虽然可增大缓冲区容量,但这通常只是临时方案,若数据源超出预测容量仍会导致溢出。

4. 批量处理发射元素

可将过速元素按N个一组进行分批处理。当Observable发射速度超过Observer处理速度时,通过分组发送批量数据来缓解压力:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.window(500)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

使用window(500)方法会指示Observable将元素分组为500个一批。当Observer处理批量数据的速度优于逐个处理时,此技术可减轻过速问题。

5. 跳过元素

Observable的部分值可安全忽略,可采用采样或节流操作符。sample()throttleFirst()方法接收时间参数:

  • sample():定期检查序列,发射指定时间段内的最后一个元素
  • throttleFirst():发射指定时间段后的第一个元素

时间参数决定了从序列中选取元素的间隔。可通过跳过元素定义背压处理策略:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.sample(100, TimeUnit.MILLISECONDS)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

指定采样策略为100毫秒间隔。该时间段内的元素将被发射给Observer

⚠️ 注意:这些操作符仅降低下游Observer的接收速率,仍可能导致MissingBackpressureException

6. 处理缓冲区溢出

当采样或批处理策略无法阻止缓冲区填满时,需实现溢出处理方案。需使用onBackpressureBuffer()方法防止BufferOverflowException

该方法接收三个参数:缓冲区容量、缓冲区填满时的回调函数、丢弃元素的策略(位于BackpressureOverflow类)。溢出处理策略有四种:

  • ON_OVERFLOW_ERROR:默认行为,缓冲区满时抛出BufferOverflowException
  • ON_OVERFLOW_DEFAULT:当前等同于ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST:溢出时忽略当前值,仅保留旧值
  • ON_OVERFLOW_DROP_OLDEST:丢弃最旧元素,添加当前值

示例策略配置:

Observable.range(1, 1_000_000)
  .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
  .observeOn(Schedulers.computation())
  .subscribe(e -> {}, Throwable::printStackTrace);

此配置在缓冲区溢出时丢弃最旧元素,添加Observable的最新值。

✅ 后两种策略会导致数据流不连续(因丢弃元素),但不会抛出BufferOverflowException

7. 丢弃所有过速元素

当下游Observer未准备好接收元素时,可使用onBackpressureDrop()方法直接丢弃该元素。可将其视为容量为零的onBackpressureBuffer(),并采用ON_OVERFLOW_DROP_LATEST策略。

当可安全忽略源数据(如鼠标移动或GPS位置信号)时,此操作符特别有用——因为后续会有更新数据:

Observable.range(1, 1_000_000)
  .onBackpressureDrop()
  .observeOn(Schedulers.computation())
  .doOnNext(ComputeFunction::compute)
  .subscribe(v -> {}, Throwable::printStackTrace);

onBackpressureDrop()能解决过速问题,但需谨慎使用。

8. 总结

本文探讨了过速Observable问题及背压处理方案。当Observer无法跟上Observable的数据生产速度时,我们分析了缓冲、批处理和跳过元素等策略。

所有示例代码片段可在GitHub项目中找到——这是一个Maven项目,可直接导入运行。


原始标题:Dealing with Backpressure with RxJava