1. 简介

Quasar 是一个 Kotlin 库,它将一些异步编程概念以更易于管理的方式引入 Kotlin。其中包括轻量级线程(Fibers)、通道(Channels)、Actor 模型等。

Quasar 的核心思想是提供一种比传统线程更轻量的并发模型,从而在处理大量并发任务时获得更高的性能和更低的资源消耗。


2. 构建配置

使用最新版本的 Quasar 需要 JDK 11 或更高版本。如果你暂时无法升级到 JDK 11,Quasar 也支持 JDK 7。

Quasar 提供了四个主要依赖项,具体使用哪些依赖取决于你要使用的功能。建议在使用时保持版本一致:

2.1. 命令行运行

使用 Quasar 时,需要通过 -javaagent 参数加载 Quasar 的 Java Agent 来进行字节码插桩:

$ java -javaagent:quasar-core.jar -cp quasar-core.jar:quasar-kotlin.jar:application.jar fully.qualified.main.Class

2.2. Maven 中运行

也可以通过 Maven 插件来自动加载 Java Agent:

<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <id>getClasspathFilenames</id>
            <goals>
                <goal>properties</goal>
            </goals>
        </execution>
    </executions>
</plugin>

然后使用 Exec 插件运行程序:

<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
        <workingDirectory>target/classes</workingDirectory>
        <executable>echo</executable>
        <arguments>
            <argument>-javaagent:${co.paralleluniverse:quasar-core:jar}</argument>
            <argument>-classpath</argument> <classpath/>
            <argument>com.baeldung.quasar.QuasarHelloWorldKt</argument>
        </arguments>
    </configuration>
</plugin>

运行命令:

mvn compile dependency:properties exec:exec

2.3. 单元测试运行

使用 Surefire 插件运行单元测试时也可加载 Java Agent:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.22.1</version>
    <configuration>
        <argLine>-javaagent:${co.paralleluniverse:quasar-core:jar}</argLine>
    </configuration>
</plugin>

3. Fibers(轻量线程)

Fibers 是 Quasar 的核心功能,它类似于线程,但更轻量。每个 Fiber 所占用的内存和 CPU 时间远低于标准线程。

Fibers 并不是为了完全替代线程,而是更适合于大量阻塞操作的场景,比如等待数据库查询结果、等待网络响应等。

3.1. 启动 Fiber

启动 Fiber 的方式与启动线程类似:

class MyRunnable : SuspendableRunnable {
    override fun run() {
        println("Inside Fiber")
    }
}
Fiber<Void>(MyRunnable()).start()

也可以使用 Lambda:

val fiber = Fiber<Void> {
    println("Inside Fiber Lambda")
}
fiber.start()

或者使用 DSL:

fiber @Suspendable {
    println("Inside Fiber DSL")
}

DSL 是最推荐的方式,简洁且直观。

3.2. 返回值

使用 SuspendableCallable 可以让 Fiber 返回值:

class MyCallable : SuspendableCallable<String> {
    override fun run(): String {
        println("Inside Fiber")
        return "Hello"
    }
}
Fiber<String>(MyCallable()).start()

fiber @Suspendable {
    println("Inside Fiber DSL")
    "Hello"
}

获取返回值使用 get() 方法:

val pi = fiber @Suspendable {
    computePi()
}.get()

⚠️ get() 会阻塞当前 Fiber 直到返回结果。

3.3. 等待 Fiber 完成

可以使用 join() 方法等待 Fiber 完成:

val fiber = Fiber<Void>(Runnable()).start()
fiber.join()

也可以设置超时时间:

fiber @Suspendable {
    TimeUnit.SECONDS.sleep(5)
}.join(2, TimeUnit.SECONDS)

超时会抛出 TimeoutException

3.4. 调度器

Fibers 是通过 FiberScheduler 调度执行的,默认使用 FiberForkJoinScheduler,它会为每个 CPU 核心创建一个线程。

可以通过系统属性配置调度器:

  • co.paralleluniverse.fibers.DefaultFiberPool.parallelism – 线程数
  • co.paralleluniverse.fibers.DefaultFiberPool.exceptionHandler – 异常处理器
  • co.paralleluniverse.fibers.DefaultFiberPool.monitor – 监控方式
  • co.paralleluniverse.fibers.DefaultFiberPool.detailedFiberInfo – 是否启用详细信息监控

3.5. 可挂起方法

Quasar 的核心机制是基于可挂起方法(Suspendable Methods)。这些方法可以被 Fiber 挂起并恢复执行。

可挂起方法的定义方式:

  • 抛出 SuspendException
  • 使用 @Suspendable 注解
  • Java 8 lambda 方法(自动识别)
  • 反射调用的方法

⚠️ 不能使用 synchronized 块或构造函数作为可挂起方法。
⚠️ 不建议在可挂起方法中直接使用 Thread.sleep(),否则会导致性能下降甚至系统不稳定。


