1. 概述

在 Kotlin 中,协程(Coroutines)是构建非阻塞、并发应用的首选方式。本文将深入讲解 Channel(通道)——它是协程之间通信的核心机制。✅

简单来说:多个协程可以通过 Channel 安全地传递数据,无需显式加锁或使用复杂的线程同步机制。这使得编写高并发程序更加直观和安全。

2. 什么是 Channel?

你可以把 Channel 理解为一个带挂起能力的线程安全队列。一个或多个生产者协程通过 send 向其中写入数据,一个或多个消费者协程通过 receive 读取数据。这两个操作都是挂起函数(suspending functions),不会阻塞线程,而是让协程自动挂起/恢复。

下面是一个基础示例,展示两个协程如何通过 Channel 通信:

@Test
fun should_pass_data_from_one_coroutine_to_another() {
    runBlocking {
        // given
        val channel = Channel<String>()

        // when
        launch { // coroutine1
            channel.send("Hello World!")
        }
        val result = async { // coroutine2
            channel.receive()
        }

        // then
        assertThat(result.await()).isEqualTo("Hello World!")
    }
}

📌 解析:

  • 创建了一个 String 类型的 Channel。
  • 协程1 调用 send 发送消息后自动挂起,直到有接收方。
  • 协程2 使用 async 构建器启动,调用 receive 获取值。
  • 最终断言接收到的消息正确。

⚠️ 注意:如果接收方不存在,发送方会一直挂起(除非是特殊类型的 Channel),这是协程协作式调度的关键特性。

3. Channel 的类型

Kotlin 提供了四种不同行为的 Channel,主要区别在于其内部缓冲区的容量策略。

3.1. Rendezvous Channel(会合通道)

  • 无缓冲区,发送方必须等到接收方调用 receive 才能完成发送(反之亦然)。
  • 相当于“手递手”交接,典型的一对一同步场景。
  • 创建方式:Channel()Channel(Channel.CONFLATED) ❌ 不对!应为 Channel(0) 或默认构造。

✅ 正确创建:val basket = Channel<String>()(等价于容量为 0)

示例代码:

val basket = Channel<String>()

launch { // coroutine1
    val fruits = listOf("Apple", "Orange")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}

launch { // coroutine2
    repeat(2) {
        delay(100)
        println("coroutine2: Received ${basket.receive()}")
    }
}

输出结果:

coroutine1: Sending Apple
coroutine2: Received Apple
coroutine1: Sending Orange
coroutine2: Received Orange

📌 特点:完全同步,每次 send-receive 成对出现,适合需要强同步的场景。

3.2. Buffered Channel(有界缓冲通道)

  • 具有固定大小的缓冲区,可缓解生产者与消费者的速率差异。
  • 创建时指定容量:Channel(capacity)

示例:

val basket = Channel<String>(1)

launch { // coroutine1
    val fruits = listOf("Apple", "Orange", "Banana")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}

launch { // coroutine2
    repeat(3) {
        delay(100)
        println("coroutine2: Received ${basket.receive()}")
    }
}

输出:

coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine2: Received Apple
coroutine1: Sending Banana
coroutine2: Received Orange
coroutine2: Received Banana

📌 关键点:

  • 第一次 send 不挂起(缓冲区空)。
  • 第二次 send 时缓冲区满,协程1 挂起等待消费。
  • 一旦有消费,缓冲腾出空间,协程1 继续发送。

⚠️ 踩坑提醒:设置过小的缓冲可能导致频繁挂起,过大则浪费内存,需根据业务吞吐量权衡。

3.3. Unlimited Channel(无界通道)

  • 缓冲区理论上无限大(实际受限于 JVM 内存)。
  • 创建方式:Channel(Channel.UNLIMITED)

示例:

val channel = Channel<Int>(Channel.UNLIMITED)

launch { // coroutine1
    repeat(100) {
        println("coroutine1: Sending $it")
        channel.send(it)
    }
}

