1. 概述

本文将深入探讨 RxJava 中处理 Observable 的实用工具操作符,并展示如何实现自定义操作符。

操作符本质是一个函数:它接收上游 Observable 并修改其行为,最终返回下游 ObservableSubscriber(类型 T 和 R 可以相同也可以不同)。操作符通过拦截订阅来增强现有 Observable,虽然听起来复杂,但实际非常灵活且容易掌握。

2. Do 系列操作符

这些操作符用于在 Observable 生命周期事件中插入自定义行为。

2.1 核心操作符

  • doOnNext:在 onNext 调用时执行指定动作
  • doOnCompleted:当 Observable 正常终止(调用 onCompleted)时执行动作
Observable.range(1, 10)
  .doOnNext(r -> receivedTotal += r)
  .doOnCompleted(() -> result = "Completed")
  .subscribe();
 
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));

2.2 扩展操作符

  • ⚠️ doOnEach:为每个发射项注册回调,拦截 onNext/onError/onCompleted
  • ⚠️ doOnSubscribe:当观察者订阅时执行动作
  • ⚠️ doOnUnsubscribe:取消订阅时执行动作(与 doOnSubscribe 相反)
Observable.range(1, 10)
  .doOnEach(new Observer<Integer>() {
      @Override
      public void onCompleted() {
          System.out.println("Complete");
      }
      @Override
      public void onError(Throwable e) {
          e.printStackTrace();
      }
      @Override
      public void onNext(Integer value) {
          receivedTotal += value;
      }
  })
  .doOnSubscribe(() -> result = "Subscribed")
  .subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));

2.3 错误处理操作符

  • doOnError:当 Observable 因错误终止时执行动作
  • doOnTerminate:无论成功或失败,终止时都执行动作
  • doAfterTerminate:终止后执行动作(替代已废弃的 finallyDo
thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
  .single()
  .doOnError(throwable -> { throw new RuntimeException("error");})
  .doOnTerminate(() -> result += "doOnTerminate")
  .doAfterTerminate(() -> result += "_doAfterTerminate")
  .subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));

3. 线程控制:ObserveOn vs SubscribeOn

默认情况下Observable 及其操作符链在调用 subscribe 的线程上运行。

3.1 ObserveOn 操作符

指定 Observable 用于发送通知的 Scheduler

Observable.range(1, 5)
  .map(i -> i * 100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .observeOn(Schedulers.computation())
  .map(i -> i * 10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

关键点

  • 元素在 main 线程产生并传递到第一个 map
  • observeOn 后续处理切换到 computation 线程
  • ⚠️ 踩坑提示:下游处理速度可能超过上游,需注意背压问题

3.2 SubscribeOn 操作符

指定 Observable 自身运行的 Scheduler

Observable.range(1, 5)
  .map(i -> i * 100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .subscribeOn(Schedulers.computation())
  .map(i -> i * 10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

核心区别: | 操作符 | 作用范围 | 使用数量 | 线程切换 | |--------|----------|----------|----------| | subscribeOn | 影响整个链 | 仅一个 | 指定源发射线程 | | observeOn | 影响下游 | 多个 | 可多次切换 |

4. Single 和 SingleOrDefault

4.1 Single 操作符

返回仅发射单个项的 Observable,源必须严格发射一个元素

Observable.range(1, 1)
  .single()
  .subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);

当源发射零个或多个元素时抛出异常:

Observable.empty()
  .single()
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

4.2 SingleOrDefault 操作符

类似 single,但允许指定默认值:

Observable.empty()
  .singleOrDefault("Default")
  .subscribe(i -> result +=i);
assertTrue(result.equals("Default"));

注意:当源发射多个元素时仍抛出异常:

Observable.range(1, 3)
  .singleOrDefault(5)
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

4.3 使用场景总结

  • ✅ 源可能发射零或一个元素 → 用 singleOrDefault
  • ✅ 需处理多个元素 → 用 firstlast

5. Timestamp 操作符

简单粗暴:为每个发射项附加时间戳:

Observable.range(1, 10)
  .timestamp()
  .map(o -> result = o.getClass().toString() )
  .last()
  .subscribe();
 
assertTrue(result.equals("class rx.schedulers.Timestamped"));

6. Delay 操作符

延迟发射:在发射每个元素前暂停指定时间:

Observable source = Observable.interval(1, TimeUnit.SECONDS)
  .take(5)
  .timestamp();

Observable delayedObservable
  = source.delay(2, TimeUnit.SECONDS);

source.subscribe(
  value -> System.out.println("source :" + value),
  t -> System.out.println("source error"),
  () -> System.out.println("source completed"));

delayedObservable.subscribe(
  value -> System.out.println("delay : " + value),
  t -> System.out.println("delay error"),
  () -> System.out.println("delay completed"));
Thread.sleep(8000);

替代方案delaySubscription 延迟订阅而非发射,默认使用 computation 调度器,可自定义。

7. Repeat 操作符

拦截完成通知:收到上游完成信号后重新订阅,形成循环:

Observable.range(1, 3)
  .repeat(3)
  .subscribe(i -> receivedTotal += i);
 
assertTrue(receivedTotal == 18);

⚠️ 注意:当上游是动态流时,重复的事件序列可能不同。

8. Cache 操作符

智能缓存机制

  1. 首次订阅时,委托给底层 Observable 并缓存所有通知
  2. 后续订阅直接使用缓存值,不再调用底层源
Observable<Integer> source =
  Observable.<Integer>create(subscriber -> {
      System.out.println("Create");
      subscriber.onNext(receivedTotal += 5);
      subscriber.onCompleted();
  }).cache();
source.subscribe(i -> {
  System.out.println("element 1");
  receivedTotal += 1;
});
source.subscribe(i -> {
  System.out.println("element 2");
  receivedTotal += 2;
});
 
assertTrue(receivedTotal == 8);

9. Using 操作符

资源管理利器:自动管理资源生命周期:

  1. 订阅时创建资源
  2. 使用资源创建 Observable
  3. 取消订阅或终止时释放资源
Observable<Character> values = Observable.using(
  () -> "resource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result += v,
  e -> result += e
);
assertTrue(result.equals("resource"));

10. 总结

RxJava 的真正威力在于其操作符系统。通过声明式转换数据流,我们获得了:

  • ✅ 安全性
  • ✅ 表达力
  • ✅ 灵活性

掌握内置操作符是使用 RxJava 的关键。函数式编程基础决定了操作符在 RxJava 生态中的核心地位。

本文所有示例代码可在 GitHub 获取完整项目源码。


原始标题:Observable Utility Operators in RxJava

« 上一篇: Jukito 简介