4. Strands(线程与 Fiber 的统一抽象)

Strands 是 Quasar 中用于统一表示线程和 Fiber 的抽象概念

可以使用 Strand.of() 获取线程或 Fiber 的 Strands 表示:

val thread: Thread = ...
val strandThread = Strand.of(thread)

val fiber: Fiber = ...
val strandFiber = Strand.of(fiber)

也可以使用 Strand.currentStrand() 获取当前执行的 Strand:

val myFiber = fiber @Suspendable {
    // Strand.of(myFiber) == Strand.currentStrand()
}

统一 API 支持:

strand.id
strand.name
strand.priority
strand.isAlive
strand.isFiber
strand.join()
strand.get()

5. 包装回调函数

Fibers 的一个重要用途是封装基于回调的异步代码

使用 FiberAsync<T, E> 可以将回调封装为同步 API:

interface PiCallback {
    fun success(result: BigDecimal)
    fun failure(error: Exception)
}

class PiAsync : PiCallback, FiberAsync<BigDecimal, Exception>() {
    override fun success(result: BigDecimal) {
        asyncCompleted(result)
    }

    override fun failure(error: Exception) {
        asyncFailed(error)
    }

    override fun requestAsync() {
        computePi(this)
    }
}

调用方式:

val result = PiAsync().run()

⚠️ 必须在 Fiber 中使用,不能在普通线程中调用。


6. 通道(Channels)

Channels 是 Quasar 提供的用于在不同 Strands 之间传递消息的机制,类似于 Go 的 channel。

6.1. 创建通道

使用 Channels.newChannel() 创建通道:

Channels.newChannel<String>(1024, Channels.OverflowPolicy.BLOCK, true, true)

也可以使用原始类型通道:

  • newIntChannel
  • newLongChannel
  • newFloatChannel
  • newDoubleChannel

6.2. 使用通道

使用 SendPort 发送消息:

channel.send("Hello")
channel.send("World")

使用 ReceivePort 接收消息:

fiber @Suspendable {
    while (true) {
        val message = channel.receive()
        println("Received: $message")
    }
}

6.3. 关闭通道

可以使用 close()isClosed 控制通道状态:

fiber @Suspendable {
    while (!channel.isClosed) {
        val message = channel.receive()
        println("Received: $message")
    }
}

channel.send("Hello")
channel.send("World")

channel.close()

6.4. 阻塞式通道

Channels 是阻塞式的,当接收方没有消息时会挂起 Fiber,而不是忙等。

6.5. 多通道选择器

使用 Selector.select() 可以在多个通道中选择一个进行操作:

fiber @Suspendable {
    while (!channel1.isClosed && !channel2.isClosed) {
        val received = Selector.select(
            Selector.receive(channel1),
            Selector.receive(channel2)
        )

        println("Received: $received")
    }
}

也可以用于发送:

fiber @Suspendable {
    for (i in 0..10) {
        Selector.select(
            Selector.send(channel1, "Channel 1: $i"),
            Selector.send(channel2, "Channel 2: $i")
        )
    }
}

6.6. Ticker 通道

Ticker 通道适用于只关心最新消息的场景(如股票行情):

val channel = Channels.newChannel<String>(3, Channels.OverflowPolicy.DISPLACE)

可以使用 TickerChannelConsumer 在多个 Strands 中消费:

for (i in 0..10) {
    val tickerConsumer = Channels.newTickerConsumerFor(channel)
    fiber @Suspendable {
        while (!tickerConsumer.isClosed) {
            val message = tickerConsumer.receive()
            println("Received on $i: $message")
        }
    }
}

6.7. 函数式转换

支持对通道进行函数式转换:

  • map
  • filter
  • flatMap
  • reduce

例如将字符串反转:

val transformOnReceive = Channels.map(channel, Function<String, String> { msg: String? -> msg?.reversed() })

7. 数据流(Data Flow)

Quasar 提供了两个用于响应式编程的核心类:ValVar

  • Val 表示常量
  • Var 表示变量

示例:

val a = Var<Int>()
val b = Val<Int>()

val c = Var<Int> { a.get() + b.get() }
val d = Var<Int> { a.get() * b.get() }

val initialResult = Val<Int> { d.get() - c.get() }
val currentResult = Var<Int> { d.get() - c.get() }

ab 变化时,currentResult 会自动更新,但 initialResult 不会。


8. 总结

本文介绍了 Quasar 的基本使用,包括 Fibers、Channels、Strands、回调封装等核心功能。Quasar 提供了一种轻量、高效的异步编程模型,非常适合高并发场景下的 Kotlin 应用开发。

完整的示例代码可以参考 GitHub 上的 Kotlin 教程


原始标题:Introduction to Quasar in Kotlin