1. 简介
我们之前已经了解过 Quasar,它为 Kotlin 提供了更高效、更简洁的异步编程能力。通过轻量级线程(Fiber)和消息传递机制,我们可以摆脱传统回调地狱或复杂 Future 组合的束缚。
本文将深入探讨 Quasar 的一些进阶特性,帮助你构建更健壮、可维护性更高的并发系统。
⚠️ 注意:以下内容面向已有一定并发编程经验的开发者,基础概念不再赘述。
2. Actor 模型
Actor 模型是一种经典的并发编程范式,在 Erlang 中尤为流行。Quasar 支持 Actor 编程模型,允许我们将应用拆分为多个独立通信的 Actor 实例。
每个 Actor 具备以下核心能力:
- ✅ 创建其他 Actor
- ✅ 向其他 Actor 发送消息
- ✅ 接收并处理来自其他 Actor 的消息
这三者构成了基于 Actor 构建系统的基石。
在 Quasar 中,一个 Actor 本质上是一个 Strand(通常是 Fiber,也可使用 Thread),并配备了一个用于接收消息的 Channel,同时内置了生命周期管理和错误处理支持。
2.1 添加 Actor 依赖
Actor 并非 Quasar 核心模块,需额外引入依赖:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-actors</artifactId>
<version>0.8.0</version>
</dependency>
✅ 务必确保该版本与其他 Quasar 相关依赖保持一致,避免运行时兼容性问题。
2.2 创建 Actor
通过继承 Actor
类来定义 Actor,需指定名称和邮箱配置(MailboxConfig),并实现 doRun()
方法:
val actor = object : Actor<Int, String>("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
return "Hello"
}
}
- 名称和
MailboxConfig
均为可选参数。 - 若未指定邮箱配置,默认使用无界邮箱(unbounded mailbox)。
⚠️ 关键点:必须手动为 Actor 中的方法添加 @Suspendable
注解。
由于 Kotlin 不强制声明异常,编译器不会自动识别父类中抛出的 SuspendException
,因此需要显式标注以告知 Quasar 运行时此方法可能挂起。
启动 Actor 使用 spawn()
(启动 Fiber)或 spawnThread()
(启动真实线程):
actor.spawn()
println("Noop Actor: ${actor.get()}")
启动后可通过 join()
阻塞等待执行完成,或通过 get()
获取返回值。
2.3 向 Actor 发送消息
调用 spawn()
或 spawnThread()
会返回一个 ActorRef
对象——这是与 Actor 交互的主要句柄。
ActorRef
实现了 SendPort
接口,因此可像操作 Channel 的生产端一样发送消息:
val actorRef = actor.spawn()
actorRef.send(1)
2.4 接收消息
在 doRun()
方法内部调用 receive()
可同步获取下一条待处理消息:
val actor = object : Actor<Int, Void?>("simpleActor", null) {
@Suspendable
override fun doRun(): Void? {
val msg = receive()
println("SimpleActor Received Message: $msg")
return null
}
}
receive()
是阻塞调用,直到有消息到达才会继续执行。
实际开发中,多数 Actor 需持续处理多条消息,通常采用无限循环模式:
val actor = object : Actor<Int, Void?>("loopingActor", null) {
@Suspendable
override fun doRun(): Void? {
while (true) {
val msg = receive()
if (msg > 0) {
println("LoopingActor Received Message: $msg")
} else {
break
}
}
return null
}
}
上述代码将持续处理消息,直至收到 0
才退出。
2.5 消息发送过快:邮箱溢出问题
当消息发送速度远高于 Actor 处理速度时,会导致邮箱堆积甚至溢出。
默认邮箱容量无限,但可通过 MailboxConfig
显式限制大小:
Actor<Int, String>("backlogActor",
MailboxConfig(1, Channels.OverflowPolicy.THROW))
⚠️ 当前 Quasar 版本中,无论设置何种溢出策略,实际行为均为 THROW
——即一旦邮箱满,后续 send()
调用将直接抛出异常。
更严重的是,该异常无法在 Actor 内部被捕获:
try {
receive()
} catch (e: Throwable) {
// ❌ 永远不会执行到这里
}
此时,Actor 会直接终止。外部调用 get()
将抛出 ExecutionException
,其原因链中包含 QueueCapacityExceededException
,指向导致溢出的具体 send()
调用位置。
✅ 规避方案:使用 trySend()
若已知目标 Actor 邮箱有限,建议改用 trySend()
,它不会引发崩溃,而是返回布尔值表示是否成功入队:
val actor = object : Actor<Int, String>("backlogTrySendActor",
MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
@Suspendable
override fun doRun(): String {
TimeUnit.MILLISECONDS.sleep(500);
println("Backlog TrySend Actor Received: ${receive()}")
return "No Exception"
}
}
val actorRef = actor.spawn()
actorRef.trySend(1) // 返回 true
actorRef.trySend(2) // 返回 false
2.6 消息读取过快:非阻塞接收
相反情况是 Actor 主动轮询消息但来源不足。默认 receive()
会永久阻塞,但在某些场景下我们需要更灵活的控制。
Quasar 提供三种接收策略:
方式 | 方法 | 行为 |
---|---|---|
永久阻塞 | receive() |
直到消息到达 |
超时阻塞 | receive(timeout, unit) |
超时后返回 null |
非阻塞 | tryReceive() |
立即返回消息或 null |
示例:带心跳输出的超时接收
while(true) {
val msg = receive(1, TimeUnit.SECONDS)
if (msg != null) {
// 处理消息
} else {
println("Still alive")
}
}
示例:完全非阻塞轮询
while(true) {
val msg = tryReceive()
if (msg != null) {
// 处理消息
} else {
print(".")
}
}
适用于高频率状态检查或 UI 刷新等场景。
2.7 消息过滤与转换
默认情况下,所有发往 Actor 的消息都会被接收。但我们可以通过重写 filterMessage()
方法实现前置过滤或转换:
override fun filterMessage(m: Any?): Int? {
return when (m) {
is Int -> {
if (m % 2 == 0) {
m
} else {
null // 拒绝奇数
}
} else -> super.filterMessage(m)
}
}
✅ filterMessage()
不仅能过滤,还能转换消息内容。返回值即为最终传给 receive()
的对象,兼具 filter
和 map
功能。
例如:过滤奇数,并将偶数乘以 10:
override fun filterMessage(m: Any?): Int? {
return when (m) {
is Int -> {
if (m % 2 == 0) {
m * 10 // 转换
} else {
null
}
}
else -> super.filterMessage(m)
}
}
⚠️ 返回类型必须与 Actor 定义的消息类型匹配。
2.8 Actor 链接与错误传播
目前 Actor 默认独立运行。Quasar 支持通过链接机制实现故障通知,主要分为两种模式:
单向监听:watch()
val watcherRef = watcher.spawn()
val watchedRef = watched.spawn()
watcher.watch(watchedRef)
watcher
会收到 watched
的生命周期事件(如退出),而反向不成立。
双向链接:link()
val firstRef = first.spawn()
val secondRef = second.spawn()
first.link(secondRef) // 双向绑定
两个 Actor 互为监听方。
生命周期事件处理
被监控方的生命周期事件会封装为 LifecycleMessage
放入监控方的邮箱队列,由 filterMessage()
处理后交由 handleLifecycleMessage()
:
override fun handleLifecycleMessage(m: LifecycleMessage?): Int? {
println("WatcherActor Received Lifecycle Message: ${m}")
return super.handleLifecycleMessage(m)
}
⚠️ 关键区别:
watch()
:默认handleLifecycleMessage()
仅清理引用,不会中断当前执行流link()
:除清理外,还会抛出异常,导致doRun()
中的receive()
调用失败
这意味着使用 link()
时,任意关联 Actor 崩溃都会直接中断当前 Actor,适合“一荣俱荣,一损俱损”的强一致性场景;而 watch()
更适合松耦合的监控与恢复逻辑。
2.9 Actor 注册与查找
早期例子中我们通过局部变量持有 ActorRef
,但跨模块调用时难以传递引用。
Quasar 提供全局注册中心 ActorRegistry
:
val actorRef = actor.spawn()
actor.register() // 使用构造时的名字注册
// 其他任意位置查找
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("theActorName")
assertEquals(actorRef, retrievedRef)
若创建时未命名,可在注册时指定:
actor.register("renamedActor")
⚠️ 注意:getActor()
默认会阻塞等待目标 Actor 出现!
为防止永久挂起,应使用带超时的版本:
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("unknownActor", 1, TimeUnit.SECONDS)
Assert.assertNull(retrievedRef) // 超时返回 null
适用于动态服务发现、延迟初始化等场景。
3. Actor 模板(Behaviors)
除了从零构建 Actor,Quasar 还封装了常见模式作为模板(类似 Erlang 中的 Behaviors),提升开发效率。
这些模板通常表现为 Actor
或 ActorRef
的子类,提供更高层次的抽象 API。
3.1 请求-回复模式(Request/Reply)
典型场景:客户端发送请求,Actor 处理后返回结果。
实现步骤:
定义消息类继承
RequestMessage
data class TestMessage(val input: Int) : RequestMessage<Int>()
客户端使用
RequestReplyHelper.call()
发起调用val result = RequestReplyHelper.call(actorRef, TestMessage(50))
Actor 内部处理并回复
val actor = object : Actor<TestMessage, Void?>() { @Suspendable override fun doRun(): Void { while (true) { val msg = receive() RequestReplyHelper.reply(msg, msg.input * 100) } return null } }
✅ 自动处理响应路由,无需手动管理回调上下文。
3.2 Server Actor
对 Request/Reply 的进一步封装,Actor 自身具备同步/异步调用能力。
通过继承 ServerActor
并传入 ServerHandler
实现:
val actor = ServerActor(object : AbstractServerHandler<Int, String, Float>() {
@Suspendable
override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String {
println("Called with message: $m from $from")
return m?.toString() ?: "None"
}
@Suspendable
override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) {
println("Cast message: $m from $from")
}
})
方法说明:
方法 | 用途 |
---|---|
init / terminate |
生命周期钩子 |
handleCall |
同步请求处理,需返回结果 |
handleCast |
异步通知,无需回复 |
handleInfo |
处理普通消息(非 call/cast) |
handleTimeout |
空闲超时处理 |
启动后返回 Server
类型引用,支持 call()
、cast()
和 shutdown()
:
val server = actor.spawn()
val result = server.call(5)
server.cast(2.5f)
server.shutdown()
3.3 代理服务器(Proxy Server)
ServerActor
的语法糖,利用 Java 动态代理将接口调用自动转发至 Actor。
定义标准 Java 接口:
@Suspendable
interface Summer {
fun sum(a: Int, b: Int): Int
}
创建 ProxyServerActor
:
val actor = ProxyServerActor(false, object : Summer {
override fun sum(a: Int, b: Int): Int {
return a + b
}
})
val summerActor = actor.spawn()
第二个参数为布尔值,控制 void
方法是否阻塞调用线程。
✅ 返回的 summerActor
同时实现了 Server
和 Summer
接口:
// 调用业务方法
val result = (summerActor as Summer).sum(1, 2)
// 调用管理方法
summerActor.shutdown()
底层仍基于 ServerActor
,但调用方式更符合 Java 开发习惯。
3.4 事件源(Event Sources)
适用于广播或多播场景:一个 Actor 接收消息,多个处理器并行响应。
创建 EventSourceActor
:
val actor = EventSourceActor<String>()
val eventSource = actor.spawn()
注册事件处理器(Lambda 形式):
eventSource.addHandler { msg ->
println(msg)
}
支持访问外部变量,但需注意跨纤程(Fiber)共享数据的线程安全问题:
val name = "Baeldung"
eventSource.addHandler { msg ->
println("$name $msg") // ⚠️ 共享变量需同步保护
}
所有处理器运行在同一 Fiber 上,因此不宜执行耗时操作。建议将重任务转发给专用 Worker Actor。
3.5 有限状态机(Finite-State Machine)
用 Actor 实现 FSM,每个状态表现为一个函数,消息处理决定状态迁移。
继承 FiniteStateMachineActor
:
@Suspendable
fun lockedState(): SuspendableCallable<SuspendableCallable<*>> {
return receive { msg ->
when (msg) {
"PUSH" -> {
println("Still locked")
lockedState()
}
"COIN" -> {
println("Unlocking...")
unlockedState()
}
else -> TERMINATE
}
}
}
定义初始状态:
@Suspendable
override fun initialState(): SuspendableCallable<SuspendableCallable<*>> {
return SuspendableCallable { lockedState() }
}
状态函数返回值含义:
- 新状态函数:切换状态
TERMINATE
:终止 Actornull
:不消费当前消息,留给下一状态处理
非常适合实现协议解析器、工作流引擎等状态驱动逻辑。
4. Reactive Streams 集成
Reactive Streams 是 JVM 上异步流处理的事实标准(如 RxJava、Akka Stream)。Quasar 提供桥接能力,实现 Channel 与 Publisher/Subscriber 之间的互操作。
4.1 添加依赖
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-reactive-streams</artifactId>
<version>0.8.0</version>
</dependency>
✅ 确保版本与其他 Quasar 模块一致。
⚠️ 若项目已引入 Reactive Streams API,需确认版本兼容性(如 quasar-reactive-streams:0.8.0
依赖 reactive-streams:1.0.2
)。
4.2 发布到 Reactive 流
将 Quasar Channel 转为 Reactive Publisher
:
val inputChannel = Channels.newChannel<String>(1)
val publisher = ReactiveStreams.toPublisher(inputChannel)
下游 Subscriber 可以标准方式消费此流。
⚠️ 默认 Publisher
仅支持单个订阅者。若需多播,请使用 Topic
:
val inputTopic = Topic<String>()
val publisher = ReactiveStreams.toPublisher(inputTopic) // 支持多订阅
4.3 订阅 Reactive 流
将外部 Publisher
转为 Quasar Channel:
val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher)
返回 ReceivePort
,可像普通 Channel 一样使用 receive()
拉取消息。
此功能使得 Quasar Fiber 可无缝集成各类响应式库(如 Spring WebFlux、RSocket 等)。
5. 总结
本文介绍了 Quasar 在 Kotlin 中的高级用法,涵盖:
- ✅ Actor 模型的完整生命周期控制
- ✅ 邮箱管理、错误传播与注册发现
- ✅ 常见行为模板(Server、Proxy、FSM)
- ✅ 与 Reactive Streams 的双向集成
这些特性极大提升了异步系统的可组合性和可维护性。合理运用可显著降低高并发场景下的复杂度。
📌 示例代码详见 GitHub 仓库。