1. 简介

我们之前已经了解过 Quasar,它为 Kotlin 提供了更高效、更简洁的异步编程能力。通过轻量级线程(Fiber)和消息传递机制,我们可以摆脱传统回调地狱或复杂 Future 组合的束缚。

本文将深入探讨 Quasar 的一些进阶特性,帮助你构建更健壮、可维护性更高的并发系统。

⚠️ 注意:以下内容面向已有一定并发编程经验的开发者,基础概念不再赘述。


2. Actor 模型

Actor 模型是一种经典的并发编程范式,在 Erlang 中尤为流行。Quasar 支持 Actor 编程模型,允许我们将应用拆分为多个独立通信的 Actor 实例

每个 Actor 具备以下核心能力:

  • ✅ 创建其他 Actor
  • ✅ 向其他 Actor 发送消息
  • ✅ 接收并处理来自其他 Actor 的消息

这三者构成了基于 Actor 构建系统的基石。

在 Quasar 中,一个 Actor 本质上是一个 Strand(通常是 Fiber,也可使用 Thread),并配备了一个用于接收消息的 Channel,同时内置了生命周期管理和错误处理支持

2.1 添加 Actor 依赖

Actor 并非 Quasar 核心模块,需额外引入依赖:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-actors</artifactId>
    <version>0.8.0</version>
</dependency>

务必确保该版本与其他 Quasar 相关依赖保持一致,避免运行时兼容性问题。


2.2 创建 Actor

通过继承 Actor 类来定义 Actor,需指定名称和邮箱配置(MailboxConfig),并实现 doRun() 方法:

val actor = object : Actor<Int, String>("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) {
    @Suspendable
    override fun doRun(): String {
        return "Hello"
    }
}
  • 名称和 MailboxConfig 均为可选参数。
  • 若未指定邮箱配置,默认使用无界邮箱(unbounded mailbox)。

⚠️ 关键点:必须手动为 Actor 中的方法添加 @Suspendable 注解
由于 Kotlin 不强制声明异常,编译器不会自动识别父类中抛出的 SuspendException,因此需要显式标注以告知 Quasar 运行时此方法可能挂起。

启动 Actor 使用 spawn()(启动 Fiber)或 spawnThread()(启动真实线程):

actor.spawn()

println("Noop Actor: ${actor.get()}")

启动后可通过 join() 阻塞等待执行完成,或通过 get() 获取返回值。


2.3 向 Actor 发送消息

调用 spawn()spawnThread() 会返回一个 ActorRef 对象——这是与 Actor 交互的主要句柄。

ActorRef 实现了 SendPort 接口,因此可像操作 Channel 的生产端一样发送消息:

val actorRef = actor.spawn()

actorRef.send(1)

2.4 接收消息

doRun() 方法内部调用 receive() 可同步获取下一条待处理消息:

val actor = object : Actor<Int, Void?>("simpleActor", null) {
    @Suspendable
    override fun doRun(): Void? {
        val msg = receive()
        println("SimpleActor Received Message: $msg")
        return null
    }
}

receive() 是阻塞调用,直到有消息到达才会继续执行。

实际开发中,多数 Actor 需持续处理多条消息,通常采用无限循环模式:

val actor = object : Actor<Int, Void?>("loopingActor", null) {
    @Suspendable
    override fun doRun(): Void? {
        while (true) {
            val msg = receive()

            if (msg > 0) {
                println("LoopingActor Received Message: $msg")
            } else {
                break
            }
        }

        return null
    }
}

上述代码将持续处理消息,直至收到 0 才退出。


2.5 消息发送过快:邮箱溢出问题

当消息发送速度远高于 Actor 处理速度时,会导致邮箱堆积甚至溢出。

默认邮箱容量无限,但可通过 MailboxConfig 显式限制大小:

Actor<Int, String>("backlogActor", 
    MailboxConfig(1, Channels.OverflowPolicy.THROW))

⚠️ 当前 Quasar 版本中,无论设置何种溢出策略,实际行为均为 THROW——即一旦邮箱满,后续 send() 调用将直接抛出异常。

更严重的是,该异常无法在 Actor 内部被捕获

try {
    receive()
} catch (e: Throwable) {
    // ❌ 永远不会执行到这里
}

此时,Actor 会直接终止。外部调用 get() 将抛出 ExecutionException,其原因链中包含 QueueCapacityExceededException,指向导致溢出的具体 send() 调用位置。

规避方案:使用 trySend()

若已知目标 Actor 邮箱有限,建议改用 trySend(),它不会引发崩溃,而是返回布尔值表示是否成功入队:

val actor = object : Actor<Int, String>("backlogTrySendActor", 
  MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
    @Suspendable
    override fun doRun(): String {
        TimeUnit.MILLISECONDS.sleep(500);
        println("Backlog TrySend Actor Received: ${receive()}")

        return "No Exception"
    }
}

val actorRef = actor.spawn()

actorRef.trySend(1) // 返回 true
actorRef.trySend(2) // 返回 false

2.6 消息读取过快:非阻塞接收

