1. 引言

本文将深入探讨在实际项目中可能遇到的多种协程场景,以及如何有效地组合不同类型的协程来构建高效、可维护的异步逻辑。

市面上很多关于 Kotlin 协程的文章都停留在“Hello World”级别,但真实业务往往复杂得多。我们面对的是混合了网络请求、数据库操作、日志记录和并发控制的系统。因此,本文旨在填补这一空白,提供一套实用的模式和技巧,帮助你在高并发场景下更好地驾驭协程。

2. 一个贴近现实的任务

设想这样一个典型业务需求:你需要处理用户请求,并完成以下步骤:

✅ 同时调用两个外部接口(顺序无关)
✅ 执行一段依赖两者结果的业务逻辑
✅ 将结果通过 JDBC 写入数据库
✅ 记录审计日志用于调试和追踪
✅ 最终返回响应给用户

我们希望尽可能利用协程提升效率。其中 HTTP 请求天然适合协程模型,但 JDBC 操作是阻塞的,这就需要特别处理:

fun processCallAsync(userInput: UserInput): Deferred<String> = async {
    val responseA = client.get(urlA, userInput.query)
    val responseB = client.get(urlB, userInput.query)
    (responseA.body + responseB.body)
      .also {
          storeToDatabase(userInput, it)
          logger.info("User input $userInput was processed with the result $it")
      }
}

⚠️ 注意:上述代码看似简洁,实则存在严重问题 —— storeToDatabase 是阻塞调用,它会占用当前协程所在线程,可能导致线程饥饿,尤其是在主线程池(如 Dispatchers.Default)中执行时。

3. 理解上下文(CoroutineContext)

协程本质上是一种语言层面的抽象,其底层仍需运行在 JVM 线程之上。每个协程都需要一个 CoroutineContext 来定义其运行环境。

一个 CoroutineContext 通常由 CoroutineScope 提供,而 CoroutineScope 负责管理其所创建协程的生命周期。

