1. 简介
Quasar 是一个 Kotlin 库,它将一些异步编程概念以更易于管理的方式引入 Kotlin。其中包括轻量级线程(Fibers)、通道(Channels)、Actor 模型等。
Quasar 的核心思想是提供一种比传统线程更轻量的并发模型,从而在处理大量并发任务时获得更高的性能和更低的资源消耗。
2. 构建配置
使用最新版本的 Quasar 需要 JDK 11 或更高版本。如果你暂时无法升级到 JDK 11,Quasar 也支持 JDK 7。
Quasar 提供了四个主要依赖项,具体使用哪些依赖取决于你要使用的功能。建议在使用时保持版本一致:
- co.paralleluniverse:quasar-core – Quasar 的核心库
- co.paralleluniverse:quasar-kotlin – Kotlin 扩展支持
- co.paralleluniverse:quasar-actors – Actor 模型支持(后续文章会介绍)
- co.paralleluniverse:quasar-reactive-streams – Reactive Streams 支持(后续文章会介绍)
2.1. 命令行运行
使用 Quasar 时,需要通过 -javaagent
参数加载 Quasar 的 Java Agent 来进行字节码插桩:
$ java -javaagent:quasar-core.jar -cp quasar-core.jar:quasar-kotlin.jar:application.jar fully.qualified.main.Class
2.2. Maven 中运行
也可以通过 Maven 插件来自动加载 Java Agent:
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
然后使用 Exec 插件运行程序:
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.3.2</version>
<configuration>
<workingDirectory>target/classes</workingDirectory>
<executable>echo</executable>
<arguments>
<argument>-javaagent:${co.paralleluniverse:quasar-core:jar}</argument>
<argument>-classpath</argument> <classpath/>
<argument>com.baeldung.quasar.QuasarHelloWorldKt</argument>
</arguments>
</configuration>
</plugin>
运行命令:
mvn compile dependency:properties exec:exec
2.3. 单元测试运行
使用 Surefire 插件运行单元测试时也可加载 Java Agent:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
<configuration>
<argLine>-javaagent:${co.paralleluniverse:quasar-core:jar}</argLine>
</configuration>
</plugin>
3. Fibers(轻量线程)
Fibers 是 Quasar 的核心功能,它类似于线程,但更轻量。每个 Fiber 所占用的内存和 CPU 时间远低于标准线程。
Fibers 并不是为了完全替代线程,而是更适合于大量阻塞操作的场景,比如等待数据库查询结果、等待网络响应等。
3.1. 启动 Fiber
启动 Fiber 的方式与启动线程类似:
class MyRunnable : SuspendableRunnable {
override fun run() {
println("Inside Fiber")
}
}
Fiber<Void>(MyRunnable()).start()
也可以使用 Lambda:
val fiber = Fiber<Void> {
println("Inside Fiber Lambda")
}
fiber.start()
或者使用 DSL:
fiber @Suspendable {
println("Inside Fiber DSL")
}
DSL 是最推荐的方式,简洁且直观。
3.2. 返回值
使用 SuspendableCallable
可以让 Fiber 返回值:
class MyCallable : SuspendableCallable<String> {
override fun run(): String {
println("Inside Fiber")
return "Hello"
}
}
Fiber<String>(MyCallable()).start()
fiber @Suspendable {
println("Inside Fiber DSL")
"Hello"
}
获取返回值使用 get()
方法:
val pi = fiber @Suspendable {
computePi()
}.get()
⚠️ get()
会阻塞当前 Fiber 直到返回结果。
3.3. 等待 Fiber 完成
可以使用 join()
方法等待 Fiber 完成:
val fiber = Fiber<Void>(Runnable()).start()
fiber.join()
也可以设置超时时间:
fiber @Suspendable {
TimeUnit.SECONDS.sleep(5)
}.join(2, TimeUnit.SECONDS)
超时会抛出 TimeoutException
。
3.4. 调度器
Fibers 是通过 FiberScheduler
调度执行的,默认使用 FiberForkJoinScheduler
,它会为每个 CPU 核心创建一个线程。
可以通过系统属性配置调度器:
co.paralleluniverse.fibers.DefaultFiberPool.parallelism
– 线程数co.paralleluniverse.fibers.DefaultFiberPool.exceptionHandler
– 异常处理器co.paralleluniverse.fibers.DefaultFiberPool.monitor
– 监控方式co.paralleluniverse.fibers.DefaultFiberPool.detailedFiberInfo
– 是否启用详细信息监控
3.5. 可挂起方法
Quasar 的核心机制是基于可挂起方法(Suspendable Methods)。这些方法可以被 Fiber 挂起并恢复执行。
可挂起方法的定义方式:
- 抛出
SuspendException
- 使用
@Suspendable
注解 - Java 8 lambda 方法(自动识别)
- 反射调用的方法
⚠️ 不能使用 synchronized
块或构造函数作为可挂起方法。
⚠️ 不建议在可挂起方法中直接使用 Thread.sleep()
,否则会导致性能下降甚至系统不稳定。
4. Strands(线程与 Fiber 的统一抽象)
Strands 是 Quasar 中用于统一表示线程和 Fiber 的抽象概念。
可以使用 Strand.of()
获取线程或 Fiber 的 Strands 表示:
val thread: Thread = ...
val strandThread = Strand.of(thread)
val fiber: Fiber = ...
val strandFiber = Strand.of(fiber)
也可以使用 Strand.currentStrand()
获取当前执行的 Strand:
val myFiber = fiber @Suspendable {
// Strand.of(myFiber) == Strand.currentStrand()
}
统一 API 支持:
strand.id
strand.name
strand.priority
strand.isAlive
strand.isFiber
strand.join()
strand.get()
5. 包装回调函数
Fibers 的一个重要用途是封装基于回调的异步代码。
使用 FiberAsync<T, E>
可以将回调封装为同步 API:
interface PiCallback {
fun success(result: BigDecimal)
fun failure(error: Exception)
}
class PiAsync : PiCallback, FiberAsync<BigDecimal, Exception>() {
override fun success(result: BigDecimal) {
asyncCompleted(result)
}
override fun failure(error: Exception) {
asyncFailed(error)
}
override fun requestAsync() {
computePi(this)
}
}
调用方式:
val result = PiAsync().run()
⚠️ 必须在 Fiber 中使用,不能在普通线程中调用。
6. 通道(Channels)
Channels 是 Quasar 提供的用于在不同 Strands 之间传递消息的机制,类似于 Go 的 channel。
6.1. 创建通道
使用 Channels.newChannel()
创建通道:
Channels.newChannel<String>(1024, Channels.OverflowPolicy.BLOCK, true, true)
也可以使用原始类型通道:
newIntChannel
newLongChannel
newFloatChannel
newDoubleChannel
6.2. 使用通道
使用 SendPort
发送消息:
channel.send("Hello")
channel.send("World")
使用 ReceivePort
接收消息:
fiber @Suspendable {
while (true) {
val message = channel.receive()
println("Received: $message")
}
}
6.3. 关闭通道
可以使用 close()
和 isClosed
控制通道状态:
fiber @Suspendable {
while (!channel.isClosed) {
val message = channel.receive()
println("Received: $message")
}
}
channel.send("Hello")
channel.send("World")
channel.close()
6.4. 阻塞式通道
Channels 是阻塞式的,当接收方没有消息时会挂起 Fiber,而不是忙等。
6.5. 多通道选择器
使用 Selector.select()
可以在多个通道中选择一个进行操作:
fiber @Suspendable {
while (!channel1.isClosed && !channel2.isClosed) {
val received = Selector.select(
Selector.receive(channel1),
Selector.receive(channel2)
)
println("Received: $received")
}
}
也可以用于发送:
fiber @Suspendable {
for (i in 0..10) {
Selector.select(
Selector.send(channel1, "Channel 1: $i"),
Selector.send(channel2, "Channel 2: $i")
)
}
}
6.6. Ticker 通道
Ticker 通道适用于只关心最新消息的场景(如股票行情):
val channel = Channels.newChannel<String>(3, Channels.OverflowPolicy.DISPLACE)
可以使用 TickerChannelConsumer
在多个 Strands 中消费:
for (i in 0..10) {
val tickerConsumer = Channels.newTickerConsumerFor(channel)
fiber @Suspendable {
while (!tickerConsumer.isClosed) {
val message = tickerConsumer.receive()
println("Received on $i: $message")
}
}
}
6.7. 函数式转换
支持对通道进行函数式转换:
map
filter
flatMap
reduce
例如将字符串反转:
val transformOnReceive = Channels.map(channel, Function<String, String> { msg: String? -> msg?.reversed() })
7. 数据流(Data Flow)
Quasar 提供了两个用于响应式编程的核心类:Val
和 Var
。
Val
表示常量Var
表示变量
示例:
val a = Var<Int>()
val b = Val<Int>()
val c = Var<Int> { a.get() + b.get() }
val d = Var<Int> { a.get() * b.get() }
val initialResult = Val<Int> { d.get() - c.get() }
val currentResult = Var<Int> { d.get() - c.get() }
当 a
或 b
变化时,currentResult
会自动更新,但 initialResult
不会。
8. 总结
本文介绍了 Quasar 的基本使用,包括 Fibers、Channels、Strands、回调封装等核心功能。Quasar 提供了一种轻量、高效的异步编程模型,非常适合高并发场景下的 Kotlin 应用开发。
完整的示例代码可以参考 GitHub 上的 Kotlin 教程。