相反情况是 Actor 主动轮询消息但来源不足。默认 receive() 会永久阻塞,但在某些场景下我们需要更灵活的控制。

Quasar 提供三种接收策略:

方式 方法 行为
永久阻塞 receive() 直到消息到达
超时阻塞 receive(timeout, unit) 超时后返回 null
非阻塞 tryReceive() 立即返回消息或 null

示例:带心跳输出的超时接收

while(true) {
    val msg = receive(1, TimeUnit.SECONDS)
    if (msg != null) {
        // 处理消息
    } else {
        println("Still alive")
    }
}

示例:完全非阻塞轮询

while(true) {
    val msg = tryReceive()
    if (msg != null) {
        // 处理消息
    } else {
        print(".")
    }
}

适用于高频率状态检查或 UI 刷新等场景。


2.7 消息过滤与转换

默认情况下,所有发往 Actor 的消息都会被接收。但我们可以通过重写 filterMessage() 方法实现前置过滤或转换:

override fun filterMessage(m: Any?): Int? {
    return when (m) {
        is Int -> {
            if (m % 2 == 0) {
                m
            } else {
                null // 拒绝奇数
            }
        } else -> super.filterMessage(m)
    }
}

filterMessage() 不仅能过滤,还能转换消息内容。返回值即为最终传给 receive() 的对象,兼具 filtermap 功能。

例如:过滤奇数,并将偶数乘以 10:

override fun filterMessage(m: Any?): Int? {
    return when (m) {
        is Int -> {
            if (m % 2 == 0) {
                m * 10 // 转换
            } else {
                null
            }
        }
        else -> super.filterMessage(m)
    }
}

⚠️ 返回类型必须与 Actor 定义的消息类型匹配。


2.8 Actor 链接与错误传播

目前 Actor 默认独立运行。Quasar 支持通过链接机制实现故障通知,主要分为两种模式:

单向监听:watch()

val watcherRef = watcher.spawn()
val watchedRef = watched.spawn()
watcher.watch(watchedRef)

watcher 会收到 watched 的生命周期事件(如退出),而反向不成立。

val firstRef = first.spawn()
val secondRef = second.spawn()
first.link(secondRef) // 双向绑定

两个 Actor 互为监听方。

生命周期事件处理

被监控方的生命周期事件会封装为 LifecycleMessage 放入监控方的邮箱队列,由 filterMessage() 处理后交由 handleLifecycleMessage()

override fun handleLifecycleMessage(m: LifecycleMessage?): Int? {
    println("WatcherActor Received Lifecycle Message: ${m}")
    return super.handleLifecycleMessage(m)
}

⚠️ 关键区别:

  • watch():默认 handleLifecycleMessage() 仅清理引用,不会中断当前执行流
  • link():除清理外,还会抛出异常,导致 doRun() 中的 receive() 调用失败

这意味着使用 link() 时,任意关联 Actor 崩溃都会直接中断当前 Actor,适合“一荣俱荣,一损俱损”的强一致性场景;而 watch() 更适合松耦合的监控与恢复逻辑。


2.9 Actor 注册与查找

早期例子中我们通过局部变量持有 ActorRef,但跨模块调用时难以传递引用。

Quasar 提供全局注册中心 ActorRegistry

val actorRef = actor.spawn()
actor.register() // 使用构造时的名字注册

// 其他任意位置查找
val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("theActorName")
assertEquals(actorRef, retrievedRef)

若创建时未命名,可在注册时指定:

actor.register("renamedActor")

⚠️ 注意:getActor() 默认会阻塞等待目标 Actor 出现!
为防止永久挂起,应使用带超时的版本:

val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("unknownActor", 1, TimeUnit.SECONDS)
Assert.assertNull(retrievedRef) // 超时返回 null

适用于动态服务发现、延迟初始化等场景。


3. Actor 模板(Behaviors)

除了从零构建 Actor,Quasar 还封装了常见模式作为模板(类似 Erlang 中的 Behaviors),提升开发效率。

这些模板通常表现为 ActorActorRef 的子类,提供更高层次的抽象 API。

3.1 请求-回复模式(Request/Reply)

典型场景:客户端发送请求,Actor 处理后返回结果。

实现步骤:

  1. 定义消息类继承 RequestMessage

    data class TestMessage(val input: Int) : RequestMessage<Int>()
    
  2. 客户端使用 RequestReplyHelper.call() 发起调用

    val result = RequestReplyHelper.call(actorRef, TestMessage(50))
    
  3. Actor 内部处理并回复

    val actor = object : Actor<TestMessage, Void?>() {
     @Suspendable
     override fun doRun(): Void {
         while (true) {
             val msg = receive()
             RequestReplyHelper.reply(msg, msg.input * 100)
         }
         return null
     }
    }
    

✅ 自动处理响应路由,无需手动管理回调上下文。


3.2 Server Actor

对 Request/Reply 的进一步封装,Actor 自身具备同步/异步调用能力。

通过继承 ServerActor 并传入 ServerHandler 实现:

