2. Observable 与 Flowable 的核心区别

RxJava 2 对框架进行了彻底重构,其中最关键的改进之一就是明确区分了背压感知和非背压感知的数据源

  • Observable不支持背压处理,适用于无法控制生产速度的场景(如 UI 事件)
  • Flowable专门处理背压问题,适用于高速数据流场景(如数据库查询、网络请求)

⚠️ 使用场景选择

  • 当数据量可控且消费速度稳定时,用 Observable 更简单
  • 当数据量巨大(如 10,000+ 元素)或消费速度不确定时,必须用 Flowable

冷热数据流差异

  • 冷 Observable:按需发射数据,天然安全(如 range()
  • 热 Observable:持续发射数据,可能淹没消费者(如 interval()

3. 创建 Flowable 的三种方式

3.1 基础创建方式

Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);

⚠️ 实际开发中很少用静态数据创建,主要用于测试场景

3.2 从 Observable 转换

Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
  .toFlowable(BackpressureStrategy.BUFFER);

✅ 关键点:必须指定 BackpressureStrategy 策略

3.3 通过 FlowableOnSubscribe 创建

FlowableOnSubscribe<Integer> flowableOnSubscribe
 = flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
  .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

✅ 优势:所有订阅者接收相同事件,天然支持背压

4. 背压策略(BackpressureStrategy)全解析

4.1 BUFFER 策略

public void thenAllValuesAreBufferedAndReceived() {
    List testList = IntStream.range(0, 100000)
      .boxed()
      .collect(Collectors.toList());
 
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.BUFFER)
      .observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertEquals(testList, receivedInts);
}

行为:缓冲所有事件直到消费者处理完毕
⚠️ 风险:可能引发 OOM(内存溢出)

4.2 DROP 策略

public void whenDropStrategyUsed_thenOnBackpressureDropped() {
   
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.DROP)
      .observeOn(Schedulers.computation())
      .test();
    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(!receivedInts.contains(100000));
 }

行为:直接丢弃消费者无法处理的事件
适用:允许数据丢失的场景(如实时位置更新)

4.3 LATEST 策略

public void whenLatestStrategyUsed_thenTheLastElementReceived() {
  
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.LATEST)
      .observeOn(Schedulers.computation())
      .test();

    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(receivedInts.contains(100000));
 }

行为:只保留最新事件,覆盖旧值
⚠️ 对比 DROP:DROP 丢弃新事件,LATEST 丢弃旧事件

4.4 ERROR 策略

public void whenErrorStrategyUsed_thenExceptionIsThrown() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.ERROR)
      .observeOn(Schedulers.computation())
      .test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

行为:当背压发生时直接抛出 MissingBackpressureException
适用:明确要求背压必须处理的场景

4.5 MISSING 策略

public void whenMissingStrategyUsed_thenException() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.MISSING)
      .observeOn(Schedulers.computation())
      .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

⚠️ 行为:不处理背压,由下游自行处理
适用:需要后续通过操作符动态控制背压的场景

5. 总结

  • Flowable 是 RxJava 2 的核心改进,专门解决背压问题
  • 策略选择关键
    • 需要完整数据 → BUFFER
    • 允许数据丢失 → DROP/LATEST
    • 强制要求处理 → ERROR
    • 动态控制 → MISSING
  • ⚠️ 踩坑提醒BUFFER 策略可能导致 OOM,务必评估数据量级

🔍 深入探索:更多 API 细节可查阅 官方文档
💻 完整示例GitHub 代码库


原始标题:RxJava 2 - Flowable