1. 概述

本文将深入探讨 RxJava 中各类调度器(Schedulers)的使用场景,重点关注基于 subscribeOnobserveOn 实现多线程编程时的核心差异。

调度器让我们能精确控制 Observable 链中任务的执行位置和时机。通过 Schedulers 工厂类,我们可以获取不同特性的调度器实例。

2. 默认线程行为

⚠️ 默认情况下 RxJava 是单线程的 - Observable 及其操作符链会在调用 subscribe() 的同一线程上通知观察者。

observeOnsubscribeOn 方法接收一个 Scheduler 参数,用于调度任务执行。调度器通过 createWorker() 方法创建 Scheduler.Worker,该工作者在单线程上按顺序执行任务。

2.1. 任务调度

通过创建 Worker 并调度任务,我们可以指定执行线程:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
 
Assert.assertTrue(result.equals("action"));

任务会被提交到 Worker 绑定的线程队列中执行。

2.2. 任务取消

Scheduler.Worker 继承了 Subscription,调用 unsubscribe() 会清空队列并取消所有待执行任务:

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += "First_Action";
    worker.unsubscribe(); // 取消后续任务
});
worker.schedule(() -> result += "Second_Action"); // 不会执行
 
Assert.assertTrue(result.equals("First_Action"));

✅ 正在执行的任务会被中断,后续任务直接丢弃。

3. Schedulers.newThread

该调度器每次调用都会创建全新线程,性能开销大且线程无法复用:

Observable.just("Hello")
  .observeOn(Schedulers.newThread())
  .doOnNext(s -> result2 += Thread.currentThread().getName())
  .observeOn(Schedulers.newThread())
  .subscribe(s -> result1 += Thread.currentThread().getName());
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

❌ 仅适用于粗粒度任务(耗时极长但数量稀少)的场景:

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_"); // 嵌套调度
    result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals("RxNewThreadScheduler-1_Start_End_worker_"));

Worker 绑定到特定线程,任务执行完成后线程即终止。

4. Schedulers.immediate

immediate 调度器在当前线程同步执行任务,阻塞调用线程:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_"); // 立即执行
    result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals("main_Start_worker__End"));

等同于不指定调度器的默认行为:

Observable.just("Hello")
  .subscribeOn(Schedulers.immediate())
  .subscribe(s -> result += Thread.currentThread().getName());
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

trampoline 在当前线程执行任务,但会等待已有任务完成再执行新任务:

Observable.just(2, 4, 6, 8)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579")); // 按订阅顺序执行

immediate 的核心区别:

  • immediate: 立即执行当前任务
  • trampoline: 队列化执行,先到先得
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "Start";
    worker.schedule(() -> {
        result += "_middleStart";
        worker.schedule(() -> result += "_worker_"); // 最后执行
        result += "_middleEnd";
    });
    result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result.equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers.from

Executor 包装为调度器,桥接 Java 并发工具:

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
      .setNameFormat(pattern)
      .build();
}

@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements() 
 throws InterruptedException {
    ExecutorService poolA = newFixedThreadPool(
      10, threadFactory("Sched-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);
    ExecutorService poolB = newFixedThreadPool(
      10, threadFactory("Sched-B-%d"));
    Scheduler schedulerB = Schedulers.from(poolB);

    Observable<String> observable = Observable.create(subscriber -> {
      subscriber.onNext("Alfa");
      subscriber.onNext("Beta");
      subscriber.onCompleted();
    });;

    observable
      .subscribeOn(schedulerA) // 只有第一个生效
      .subscribeOn(schedulerB) // 会被忽略
      .subscribe(
        x -> result += Thread.currentThread().getName() + x + "_",
        Throwable::printStackTrace,
        () -> result += "_Completed"
      );
    Thread.sleep(2000);
    Assert.assertTrue(result.equals("Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}

⚠️ 多次调用 subscribeOn 只有第一次生效,后续调用会产生额外开销。

7. Schedulers.io

类似 newThread复用线程,采用无界线程池:

Observable.just("io")
  .subscribeOn(Schedulers.io())
  .subscribe(i -> result += Thread.currentThread().getName());
 
Assert.assertTrue(result.equals("RxIoScheduler-2"));

⚠️ 踩坑警告:处理慢速 I/O 操作(如网络请求)时可能创建过多线程,导致应用卡死。

✅ 实践中通常优先选择 io() 调度器处理 I/O 密集型任务。

8. Schedulers.computation

默认线程数 = CPU 核心数(Runtime.getRuntime().availableProcessors()),专为CPU 密集型任务设计:

Observable.just("computation")
  .subscribeOn(Schedulers.computation())
  .subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));

核心特性:

  • 使用无界队列
  • 线程数受系统属性 rx.scheduler.max-computation-threads 控制
  • 确保不会创建超过 CPU 核心数的线程

适合纯计算任务(无阻塞操作),避免线程饥饿。

9. Schedulers.test

测试专用调度器,支持手动推进时间模拟:

List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<Long> tick = Observable
  .interval(1, TimeUnit.SECONDS, scheduler);

Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string)
  .subscribeOn(scheduler)
  .subscribe(subscriber);

subscriber.assertNoValues(); // 初始无输出
subscriber.assertNotCompleted();

scheduler.advanceTimeBy(1, TimeUnit.SECONDS); // 推进1秒
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

scheduler.advanceTimeTo(3, TimeUnit.SECONDS); // 推进到3秒
subscriber.assertCompleted();
assertThat(
  subscriber.getOnNextEvents(), 
  hasItems("0-A", "1-B", "2-C"));

完美解决异步测试中的时间控制问题。

10. 默认调度器

部分操作符允许指定调度器,未指定时使用默认值:

ExecutorService poolA = newFixedThreadPool(
  10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
  .delay(1, TimeUnit.SECONDS, schedulerA) // 指定调度器
  .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");

Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

常见支持自定义调度器的操作符: | 操作符 | 默认调度器 | |--------------|------------------| | delay | computation() | | buffer | computation() | | interval | computation() | | timer | computation() | | timeout | computation() |

❌ 未指定时默认使用 computation(),在 I/O 密集型场景可能成为瓶颈。

11. 总结

✅ 真正的响应式应用中,异步操作只需少量线程和调度器。

掌握调度器是编写可扩展、安全 RxJava 代码的关键:

  • 高负载场景下 subscribeOnobserveOn 的差异至关重要
  • 确保下游调度器能处理上游产生的负载(参考背压机制

所有示例代码可在 GitHub 项目 获取,Maven 工程可直接导入运行。


原始标题:Schedulers in RxJava

» 下一篇: Java Weekly, 第196期