val actor = ServerActor(object : AbstractServerHandler<Int, String, Float>() {
    @Suspendable
    override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String {
        println("Called with message: $m from $from")
        return m?.toString() ?: "None"
    }

    @Suspendable
    override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) {
        println("Cast message: $m from $from")
    }
})

方法说明:

方法 用途
init / terminate 生命周期钩子
handleCall 同步请求处理,需返回结果
handleCast 异步通知,无需回复
handleInfo 处理普通消息(非 call/cast)
handleTimeout 空闲超时处理

启动后返回 Server 类型引用,支持 call()cast()shutdown()

val server = actor.spawn()

val result = server.call(5)
server.cast(2.5f)
server.shutdown()

3.3 代理服务器(Proxy Server)

ServerActor 的语法糖,利用 Java 动态代理将接口调用自动转发至 Actor。

定义标准 Java 接口:

@Suspendable
interface Summer {
    fun sum(a: Int, b: Int): Int
}

创建 ProxyServerActor

val actor = ProxyServerActor(false, object : Summer {
    override fun sum(a: Int, b: Int): Int {
        return a + b
    }
})

val summerActor = actor.spawn()

第二个参数为布尔值,控制 void 方法是否阻塞调用线程。

✅ 返回的 summerActor 同时实现了 ServerSummer 接口:

// 调用业务方法
val result = (summerActor as Summer).sum(1, 2)

// 调用管理方法
summerActor.shutdown()

底层仍基于 ServerActor,但调用方式更符合 Java 开发习惯。


3.4 事件源(Event Sources)

适用于广播或多播场景:一个 Actor 接收消息,多个处理器并行响应。

创建 EventSourceActor

val actor = EventSourceActor<String>()
val eventSource = actor.spawn()

注册事件处理器(Lambda 形式):

eventSource.addHandler { msg ->
    println(msg)
}

支持访问外部变量,但需注意跨纤程(Fiber)共享数据的线程安全问题:

val name = "Baeldung"
eventSource.addHandler { msg ->
    println("$name $msg") // ⚠️ 共享变量需同步保护
}

所有处理器运行在同一 Fiber 上,因此不宜执行耗时操作。建议将重任务转发给专用 Worker Actor。


3.5 有限状态机(Finite-State Machine)

用 Actor 实现 FSM,每个状态表现为一个函数,消息处理决定状态迁移。

继承 FiniteStateMachineActor

@Suspendable
fun lockedState(): SuspendableCallable<SuspendableCallable<*>> {
    return receive { msg ->
        when (msg) {
            "PUSH" -> {
                println("Still locked")
                lockedState()
            }
            "COIN" -> {
                println("Unlocking...")
                unlockedState()
            }
            else -> TERMINATE
        }
    }
}

定义初始状态:

@Suspendable
override fun initialState(): SuspendableCallable<SuspendableCallable<*>> {
    return SuspendableCallable { lockedState() }
}

状态函数返回值含义:

  • 新状态函数:切换状态
  • TERMINATE:终止 Actor
  • null:不消费当前消息,留给下一状态处理

非常适合实现协议解析器、工作流引擎等状态驱动逻辑。


4. Reactive Streams 集成

Reactive Streams 是 JVM 上异步流处理的事实标准(如 RxJava、Akka Stream)。Quasar 提供桥接能力,实现 Channel 与 Publisher/Subscriber 之间的互操作。

4.1 添加依赖

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-reactive-streams</artifactId>
    <version>0.8.0</version>
</dependency>

✅ 确保版本与其他 Quasar 模块一致。
⚠️ 若项目已引入 Reactive Streams API,需确认版本兼容性(如 quasar-reactive-streams:0.8.0 依赖 reactive-streams:1.0.2)。


4.2 发布到 Reactive 流

将 Quasar Channel 转为 Reactive Publisher

val inputChannel = Channels.newChannel<String>(1)
val publisher = ReactiveStreams.toPublisher(inputChannel)

下游 Subscriber 可以标准方式消费此流。

⚠️ 默认 Publisher 仅支持单个订阅者。若需多播,请使用 Topic

val inputTopic = Topic<String>()
val publisher = ReactiveStreams.toPublisher(inputTopic) // 支持多订阅

4.3 订阅 Reactive 流

将外部 Publisher 转为 Quasar Channel:

val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher)

返回 ReceivePort,可像普通 Channel 一样使用 receive() 拉取消息。

此功能使得 Quasar Fiber 可无缝集成各类响应式库(如 Spring WebFlux、RSocket 等)。


5. 总结

本文介绍了 Quasar 在 Kotlin 中的高级用法,涵盖:

  • ✅ Actor 模型的完整生命周期控制
  • ✅ 邮箱管理、错误传播与注册发现
  • ✅ 常见行为模板(Server、Proxy、FSM)
  • ✅ 与 Reactive Streams 的双向集成

这些特性极大提升了异步系统的可组合性和可维护性。合理运用可显著降低高并发场景下的复杂度。

📌 示例代码详见 GitHub 仓库


原始标题:Advanced Quasar Usage for Kotlin