核心组成

  • CoroutineDispatcher:决定协程运行在哪个线程或线程池上(如 Dispatchers.IO, Dispatchers.Default
  • ✅ 上下文数据:例如 MDC(Mapped Diagnostic Context)用于日志追踪
  • ✅ 异常处理器、Job 等

何时需要切换上下文?

在设计协程流时,应考虑以下三种情况来分离上下文:

  1. 阻塞操作:若某任务会阻塞线程(如 JDBC、文件读写),必须将其调度到专用 IO 线程池,避免拖累其他协程。
  2. 生命周期独立:子协程需独立于父协程存活(例如日志上报不应因主流程失败而中断)。
  3. 扩展上下文信息:当你想为协程添加额外元数据(如 traceId),需创建新的子上下文。

4. 处理阻塞型 IO 任务

协程的优势在于以更少线程实现高并发,但这建立在“非阻塞”前提下。一旦出现线程阻塞,反而可能引发线程耗尽,造成服务不可用。

4.1 使用专用 IO 上下文

最佳实践是将阻塞 IO 操作显式调度到 Dispatchers.IO

launch(Dispatchers.IO) {
    storeToDatabase(userInput, it)
}

📌 Dispatchers.IO 是 Kotlin 提供的专为 IO 密集型任务优化的线程池,内部基于 ForkJoinPool 实现,支持动态扩容。

⚠️ 踩坑提醒:虽然 Dispatchers.IO 可自动增长线程数,但在极端情况下仍可能耗尽资源。建议对数据库连接本身也做限流。

更彻底的方案:全异步栈

理想状态下,应尽量使用非阻塞驱动替代传统阻塞 API:

  • 文件操作 → 使用 FileChannel 配合 AsynchronousFileChannel
  • 数据库访问 → 使用 R2DBC 替代 JDBC
  • HTTP 客户端 → 使用 Ktor Client 或 OkHttp 的异步接口

这样整个调用链都是非阻塞的,才能真正发挥协程威力。

4.2 使用专用 Executor(进阶)

另一种思路是将阻塞任务提交到独立的单线程池中执行,避免影响主协程调度器:

class AsyncWorker(val writerFunction: (Pair<UserInput, String>) -> Int) {
    private val executor = Executors.newSingleThreadExecutor()

    fun submit(input: Pair<UserInput, String>): CompletableFuture<Int> =
      CompletableFuture.supplyAsync({ writerFunction(input) }, executor)
}

fun storeToDatabaseAsync(userInput: UserInput, result: String): CompletableFuture<Int> =
  asyncDbWriter.submit(userInput to result)

然后通过 kotlinx-coroutines-jdk8 提供的扩展函数无缝集成:

storeToDatabaseAsync(userInput, result).await()

✅ 优势:完全隔离了阻塞风险,且可通过 CompletableFuture 实现复杂的回调编排。

5. 异步任务中的 Map-Reduce 模式

协程最擅长的场景之一就是并行发起多个网络请求。我们可以借鉴 Map-Reduce 思想来组织代码。

5.1 并发启动协程

Kotlin 倡导“显式并发”,即所有跨线程操作必须明确声明。要实现两个请求同时发出,必须使用 async {} 包裹:

val responseA = async { client.get(urlA, userInput.query) }
val responseB = async { client.get(urlB, userInput.query) }
return responseA.await().body + responseB.await().body

📌 这里的 async 相当于 “map” 阶段,启动多个异步任务;await() 则是 “reduce” 阶段,聚合结果。

5.2 批量并发启动

当目标 URL 数量不确定或较多时,可用集合操作简化代码:

val result: List<Response> = urls.map { url -> 
    async { client.get(url, userInput.query) } 
}.awaitAll()

⚠️ 注意:awaitAll() 具有“短路”特性 —— 任一请求失败都会导致整体抛出异常并取消作用域。

容错处理方案

若允许部分失败(如聚合类接口),可结合 runCatching 实现降级:

urls.map { url -> 
    async { runCatching { client.get(url, userInput.query) } } 
}
.mapNotNull { deferred ->
    deferred.await().fold(
        onSuccess = { it },
        onFailure = {
            logger.error("Error during one of the calls", it)
            null
        }
    )
}

✅ 应用场景:航班比价、商品聚合等对完整性要求不高的服务。

📌 类似地,若只关心任务完成而不关心返回值,可用 launch {} + joinAll()

results.map { 
    launch { storeToDatabase(userInput, it) } 
}.joinAll()

6. 火箭发射式协程(Fire-and-Forget)

有些任务无需等待结果,只需确保最终被执行即可,典型例子是审计日志:

launch {
    logger.info("User input $userInput was processed with the result $it")
}

这类任务的上下文选择很关键:

  • ✅ 若日志与主流程强关联(如仅成功才记录),使用相同上下文
  • ✅ 若希望日志独立完成(即使主流程失败也要记录),应使用独立 Scope 或 SupervisorJob

如何确认任务已完成?

虽然叫“发射即忘”,但我们仍可通过 Job.join() 实现同步等待:

launch(Dispatchers.IO) {
    storeToDatabase(userInput, it)
}.join() // 阻塞直到完成

📌 适用于某些关键副作用必须保证执行完毕的场景。

7. 桥接普通函数与挂起函数

并非所有项目都能从头使用协程。在传统线程模型中引入协程时,如何启动根协程成为关键。

方案一:runBlocking(慎用)

最简单的方式是在主线程中使用 runBlocking

runBlocking {
    processCallAsync(userInput).await()
}

⚠️ 踩坑提醒:runBlocking 会阻塞当前线程,绝不应在协程内部或事件循环中使用,仅适合 main 函数或测试用例。

方案二:launch + Job 控制

更灵活的做法是启动顶层协程并持有 Job 引用:

fun processCallAndForget(userInput: UserInput): Job = launch {
    processCallAsync(userInput).await()
}

通过返回的 Job,你可以:

  • 检查状态:job.isActive
  • 主动取消:job.cancel()
  • 等待结束:配合 runBlocking { job.join() }

📌 注意:join() 是挂起函数,必须包裹在协程构建器或 runBlocking 中调用。

8. 总结

本文系统梳理了协程组合的核心原则:

  • ✅ 区分 CPU 密集、IO 密集任务,合理使用 Dispatchers
  • ✅ 阻塞调用必须移出主协程上下文,推荐使用 Dispatchers.IO
  • ✅ 利用 async/await 实现并行化,模拟 Map-Reduce 模式
  • ✅ 对批量任务做好容错设计,避免单点故障影响整体
  • ✅ 审慎处理“发射即忘”任务的生命周期与上下文归属
  • ✅ 在混合架构中正确桥接阻塞与非阻塞世界

最终目标是构建清晰、健壮、高效的异步流水线,而不是为了用协程而用协程。

示例代码已托管至 GitHub:https://github.com/Baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency-2


原始标题:Composing Coroutines and Suspend Functions