launch { // coroutine2
    repeat(100) {
        println("coroutine2: Received ${channel.receive()}")
    }
}

输出(简化):

coroutine1: Sending 0
coroutine1: Sending 1
...
coroutine1: Sending 99
coroutine2: Received 0
...
coroutine2: Received 99

✅ 优势:生产者几乎不会挂起,适合突发流量缓冲。

❌ 风险:若消费者处理慢,极易导致 OutOfMemoryError。线上慎用!

3.4. Conflated Channel(合并通道)

  • 只保留最新写入的值,新值覆盖旧值。
  • send 永不挂起,receive 总是拿到最新的那个。
  • 创建方式:Channel(Channel.CONFLATED)

示例:

val basket = Channel<String>(Channel.CONFLATED)

launch { // coroutine1
    val fruits = listOf("Apple", "Orange", "Banana")
    for (fruit in fruits) {
        println("coroutine1: Sending $fruit")
        basket.send(fruit)
    }
}

launch { // coroutine2
    println("coroutine2: Received ${basket.receive()}")
}

输出:

coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine1: Sending Banana
coroutine2: Received Banana

📌 适用场景:配置更新、UI 状态推送等只关心最新状态的场合。

4. 使用 Channel 实现生产者-消费者模式

这是 Channel 最常见的应用场景之一。多个生产者生成数据,多个消费者并行处理,通过 Channel 解耦。

4.1. 单生产者 → 单消费者

使用 produce 构建器创建生产者,返回 ReceiveChannel(只读通道)。

fun CoroutineScope.produceFruits(): ReceiveChannel<String> = produce {
    val fruits = listOf("Apple", "Orange", "Apple")
    for (fruit in fruits) send(fruit)
}

消费者通过 for 循环消费:

val fruitChannel = produceFruits()
for (fruit in fruitChannel) {
    println(fruit)
}

输出:

Apple
Orange
Apple

✅ 注意:produce 返回的是 ReceiveChannel,确保消费者无法反向写入。

4.2. 单生产者 → 多消费者

多个消费者共享同一个 ReceiveChannel,实现工作负载分发。

fun CoroutineScope.producePizzaOrders(): ReceiveChannel<String> = produce {
    var x = 1
    while (true) {
        send("Pizza Order No. ${x++}")
        delay(100)
    }
}

fun CoroutineScope.pizzaOrderProcessor(id: Int, orders: ReceiveChannel<String>) = launch {
    for (order in orders) {
        println("Processor #$id is processing $order")
    }
}

主函数启动多个处理器:

fun main() = runBlocking {
    val pizzaOrders = producePizzaOrders()
    repeat(3) {
        pizzaOrderProcessor(it + 1, pizzaOrders)
    }

    delay(1000)
    pizzaOrders.cancel()
}

输出示例:

Processor #1 is processing Pizza Order No. 1
Processor #1 is processing Pizza Order No. 2
Processor #2 is processing Pizza Order No. 3
Processor #3 is processing Pizza Order No. 4
...

⚠️ 注意:所有消费者共享同一通道,每个消息只会被一个消费者处理(竞争消费)。

4.3. 多生产者 → 单消费者

多个生产者向同一个 Channel 写入,消费者统一处理。

suspend fun fetchYoutubeVideos(channel: SendChannel<String>) {
    val videos = listOf("cat video", "food video")
    for (video in videos) {
        delay(100)
        channel.send(video)
    }
}

suspend fun fetchTweets(channel: SendChannel<String>) {
    val tweets = listOf("tweet: Earth is round", "tweet: Coroutines and channels are cool")
    for (tweet in tweets) {
        delay(100)
        channel.send(tweet)
    }
}

主函数:

fun main() = runBlocking {
    val aggregate = Channel<String>()
    launch { fetchYoutubeVideos(aggregate) }
    launch { fetchTweets(aggregate) }

    repeat(4) {
        println(aggregate.receive())
    }

    coroutineContext.cancelChildren()
}

