1. 概述

Actor 模型是一种非常有前景的并发编程模型,而 Akka 是其在 JVM 平台上的代表性实现。

本文将介绍 Actor 模型的核心特性,并深入讲解 Akka 的最新版本——Akka Typed 是如何实现这些特性的。

2. 场景说明

为了更好地理解 Actor 模型,我们通过一个具体的例子来贯穿全文:构建一个股票组合系统,支持买入和卖出操作。

Stock 类表示一个股票及其持有数量:

case class Stock(name: String, owned: Long) {
  def buy(qty: Long): Stock = copy(name, owned + qty)
  def sell(qty: Long): Stock =
    if (qty <= owned)
      copy(name, owned - qty)
    else
      this
}

Portfolio 类封装了一个 Stock 的 Map:

case class Portfolio(stocks: Map[String, Stock]) {
  def buy(name: String, qty: Long): Portfolio = {
    // 添加股票逻辑
  }
  def sell(name: String, qty: Long): Portfolio = {
    // 卖出股票逻辑
  }
}

这个系统的客户端可以是一些社交网络分析机器人,它们根据舆情决定是否买卖股票。

3. 项目依赖配置

我们使用 SBT 构建项目。要使用 Akka Typed,需要添加以下依赖:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0"

若要测试 Akka Typed 的 Actor,还需添加测试工具包:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.0" % Test

4. 主角登场:Actor

Actor 模型的核心是 Actor,它是一个独立的计算单元,通过内置的消息机制与其他 Actor 交互。

Actor 是一种响应式对象,它通过消息驱动执行计算逻辑

每个 Actor 都是独立的,它们之间不共享状态,唯一的通信方式就是发送消息(有时等待响应)。

在一个 Actor 系统中,Actor 只能做三件事:

  • 向自己或其他 Actor 发送消息
  • 创建新的 Actor
  • 指定新的行为(behavior)

Akka Typed 将 Actor 定义为行为的工厂。

定义 Actor 有两种方式:面向对象或函数式。本文采用后者。

示例:银行管理股票组合

object Bank {
  final case class CreatePortfolio(client: ActorRef[PortfolioCreated])
  final case class PortfolioCreated(portfolio: ActorRef[PortfolioCommand])

  def apply(): Behavior[CreatePortfolio] =
    Behaviors.receive { (context, message) =>
      val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
      message.client ! PortfolioCreated(replyTo)
      Behaviors.same
    }
}

这段代码定义了我们的第一个 Typed Actor —— Bank

apply 方法定义了 Actor 的行为,也就是它能处理的消息类型。这里它只处理 CreatePortfolio 消息。

Behaviors 是创建行为的工厂类。

⚠️ 每个 Actor 都是并发执行的,但其内部状态是私有的,不会引发竞态条件

每个 Actor 都有一个关联的 邮箱(mailbox),用于缓存待处理的消息。默认邮箱是无界的,更多信息请参考 Akka Mailboxes 文档

5. Actor 如何通信:消息机制

消息是 Actor 之间通信的唯一方式。每个 Actor 都定义了自己的通信协议,也就是它能处理的消息集合。

为了保证线程安全,建议将消息定义为不可变类。如果一个 Actor 能处理多种消息,建议使用 sealed trait 作为基类。

Bank Actor 的协议示例:

final case class CreatePortfolio(client: ActorRef[PortfolioCreated])
final case class PortfolioCreated(portfolio: ActorRef[PortfolioCommand])

Actor 能处理的消息类型由其 Behavior[T] 类型参数决定。

Actor 引用:ActorRef

Actor 之间通信需要持有对方的引用,Akka 使用 ActorRef[-T] 来表示。其中 T 表示该 Actor 能处理的消息类型。

如果要实现请求-响应模式,需要在消息中包含回复地址。

例如,客户端请求创建 Portfolio:

Behaviors.receive { (context, message) =>
  bank ! CreatePortfolio(context.self) 
  // 更多逻辑
}

通信模式

5.1. Tell 模式(发送消息)

Tell 模式是最基本的通信方式,完全异步。

val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
message.client ! PortfolioCreated(replyTo)

发送后不关心是否成功,适用于 fire-and-forget 场景。

5.2. Ask 模式(请求响应)

Ask 模式更适合需要等待响应的场景,适用于 1:1 请求响应。

object BankClient {
  def apply(bank: ActorRef[CreatePortfolio]): Behavior[Unit] =
    Behaviors.setup { context =>
      implicit val timeout: Timeout = 3.seconds
      context.ask(bank, CreatePortfolio) {
        case Success(message) =>
          context.log.info("Portfolio received")
          message.portfolio ! Buy("APPL", 100L)
        case Failure(_) => context.log.info("Portfolio received")
      }
      Behaviors.ignore[Unit]
    }
}

使用 Ask 模式需要注意:

  • 需要设置超时时间,避免阻塞
  • 使用 context.ask 发送消息
  • 提供成功/失败的回调处理逻辑

6. 扩展通信:Actor 创建

Akka Typed 使用工厂方法创建 Actor 实例,这些方法定义在 Behaviors 对象中。

最常用的是 Behaviors.receive

Behaviors.receive { (context, message) =>
  val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
  message.client ! PortfolioCreated(replyTo)
  Behaviors.same
}

该方法接收上下文和消息,并返回新的行为。如果不需要改变行为,可以使用 Behaviors.same

如果不需要上下文,可以使用 Behaviors.receiveMessage

Behaviors.receiveMessage { message =>
  message match {
    case Buy(stock, qty) =>
      portfolio(stocks.buy(stock, qty))
    case Sell(stock, qty) =>
      portfolio(stocks.sell(stock, qty))
  }
}

如果需要在 Actor 启动时执行逻辑,可以使用 Behaviors.setup

object BankClientUsingTheTellPattern {
  def apply(bank: ActorRef[CreatePortfolio]): Behavior[PortfolioCreated] =
    Behaviors.setup { context =>
      bank ! CreatePortfolio(context.self)

      Behaviors.receiveMessage {
        case PortfolioCreated(portfolio) =>
          // 处理新创建的 Portfolio
      }
    }
}

7. Actor 行为与状态管理

Akka Typed 之所以叫“Typed”,是因为它通过 Behavior[-T] 类型强制编译器检查消息类型的合法性。

例如,PortfolioActor 只能处理 BuySell 消息:

sealed trait PortfolioCommand 
final case class Buy(stock: String, quantity: Long) extends PortfolioCommand 
final case class Sell(stock: String, quantity: Long) extends PortfolioCommand

Actor 的行为定义如下:

Behaviors.receiveMessage { 
  case Buy(stock, qty) => 
    // 买入逻辑
  case Sell(stock, qty) => 
    // 卖出逻辑
}

Actor 通过行为变化来维护状态,而不是使用可变变量:

object PortfolioActor {
  
  def apply(): Behavior[PortfolioCommand] = {
    portfolio(Portfolio(Map.empty))
  }

  private def portfolio(stocks: Portfolio): Behavior[PortfolioCommand] = {
    Behaviors.receiveMessage {
      case Buy(stock, qty) =>
        portfolio(stocks.buy(stock, qty))
      case Sell(stock, qty) =>
        portfolio(stocks.sell(stock, qty))
    }
  }
}

这种方式完全避免了可变状态,是纯函数式的。

纯不可变的 Typed Actor,太棒了!

8. Actor 系统:一切的起点

Actor 系统(ActorSystem)是所有 Actor 运行和通信的基础环境。它是一个重量级对象,负责分配线程资源。

通常一个 JVM 进程中只有一个 ActorSystem。

Actor 之间形成层次结构。父 Actor 可以创建子 Actor,子 Actor 的生命周期由父 Actor 管理。

示例:启动整个系统

object BankMain {
  final case class Start(clientName: String)

  def apply(): Behavior[Start] =
    Behaviors.setup { context =>
      context.log.info("Creation of the bank")
      val bank = context.spawn(Bank(), "bank")
      Behaviors.receiveMessage { message =>
        context.log.info("Start a new client")
        context.spawn(BankClientUsingTheTellPattern(bank), message.clientName)
        Behaviors.same
      }
    }
}

启动系统:

def main(args: Array[String]): Unit = {
  val system: ActorSystem[BankMain.Start] = ActorSystem(BankMain(), "main")
  system ! Start("Alice")
  system ! Start("Bob ")
}

根 Actor(Guardian Actor)负责处理所有发往系统的消息。当它停止时,整个系统也会停止。

Actor 可以通过返回 Behavior.stopped 来停止自己,父 Actor 也可以调用 context.stop 停止子 Actor。

ActorSystem 还负责解析 Actor 引用,并将消息路由到目标 Actor。

9. 总结

本文介绍了 Actor 模型的核心概念,并深入讲解了 Akka Typed 的实现方式。

我们学习了:

  • 什么是 Actor
  • 如何定义 Actor 行为
  • Actor 之间如何通信
  • 如何创建和管理 Actor
  • 如何通过行为变化维护状态

当然,Akka Typed 还有很多高级特性,如监督机制、生命周期管理等,更多内容请参考 官方文档

本文代码示例可在 GitHub 获取。


原始标题:Typed Akka: The Actor Model Done Right

» 下一篇: Scala 运算符入门