1. 概述
在 Android 或后端 Kotlin 开发中,我们可能正在使用 RxJava 的 Single
,也可能逐渐转向更现代的 Kotlin Coroutines。两者都用于处理异步操作,避免阻塞主线程。
但项目迁移不可能一蹴而就。一个稳妥的做法是:**逐步将 RxJava 的 Single
转为协程中的 Deferred
**,这样既能保留现有逻辑,又能享受协程的简洁语法。
本文将探讨 **如何安全、高效地将 RxJava 的 Single<T>
转换为 Kotlin 的 Deferred<T>
**,并分析每种方案的适用场景和潜在“坑点”。
2. Single 与 Deferred 对比
特性 | Single (RxJava) |
Deferred (Kotlin Coroutines) |
---|---|---|
用途 | 表示一个只会发射一个结果或错误的 Observable | 表示一个将来可用的结果(类似 Future) |
典型场景 | API 请求、数据库查询等单次异步操作 | 协程中启动异步任务,后续通过 await() 获取结果 |
结果获取方式 | subscribe() 或 blockingGet() |
await() |
异常处理 | 通过 onError() 回调 |
使用 try/catch 包裹 await() |
是否可取消 | 支持通过 Disposable 取消 |
支持通过 Job.cancel() 取消 |
✅ 总结:两者语义高度相似——都是为了表达“一个异步操作最终会返回一个结果或失败”。因此转换是合理且常见的需求。
3. 将 Single 转换为 Deferred 的方法
为了演示,我们先定义一个商品类和测试数据:
data class Product(val id: Int, val name: String, val price: Double)
private val allProducts = listOf(
Product(1, "Samsung", 1200.0),
Product(2, "Oppo", 800.0),
Product(3, "Nokia", 450.0),
Product(4, "Lenovo", 550.0),
Product(5, "ASUS", 400.0)
)
接下来创建一个返回 Single<List<Product>>
的函数,筛选价格大于 500 并按价格排序:
private fun getFilteredProducts(): Single<List<Product>> {
return Single.just(allProducts)
.map { products ->
products.sortedBy { it.price }.filter { it.price > 500 }
}
.subscribeOn(Schedulers.io())
}
再准备一个扩展函数用于验证结果是否正确:
private suspend fun Deferred<*>.assertOver500AndSorted() {
assertThat(this.await() as List<*>).containsExactly(
Product(4, "Lenovo", 550.0),
Product(2, "Oppo", 800.0),
Product(1, "Samsung", 1200.0)
)
}
使用方式如下:
deferred.assertOver500AndSorted()
下面我们尝试四种不同的转换方式。
3.1 使用 async + blockingGet
最直接的方式是利用 async
启动协程,并在其中调用 blockingGet()
阻塞等待结果:
val deferred = async { getFilteredProducts().blockingGet() }
⚠️ 踩坑提醒:
blockingGet()
会阻塞当前协程所在的线程,虽然不会影响主线程(因为我们用了Schedulers.io()
),但在高并发场景下仍可能浪费线程资源。- 如果原始
Single
出错,异常会直接抛出,需外部用try/catch
捕获。
✅ 优点:代码最简单,适合快速原型或低频调用场景。
❌ 缺点:本质上仍是阻塞式调用,违背了协程非阻塞的设计哲学。
3.2 使用 CompletableDeferred
CompletableDeferred
是 Deferred
的可完成版本,允许我们手动设置结果或异常。
我们可以将其作为 Single.subscribe()
的回调目标:
val deferred = CompletableDeferred<List<Product>>()
getFilteredProducts().subscribe(
deferred::complete,
deferred::completeExceptionally
)
- ✅
complete(value)
:当Single
成功发射数据时调用。 - ✅
completeExceptionally(exception)
:发生错误时调用,自动转为协程中的异常。
✅ 优点:
- 完全非阻塞,响应式风格。
- 异常处理清晰,天然对接协程的异常机制。
⚠️ 注意:务必确保只调用一次 complete
或 completeExceptionally
,否则会抛出 IllegalStateException
。
3.3 使用 suspendCoroutine 或 suspendCancellableCoroutine
这是更底层但更灵活的方式。suspendCoroutine
允许我们在 suspend 函数中挂起协程,等待回调唤醒。
val deferred = async {
suspendCoroutine { continuation ->
getFilteredProducts().subscribe(
continuation::resume,
continuation::resumeWithException
)
}
}
continuation.resume(value)
触发协程恢复。continuation.resumeWithException(e)
以异常结束协程。
如果需要支持取消,应使用 suspendCancellableCoroutine
:
val deferred = async {
suspendCancellableCoroutine { continuation ->
val disposable = getFilteredProducts().subscribe(
continuation::resume,
continuation::resumeWithException
)
// 当协程被取消时,自动 dispose RxJava 流
continuation.invokeOnCancellation { disposable.dispose() }
}
}
✅ 优点:
- 真正挂起而非阻塞。
- 支持取消传播,资源管理更安全。
❌ 缺点:代码稍复杂,适合封装成通用工具函数。
3.4 使用 Kotlinx Coroutines Rx3 扩展库
官方提供了 kotlinx-coroutines-rx3
库,专门用于桥接 RxJava 3 与 Kotlin Coroutines。
首先添加依赖:
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-rx3</artifactId>
<version>1.8.0</version>
</dependency>
该库为 Single
提供了 await()
扩展函数:
suspend fun <T> SingleSource<T>.await(): T
使用方式极其简洁:
val deferred = async { getFilteredProducts().await() }
- 成功时返回值。
- 失败时自动抛出异常,可被
try/catch
捕获。
✅ 优点:
- 语法最简洁,语义清晰。
- 内部实现基于
suspendCancellableCoroutine
,支持取消。 - 官方维护,稳定性高。
❌ 缺点:引入额外依赖,如果你项目已经重度使用 RxJava,值得引入;否则可能略显臃肿。
4. 总结与建议
方法 | 是否推荐 | 适用场景 |
---|---|---|
async { blockingGet() } |
⚠️ 仅临时 | 快速迁移、低频调用,不追求性能 |
CompletableDeferred |
✅ 推荐 | 需要精细控制回调逻辑 |
suspendCoroutine |
✅ 可选 | 封装通用转换工具 |
suspendCancellableCoroutine |
✅ 强烈推荐 | 需要支持取消的生产环境 |
kotlinx-coroutines-rx3.await() |
✅ 首选 | 已使用 RxJava 且希望无缝集成协程的项目 |
📌 最终建议:
- 如果你已经在使用 RxJava 且短期内无法完全迁移到 Flow,**强烈推荐引入
kotlinx-coroutines-rx3
**,它提供了最优雅、最安全的互操作方式。 - 若不想引入新依赖,优先选择
suspendCancellableCoroutine
实现,兼顾性能与可取消性。
所有示例代码均可在 GitHub 获取:https://github.com/baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency-3