输出:

cat video
tweet: Earth is round
food video
tweet: Coroutines and channels are cool

✅ 场景:日志聚合、事件总线等。

5. 使用 Channel 构建数据处理流水线

将多个生产者-消费者串联成管道(Pipeline),实现复杂的数据流处理。

以披萨制作为例:订单 → 烘焙 → 加料 → 出餐。

定义数据类:

data class PizzaOrder(val orderNumber: Int, val orderStatus: OrderStatus = PENDING)
enum class OrderStatus { PENDING, BAKED, TOPPED }

各阶段处理逻辑:

fun CoroutineScope.baking(orders: ReceiveChannel<PizzaOrder>) = produce {
    for (order in orders) {
        delay(200)
        println("Baking ${order.orderNumber}")
        send(order.copy(orderStatus = OrderStatus.BAKED))
    }
}

fun CoroutineScope.topping(orders: ReceiveChannel<PizzaOrder>) = produce {
    for (order in orders) {
        delay(50)
        println("Topping ${order.orderNumber}")
        send(order.copy(orderStatus = OrderStatus.TOPPED))
    }
}

fun CoroutineScope.produceOrders(count: Int) = produce {
    repeat(count) {
        delay(50)
        send(PizzaOrder(orderNumber = it + 1))
    }
}

组装流水线:

fun main() = runBlocking {
    val orders = produceOrders(3)
    val readyOrders = topping(baking(orders)) // 流水线串联

    for (order in readyOrders) {
        println("Serving ${order.orderNumber}")
    }

    delay(3000)
    coroutineContext.cancelChildren()
}

输出:

Baking 1
Topping 1
Serving 1
Baking 2
Topping 2
Serving 2
Baking 3
Topping 3
Serving 3

✅ 优势:各阶段独立、可复用、天然支持背压(Backpressure)。

6. Ticker Channel(定时通道)

ticker 是协程中的定时器替代方案,周期性地发射 Unit 值,常用于定时任务。

示例:每 5 秒获取一次股票价格。

fun stockPrice(stock: String): Double {
    println("${System.currentTimeMillis()} - Fetching stock price of $stock")
    return kotlin.random.Random.nextDouble(2.0, 3.0)
}

fun main() = runBlocking {
    val tickerChannel = ticker(delayMillis = 5000)

    repeat(3) {
        tickerChannel.receive()
        println("${System.currentTimeMillis()} - ${stockPrice("TESLA")}")
    }

    delay(11000)
    tickerChannel.cancel()
}

输出示例:

1712345678901 - Fetching stock price of TESLA
1712345678901 - 2.7380844072456583
1712345683902 - Fetching stock price of TESLA
1712345683902 - 2.3459508859536635
1712345688903 - Fetching stock price of TESLA
1712345688903 - 2.3137592916266994

✅ 优点:比 delay 更精确,且可通过取消通道来停止定时任务。

7. 总结

本文系统介绍了 Kotlin Channel 的核心概念与实战用法:

  • ✅ 四种 Channel 类型:Rendezvous、Buffered、Unlimited、Conflated,各有适用场景。
  • ✅ 生产者-消费者模式:通过 produceReceiveChannel 实现高效解耦。
  • ✅ 数据流水线:串联多个处理阶段,构建复杂异步流程。
  • ✅ Ticker Channel:协程友好的定时任务解决方案。

📌 最佳实践建议:

  • 优先使用有界缓冲(Buffered)避免 OOM。
  • 明确 Channel 的所有权,及时 cancel 避免资源泄漏。
  • 在 UI 场景考虑使用 StateFlowSharedFlow 替代部分 Channel 场景。

所有示例代码已整理至 GitHub: https://github.com/baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency


原始标题:Introduction to Channels in Kotlin