1. 概述
本文将深入探讨 RxJava 中各类调度器(Schedulers)的使用场景,重点关注基于 subscribeOn
和 observeOn
实现多线程编程时的核心差异。
调度器让我们能精确控制 Observable 链中任务的执行位置和时机。通过 Schedulers
工厂类,我们可以获取不同特性的调度器实例。
2. 默认线程行为
⚠️ 默认情况下 RxJava 是单线程的 - Observable 及其操作符链会在调用 subscribe()
的同一线程上通知观察者。
observeOn
和 subscribeOn
方法接收一个 Scheduler
参数,用于调度任务执行。调度器通过 createWorker()
方法创建 Scheduler.Worker
,该工作者在单线程上按顺序执行任务。
2.1. 任务调度
通过创建 Worker 并调度任务,我们可以指定执行线程:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
Assert.assertTrue(result.equals("action"));
任务会被提交到 Worker 绑定的线程队列中执行。
2.2. 任务取消
Scheduler.Worker
继承了 Subscription
,调用 unsubscribe()
会清空队列并取消所有待执行任务:
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += "First_Action";
worker.unsubscribe(); // 取消后续任务
});
worker.schedule(() -> result += "Second_Action"); // 不会执行
Assert.assertTrue(result.equals("First_Action"));
✅ 正在执行的任务会被中断,后续任务直接丢弃。
3. Schedulers.newThread
该调度器每次调用都会创建全新线程,性能开销大且线程无法复用:
Observable.just("Hello")
.observeOn(Schedulers.newThread())
.doOnNext(s -> result2 += Thread.currentThread().getName())
.observeOn(Schedulers.newThread())
.subscribe(s -> result1 += Thread.currentThread().getName());
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));
❌ 仅适用于粗粒度任务(耗时极长但数量稀少)的场景:
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_"); // 嵌套调度
result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals("RxNewThreadScheduler-1_Start_End_worker_"));
Worker 绑定到特定线程,任务执行完成后线程即终止。
4. Schedulers.immediate
immediate
调度器在当前线程同步执行任务,阻塞调用线程:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "_Start";
worker.schedule(() -> result += "_worker_"); // 立即执行
result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals("main_Start_worker__End"));
等同于不指定调度器的默认行为:
Observable.just("Hello")
.subscribeOn(Schedulers.immediate())
.subscribe(s -> result += Thread.currentThread().getName());
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));
5. Schedulers.trampoline
trampoline
在当前线程执行任务,但会等待已有任务完成再执行新任务:
Observable.just(2, 4, 6, 8)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
.subscribeOn(Schedulers.trampoline())
.subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579")); // 按订阅顺序执行
与 immediate
的核心区别:
immediate
: 立即执行当前任务trampoline
: 队列化执行,先到先得
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
result += Thread.currentThread().getName() + "Start";
worker.schedule(() -> {
result += "_middleStart";
worker.schedule(() -> result += "_worker_"); // 最后执行
result += "_middleEnd";
});
result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result.equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));
6. Schedulers.from
将 Executor
包装为调度器,桥接 Java 并发工具:
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern)
.build();
}
@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements()
throws InterruptedException {
ExecutorService poolA = newFixedThreadPool(
10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(
10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
Observable<String> observable = Observable.create(subscriber -> {
subscriber.onNext("Alfa");
subscriber.onNext("Beta");
subscriber.onCompleted();
});;
observable
.subscribeOn(schedulerA) // 只有第一个生效
.subscribeOn(schedulerB) // 会被忽略
.subscribe(
x -> result += Thread.currentThread().getName() + x + "_",
Throwable::printStackTrace,
() -> result += "_Completed"
);
Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}
⚠️ 多次调用 subscribeOn
只有第一次生效,后续调用会产生额外开销。
7. Schedulers.io
类似 newThread
但复用线程,采用无界线程池:
Observable.just("io")
.subscribeOn(Schedulers.io())
.subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxIoScheduler-2"));
⚠️ 踩坑警告:处理慢速 I/O 操作(如网络请求)时可能创建过多线程,导致应用卡死。
✅ 实践中通常优先选择 io()
调度器处理 I/O 密集型任务。
8. Schedulers.computation
默认线程数 = CPU 核心数(Runtime.getRuntime().availableProcessors()
),专为CPU 密集型任务设计:
Observable.just("computation")
.subscribeOn(Schedulers.computation())
.subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));
核心特性:
- 使用无界队列
- 线程数受系统属性
rx.scheduler.max-computation-threads
控制 - 确保不会创建超过 CPU 核心数的线程
适合纯计算任务(无阻塞操作),避免线程饥饿。
9. Schedulers.test
测试专用调度器,支持手动推进时间模拟:
List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable
.interval(1, TimeUnit.SECONDS, scheduler);
Observable.from(letters)
.zipWith(tick, (string, index) -> index + "-" + string)
.subscribeOn(scheduler)
.subscribe(subscriber);
subscriber.assertNoValues(); // 初始无输出
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS); // 推进1秒
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");
scheduler.advanceTimeTo(3, TimeUnit.SECONDS); // 推进到3秒
subscriber.assertCompleted();
assertThat(
subscriber.getOnNextEvents(),
hasItems("0-A", "1-B", "2-C"));
完美解决异步测试中的时间控制问题。
10. 默认调度器
部分操作符允许指定调度器,未指定时使用默认值:
ExecutorService poolA = newFixedThreadPool(
10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
.delay(1, TimeUnit.SECONDS, schedulerA) // 指定调度器
.subscribe(i -> result+= Thread.currentThread().getName() + i + " ");
Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));
常见支持自定义调度器的操作符:
| 操作符 | 默认调度器 |
|--------------|------------------|
| delay
| computation()
|
| buffer
| computation()
|
| interval
| computation()
|
| timer
| computation()
|
| timeout
| computation()
|
❌ 未指定时默认使用 computation()
,在 I/O 密集型场景可能成为瓶颈。
11. 总结
✅ 真正的响应式应用中,异步操作只需少量线程和调度器。
掌握调度器是编写可扩展、安全 RxJava 代码的关键:
- 高负载场景下
subscribeOn
和observeOn
的差异至关重要 - 确保下游调度器能处理上游产生的负载(参考背压机制)
所有示例代码可在 GitHub 项目 获取,Maven 工程可直接导入运行。