1. 概述
Actor 模型是一种非常有前景的并发编程模型,而 Akka 是其在 JVM 平台上的代表性实现。
本文将介绍 Actor 模型的核心特性,并深入讲解 Akka 的最新版本——Akka Typed 是如何实现这些特性的。
2. 场景说明
为了更好地理解 Actor 模型,我们通过一个具体的例子来贯穿全文:构建一个股票组合系统,支持买入和卖出操作。
Stock 类表示一个股票及其持有数量:
case class Stock(name: String, owned: Long) {
def buy(qty: Long): Stock = copy(name, owned + qty)
def sell(qty: Long): Stock =
if (qty <= owned)
copy(name, owned - qty)
else
this
}
Portfolio 类封装了一个 Stock 的 Map:
case class Portfolio(stocks: Map[String, Stock]) {
def buy(name: String, qty: Long): Portfolio = {
// 添加股票逻辑
}
def sell(name: String, qty: Long): Portfolio = {
// 卖出股票逻辑
}
}
这个系统的客户端可以是一些社交网络分析机器人,它们根据舆情决定是否买卖股票。
3. 项目依赖配置
我们使用 SBT 构建项目。要使用 Akka Typed,需要添加以下依赖:
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0"
若要测试 Akka Typed 的 Actor,还需添加测试工具包:
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.0" % Test
4. 主角登场:Actor
Actor 模型的核心是 Actor,它是一个独立的计算单元,通过内置的消息机制与其他 Actor 交互。
✅ Actor 是一种响应式对象,它通过消息驱动执行计算逻辑。
每个 Actor 都是独立的,它们之间不共享状态,唯一的通信方式就是发送消息(有时等待响应)。
在一个 Actor 系统中,Actor 只能做三件事:
- 向自己或其他 Actor 发送消息
- 创建新的 Actor
- 指定新的行为(behavior)
Akka Typed 将 Actor 定义为行为的工厂。
定义 Actor 有两种方式:面向对象或函数式。本文采用后者。
示例:银行管理股票组合
object Bank {
final case class CreatePortfolio(client: ActorRef[PortfolioCreated])
final case class PortfolioCreated(portfolio: ActorRef[PortfolioCommand])
def apply(): Behavior[CreatePortfolio] =
Behaviors.receive { (context, message) =>
val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
message.client ! PortfolioCreated(replyTo)
Behaviors.same
}
}
这段代码定义了我们的第一个 Typed Actor —— Bank
。
apply
方法定义了 Actor 的行为,也就是它能处理的消息类型。这里它只处理 CreatePortfolio
消息。
Behaviors
是创建行为的工厂类。
⚠️ 每个 Actor 都是并发执行的,但其内部状态是私有的,不会引发竞态条件。
每个 Actor 都有一个关联的 邮箱(mailbox),用于缓存待处理的消息。默认邮箱是无界的,更多信息请参考 Akka Mailboxes 文档。
5. Actor 如何通信:消息机制
消息是 Actor 之间通信的唯一方式。每个 Actor 都定义了自己的通信协议,也就是它能处理的消息集合。
为了保证线程安全,建议将消息定义为不可变类。如果一个 Actor 能处理多种消息,建议使用 sealed trait
作为基类。
Bank Actor 的协议示例:
final case class CreatePortfolio(client: ActorRef[PortfolioCreated])
final case class PortfolioCreated(portfolio: ActorRef[PortfolioCommand])
Actor 能处理的消息类型由其 Behavior[T]
类型参数决定。
Actor 引用:ActorRef
Actor 之间通信需要持有对方的引用,Akka 使用 ActorRef[-T]
来表示。其中 T
表示该 Actor 能处理的消息类型。
如果要实现请求-响应模式,需要在消息中包含回复地址。
例如,客户端请求创建 Portfolio:
Behaviors.receive { (context, message) =>
bank ! CreatePortfolio(context.self)
// 更多逻辑
}
通信模式
5.1. Tell 模式(发送消息)
Tell 模式是最基本的通信方式,完全异步。
val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
message.client ! PortfolioCreated(replyTo)
发送后不关心是否成功,适用于 fire-and-forget 场景。
5.2. Ask 模式(请求响应)
Ask 模式更适合需要等待响应的场景,适用于 1:1 请求响应。
object BankClient {
def apply(bank: ActorRef[CreatePortfolio]): Behavior[Unit] =
Behaviors.setup { context =>
implicit val timeout: Timeout = 3.seconds
context.ask(bank, CreatePortfolio) {
case Success(message) =>
context.log.info("Portfolio received")
message.portfolio ! Buy("APPL", 100L)
case Failure(_) => context.log.info("Portfolio received")
}
Behaviors.ignore[Unit]
}
}
使用 Ask 模式需要注意:
- 需要设置超时时间,避免阻塞
- 使用
context.ask
发送消息 - 提供成功/失败的回调处理逻辑
6. 扩展通信:Actor 创建
Akka Typed 使用工厂方法创建 Actor 实例,这些方法定义在 Behaviors
对象中。
最常用的是 Behaviors.receive
:
Behaviors.receive { (context, message) =>
val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
message.client ! PortfolioCreated(replyTo)
Behaviors.same
}
该方法接收上下文和消息,并返回新的行为。如果不需要改变行为,可以使用 Behaviors.same
。
如果不需要上下文,可以使用 Behaviors.receiveMessage
:
Behaviors.receiveMessage { message =>
message match {
case Buy(stock, qty) =>
portfolio(stocks.buy(stock, qty))
case Sell(stock, qty) =>
portfolio(stocks.sell(stock, qty))
}
}
如果需要在 Actor 启动时执行逻辑,可以使用 Behaviors.setup
:
object BankClientUsingTheTellPattern {
def apply(bank: ActorRef[CreatePortfolio]): Behavior[PortfolioCreated] =
Behaviors.setup { context =>
bank ! CreatePortfolio(context.self)
Behaviors.receiveMessage {
case PortfolioCreated(portfolio) =>
// 处理新创建的 Portfolio
}
}
}
7. Actor 行为与状态管理
Akka Typed 之所以叫“Typed”,是因为它通过 Behavior[-T]
类型强制编译器检查消息类型的合法性。
例如,PortfolioActor 只能处理 Buy
和 Sell
消息:
sealed trait PortfolioCommand
final case class Buy(stock: String, quantity: Long) extends PortfolioCommand
final case class Sell(stock: String, quantity: Long) extends PortfolioCommand
Actor 的行为定义如下:
Behaviors.receiveMessage {
case Buy(stock, qty) =>
// 买入逻辑
case Sell(stock, qty) =>
// 卖出逻辑
}
Actor 通过行为变化来维护状态,而不是使用可变变量:
object PortfolioActor {
def apply(): Behavior[PortfolioCommand] = {
portfolio(Portfolio(Map.empty))
}
private def portfolio(stocks: Portfolio): Behavior[PortfolioCommand] = {
Behaviors.receiveMessage {
case Buy(stock, qty) =>
portfolio(stocks.buy(stock, qty))
case Sell(stock, qty) =>
portfolio(stocks.sell(stock, qty))
}
}
}
这种方式完全避免了可变状态,是纯函数式的。
✅ 纯不可变的 Typed Actor,太棒了!
8. Actor 系统:一切的起点
Actor 系统(ActorSystem)是所有 Actor 运行和通信的基础环境。它是一个重量级对象,负责分配线程资源。
通常一个 JVM 进程中只有一个 ActorSystem。
Actor 之间形成层次结构。父 Actor 可以创建子 Actor,子 Actor 的生命周期由父 Actor 管理。
示例:启动整个系统
object BankMain {
final case class Start(clientName: String)
def apply(): Behavior[Start] =
Behaviors.setup { context =>
context.log.info("Creation of the bank")
val bank = context.spawn(Bank(), "bank")
Behaviors.receiveMessage { message =>
context.log.info("Start a new client")
context.spawn(BankClientUsingTheTellPattern(bank), message.clientName)
Behaviors.same
}
}
}
启动系统:
def main(args: Array[String]): Unit = {
val system: ActorSystem[BankMain.Start] = ActorSystem(BankMain(), "main")
system ! Start("Alice")
system ! Start("Bob ")
}
根 Actor(Guardian Actor)负责处理所有发往系统的消息。当它停止时,整个系统也会停止。
Actor 可以通过返回 Behavior.stopped
来停止自己,父 Actor 也可以调用 context.stop
停止子 Actor。
ActorSystem 还负责解析 Actor 引用,并将消息路由到目标 Actor。
9. 总结
本文介绍了 Actor 模型的核心概念,并深入讲解了 Akka Typed 的实现方式。
我们学习了:
- 什么是 Actor
- 如何定义 Actor 行为
- Actor 之间如何通信
- 如何创建和管理 Actor
- 如何通过行为变化维护状态
当然,Akka Typed 还有很多高级特性,如监督机制、生命周期管理等,更多内容请参考 官方文档。
本文代码示例可在 GitHub 获取。