1. 概述
在 Kotlin 中,协程(Coroutines)是构建非阻塞、并发应用的首选方式。本文将深入讲解 Channel(通道)——它是协程之间通信的核心机制。✅
简单来说:多个协程可以通过 Channel 安全地传递数据,无需显式加锁或使用复杂的线程同步机制。这使得编写高并发程序更加直观和安全。
2. 什么是 Channel?
你可以把 Channel 理解为一个带挂起能力的线程安全队列。一个或多个生产者协程通过 send
向其中写入数据,一个或多个消费者协程通过 receive
读取数据。这两个操作都是挂起函数(suspending functions),不会阻塞线程,而是让协程自动挂起/恢复。
下面是一个基础示例,展示两个协程如何通过 Channel 通信:
@Test
fun should_pass_data_from_one_coroutine_to_another() {
runBlocking {
// given
val channel = Channel<String>()
// when
launch { // coroutine1
channel.send("Hello World!")
}
val result = async { // coroutine2
channel.receive()
}
// then
assertThat(result.await()).isEqualTo("Hello World!")
}
}
📌 解析:
- 创建了一个
String
类型的 Channel。 - 协程1 调用
send
发送消息后自动挂起,直到有接收方。 - 协程2 使用
async
构建器启动,调用receive
获取值。 - 最终断言接收到的消息正确。
⚠️ 注意:如果接收方不存在,发送方会一直挂起(除非是特殊类型的 Channel),这是协程协作式调度的关键特性。
3. Channel 的类型
Kotlin 提供了四种不同行为的 Channel,主要区别在于其内部缓冲区的容量策略。
3.1. Rendezvous Channel(会合通道)
- 无缓冲区,发送方必须等到接收方调用
receive
才能完成发送(反之亦然)。 - 相当于“手递手”交接,典型的一对一同步场景。
- 创建方式:
Channel()
或Channel(Channel.CONFLATED)
❌ 不对!应为Channel(0)
或默认构造。
✅ 正确创建:
val basket = Channel<String>()
(等价于容量为 0)
示例代码:
val basket = Channel<String>()
launch { // coroutine1
val fruits = listOf("Apple", "Orange")
for (fruit in fruits) {
println("coroutine1: Sending $fruit")
basket.send(fruit)
}
}
launch { // coroutine2
repeat(2) {
delay(100)
println("coroutine2: Received ${basket.receive()}")
}
}
输出结果:
coroutine1: Sending Apple
coroutine2: Received Apple
coroutine1: Sending Orange
coroutine2: Received Orange
📌 特点:完全同步,每次 send-receive 成对出现,适合需要强同步的场景。
3.2. Buffered Channel(有界缓冲通道)
- 具有固定大小的缓冲区,可缓解生产者与消费者的速率差异。
- 创建时指定容量:
Channel(capacity)
。
示例:
val basket = Channel<String>(1)
launch { // coroutine1
val fruits = listOf("Apple", "Orange", "Banana")
for (fruit in fruits) {
println("coroutine1: Sending $fruit")
basket.send(fruit)
}
}
launch { // coroutine2
repeat(3) {
delay(100)
println("coroutine2: Received ${basket.receive()}")
}
}
输出:
coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine2: Received Apple
coroutine1: Sending Banana
coroutine2: Received Orange
coroutine2: Received Banana
📌 关键点:
- 第一次 send 不挂起(缓冲区空)。
- 第二次 send 时缓冲区满,协程1 挂起等待消费。
- 一旦有消费,缓冲腾出空间,协程1 继续发送。
⚠️ 踩坑提醒:设置过小的缓冲可能导致频繁挂起,过大则浪费内存,需根据业务吞吐量权衡。
3.3. Unlimited Channel(无界通道)
- 缓冲区理论上无限大(实际受限于 JVM 内存)。
- 创建方式:
Channel(Channel.UNLIMITED)
。
示例:
val channel = Channel<Int>(Channel.UNLIMITED)
launch { // coroutine1
repeat(100) {
println("coroutine1: Sending $it")
channel.send(it)
}
}
launch { // coroutine2
repeat(100) {
println("coroutine2: Received ${channel.receive()}")
}
}
输出(简化):
coroutine1: Sending 0
coroutine1: Sending 1
...
coroutine1: Sending 99
coroutine2: Received 0
...
coroutine2: Received 99
✅ 优势:生产者几乎不会挂起,适合突发流量缓冲。
❌ 风险:若消费者处理慢,极易导致 OutOfMemoryError
。线上慎用!
3.4. Conflated Channel(合并通道)
- 只保留最新写入的值,新值覆盖旧值。
send
永不挂起,receive
总是拿到最新的那个。- 创建方式:
Channel(Channel.CONFLATED)
。
示例:
val basket = Channel<String>(Channel.CONFLATED)
launch { // coroutine1
val fruits = listOf("Apple", "Orange", "Banana")
for (fruit in fruits) {
println("coroutine1: Sending $fruit")
basket.send(fruit)
}
}
launch { // coroutine2
println("coroutine2: Received ${basket.receive()}")
}
输出:
coroutine1: Sending Apple
coroutine1: Sending Orange
coroutine1: Sending Banana
coroutine2: Received Banana
📌 适用场景:配置更新、UI 状态推送等只关心最新状态的场合。
4. 使用 Channel 实现生产者-消费者模式
这是 Channel 最常见的应用场景之一。多个生产者生成数据,多个消费者并行处理,通过 Channel 解耦。
4.1. 单生产者 → 单消费者
使用 produce
构建器创建生产者,返回 ReceiveChannel
(只读通道)。
fun CoroutineScope.produceFruits(): ReceiveChannel<String> = produce {
val fruits = listOf("Apple", "Orange", "Apple")
for (fruit in fruits) send(fruit)
}
消费者通过 for
循环消费:
val fruitChannel = produceFruits()
for (fruit in fruitChannel) {
println(fruit)
}
输出:
Apple
Orange
Apple
✅ 注意:produce
返回的是 ReceiveChannel
,确保消费者无法反向写入。
4.2. 单生产者 → 多消费者
多个消费者共享同一个 ReceiveChannel
,实现工作负载分发。
fun CoroutineScope.producePizzaOrders(): ReceiveChannel<String> = produce {
var x = 1
while (true) {
send("Pizza Order No. ${x++}")
delay(100)
}
}
fun CoroutineScope.pizzaOrderProcessor(id: Int, orders: ReceiveChannel<String>) = launch {
for (order in orders) {
println("Processor #$id is processing $order")
}
}
主函数启动多个处理器:
fun main() = runBlocking {
val pizzaOrders = producePizzaOrders()
repeat(3) {
pizzaOrderProcessor(it + 1, pizzaOrders)
}
delay(1000)
pizzaOrders.cancel()
}
输出示例:
Processor #1 is processing Pizza Order No. 1
Processor #1 is processing Pizza Order No. 2
Processor #2 is processing Pizza Order No. 3
Processor #3 is processing Pizza Order No. 4
...
⚠️ 注意:所有消费者共享同一通道,每个消息只会被一个消费者处理(竞争消费)。
4.3. 多生产者 → 单消费者
多个生产者向同一个 Channel 写入,消费者统一处理。
suspend fun fetchYoutubeVideos(channel: SendChannel<String>) {
val videos = listOf("cat video", "food video")
for (video in videos) {
delay(100)
channel.send(video)
}
}
suspend fun fetchTweets(channel: SendChannel<String>) {
val tweets = listOf("tweet: Earth is round", "tweet: Coroutines and channels are cool")
for (tweet in tweets) {
delay(100)
channel.send(tweet)
}
}
主函数:
fun main() = runBlocking {
val aggregate = Channel<String>()
launch { fetchYoutubeVideos(aggregate) }
launch { fetchTweets(aggregate) }
repeat(4) {
println(aggregate.receive())
}
coroutineContext.cancelChildren()
}
输出:
cat video
tweet: Earth is round
food video
tweet: Coroutines and channels are cool
✅ 场景:日志聚合、事件总线等。
5. 使用 Channel 构建数据处理流水线
将多个生产者-消费者串联成管道(Pipeline),实现复杂的数据流处理。
以披萨制作为例:订单 → 烘焙 → 加料 → 出餐。
定义数据类:
data class PizzaOrder(val orderNumber: Int, val orderStatus: OrderStatus = PENDING)
enum class OrderStatus { PENDING, BAKED, TOPPED }
各阶段处理逻辑:
fun CoroutineScope.baking(orders: ReceiveChannel<PizzaOrder>) = produce {
for (order in orders) {
delay(200)
println("Baking ${order.orderNumber}")
send(order.copy(orderStatus = OrderStatus.BAKED))
}
}
fun CoroutineScope.topping(orders: ReceiveChannel<PizzaOrder>) = produce {
for (order in orders) {
delay(50)
println("Topping ${order.orderNumber}")
send(order.copy(orderStatus = OrderStatus.TOPPED))
}
}
fun CoroutineScope.produceOrders(count: Int) = produce {
repeat(count) {
delay(50)
send(PizzaOrder(orderNumber = it + 1))
}
}
组装流水线:
fun main() = runBlocking {
val orders = produceOrders(3)
val readyOrders = topping(baking(orders)) // 流水线串联
for (order in readyOrders) {
println("Serving ${order.orderNumber}")
}
delay(3000)
coroutineContext.cancelChildren()
}
输出:
Baking 1
Topping 1
Serving 1
Baking 2
Topping 2
Serving 2
Baking 3
Topping 3
Serving 3
✅ 优势:各阶段独立、可复用、天然支持背压(Backpressure)。
6. Ticker Channel(定时通道)
ticker
是协程中的定时器替代方案,周期性地发射 Unit
值,常用于定时任务。
示例:每 5 秒获取一次股票价格。
fun stockPrice(stock: String): Double {
println("${System.currentTimeMillis()} - Fetching stock price of $stock")
return kotlin.random.Random.nextDouble(2.0, 3.0)
}
fun main() = runBlocking {
val tickerChannel = ticker(delayMillis = 5000)
repeat(3) {
tickerChannel.receive()
println("${System.currentTimeMillis()} - ${stockPrice("TESLA")}")
}
delay(11000)
tickerChannel.cancel()
}
输出示例:
1712345678901 - Fetching stock price of TESLA
1712345678901 - 2.7380844072456583
1712345683902 - Fetching stock price of TESLA
1712345683902 - 2.3459508859536635
1712345688903 - Fetching stock price of TESLA
1712345688903 - 2.3137592916266994
✅ 优点:比 delay
更精确,且可通过取消通道来停止定时任务。
7. 总结
本文系统介绍了 Kotlin Channel 的核心概念与实战用法:
- ✅ 四种 Channel 类型:Rendezvous、Buffered、Unlimited、Conflated,各有适用场景。
- ✅ 生产者-消费者模式:通过
produce
和ReceiveChannel
实现高效解耦。 - ✅ 数据流水线:串联多个处理阶段,构建复杂异步流程。
- ✅ Ticker Channel:协程友好的定时任务解决方案。
📌 最佳实践建议:
- 优先使用有界缓冲(Buffered)避免 OOM。
- 明确 Channel 的所有权,及时 cancel 避免资源泄漏。
- 在 UI 场景考虑使用
StateFlow
或SharedFlow
替代部分 Channel 场景。
所有示例代码已整理至 GitHub: https://github.com/baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency