1. 概述

响应式宣言 是所有响应式系统的指导原则,其中指出分布式应用必须具备弹性(resilience)。为了实现弹性,我们必须构建将失败视为架构中第一公民的系统。

Akka 库通过监督(supervision)机制,提供了实现弹性系统的基本构件。本文将介绍如何使用 Akka 和监督机制,提升分布式系统的容错能力

2. Akka 依赖

要使用 Akka Typed 库,通常需要引入以下依赖:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.0" % Test

3. 场景设定

为了说明 Akka 的监督机制,我们先设定一个简单的场景:模拟一个提供文件服务的 Web 服务器。

trait Resource
case class File(id: String, content: Array[Byte], mimeType: String) extends Resource

该 Web 服务器只处理 Get 请求,返回内容始终包含原始路径:

case class Get(path: String, replyTo: ActorRef[Response]) extends Request

case class Ok(path: String, resource: Resource) extends Response
case class NotFound(path: String) extends Response
case class BadRequest(path: String) extends Response
case class InternalServerError(path: String, error: String) extends Response

服务器收到请求后,会委托 Filesystem 来查找资源,并将结果返回给调用方:

object WebServer {
  def apply(filesystem: ActorRef[Find]): Behavior[Request] =
    Behaviors.receive { (context, message) =>
      message match {
        case Get(path, replyTo) =>
          findInTheFilesystem(filesystem, context, replyTo, path)
        // More to come ;)
      }
      Behaviors.same
    }

目前我们省略了一些细节,后续会逐步补充完整。

4. 处理 or 不处理:这是个问题

在分布式系统中,我们必须将失败纳入设计考量。那么,代码中到底应该处理哪些错误?

基本上,错误可以分为两类:

验证错误(Validation Errors):属于可预期的业务逻辑错误,例如输入不合法等。
失败(Failures):不可预期的系统级错误,例如外部服务不可用、磁盘空间不足等。

验证错误应该作为 Actor 协议的一部分进行处理。例如,如果路径不是合法 URI,服务器应返回 BadRequest

Behaviors.receive { (context, message) =>
  message match {
    case Get(path, replyTo) =>
      if (isNotValid(path)) {
        replyTo ! BadRequest(path)
      }

而失败则不应该混入业务逻辑中。正确的做法是将失败的处理责任交给外部实体,即“监督者(supervisor)”

5. 监督机制:应对不可预期的失败

我们来完善一下示例。Web 服务器通过 Filesystem 查找资源,我们可以将 Filesystem 建模为一个 Actor:

object Filesystem {
  def apply(): Behavior[FsFind] = search

  private def search: Behavior[FsFind] = {
    Behaviors.receive { (context, message) =>
      context.log.info(s"Received a request for path ${message.path}")
      message.replyTo !
      if (Random.nextBoolean)
        FsFound(File("id", "{'result': 'ok'}".getBytes(), "application/json"))
      else
        FsMiss
      Behaviors.same
    }
  }
}

Akka 的监督机制通过行为装饰器(behavior decoration)实现:

object Filesystem {
  def apply(): Behavior[Find] = {
    Behaviors
      .supervise[Find](search)
      .onFailure[Exception](SupervisorStrategy.restart)
  }
 
  // ...

上面的代码表示:当 Exception 或其子类抛出时,采用 restart 策略重启 Actor。

5.1. 继续干活:Resume 策略

当错误未破坏 Actor 状态时,可以选择 resume,即 Actor 继续处理后续消息。

例如,实现一个缓存 Actor:

def cache(filesystem: ActorRef[FsFind],
          cacheMap: Map[String, Resource]): Behavior[Request] =
  Behaviors.receive { (ctx, message) =>
    message match {
      case Find(path, replyTo) =>
        val maybeAnHit = cacheMap.get(path)
        maybeAnHit match {
          case Some(r) => replyTo ! Hit(r)
          case None => askFilesystemForResource(filesystem, ctx, path, replyTo)
        }
        Behaviors.same
      case AdaptedFsFound(path, res, replyTo) =>
        replyTo ! Hit(res)
        cache(filesystem, cacheMap + (path -> res))
      case AdaptedFsMiss(_, replyTo) =>
        replyTo ! Miss
        Behaviors.same
    }
  }

使用 SupervisorStrategy.resume 策略:

object Cache {
  def apply(filesystem: ActorRef[FsFind]):
    Behaviors
      .supervise(cache(filesystem, Map[String, Resource]()))
      .onFailure(SupervisorStrategy.resume)
 
  // ...
}

5.2. 重启 Actor:重新开始

如果状态已损坏,则重启 Actor 是更合理的选择

Behaviors.supervise[Find](search).onFailure[Exception](SupervisorStrategy.restart)

还可以指定重启次数限制:

SupervisorStrategy.restart.withLimit(maxNrOfRetries = 10, withinTimeRange = 5.minutes)

默认情况下,重启会停止所有子 Actor,避免资源泄露。但可以使用 withStopChildren(false) 保留子 Actor:

def apply(): Behavior[Request] = {
  Behaviors.setup { context =>
    val filesystem = context.spawn(Filesystem(), "filesystem")
    val cache = context.spawn(Cache(filesystem), "cache")
    Behaviors.supervise {
      serve(context, cache)
    }.onFailure[Exception](SupervisorStrategy.restart.withStopChildren(false))
  }
}

在重启时,Actor 可以监听 PreRestart 信号来清理资源:

Behaviors.receive[FsFind] { (context, message) =>
  // ...
}.receiveSignal {
  case (ctx, signal) if signal == PreRestart =>
    ctx.log.info("Releasing any dangling resource")
    Behaviors.same
}

5.3. 停止 Actor:彻底放弃

🛑 停止(Stop)是默认的监督策略。当状态严重受损,重启也无法恢复时,直接停止 Actor。

Actor 被停止时会收到 PostStop 信号:

Behaviors.receive[Find] { (context, message) =>
  // ...
}.receiveSignal {
  case (ctx, signal) if signal == PreRestart || signal == PostStop =>
    ctx.log.info("Releasing any dangling resource")
    Behaviors.same
}

Akka 中 Actor 是有层级结构的。如果子 Actor 停止,父 Actor 默认也会失败。如果父 Actor 希望继续运行,需要 watch 子 Actor:

val webServer = context.spawn(WebServer(), message.id)
context.watch(webServer)

监听 ChildFailed 信号:

object Main {
  case class Start(id: String)
  def apply: Behavior[Start] = {
    Behaviors.receive[Start] { (context, message) =>
      val webServer = context.spawn(WebServer(), message.id)
      context.watch(webServer)
      Behaviors.same
    }.receiveSignal {
      case (ctx, ChildFailed(ref, cause)) =>
        ctx.log.error(s"Child actor ${ref.path} failed with error ${cause.getMessage}")
        Behaviors.same
    }
  }
}

⚠️ 如果不处理 ChildFailed,父 Actor 会抛出 DeathPactException,并将失败向上冒泡。

6. 消息会怎样?

💥 引发异常的消息会被丢弃,不会重新入队。

✅ 但 Actor 的邮箱不会丢失,重启或恢复后会继续处理剩余消息。

7. 总结

本文区分了验证错误与失败,介绍了 Akka 中的监督机制和三种策略:

  • Resume:继续处理
  • Restart:重启 Actor
  • Stop:停止 Actor

监督机制是 Akka 实现弹性系统的关键,通过合理使用可以显著提升系统的容错能力。

📌 代码示例:GitHub 链接


原始标题:Regular Expressions in Kotlin