1. 概述
响应式宣言 是所有响应式系统的指导原则,其中指出分布式应用必须具备弹性(resilience)。为了实现弹性,我们必须构建将失败视为架构中第一公民的系统。
Akka 库通过监督(supervision)机制,提供了实现弹性系统的基本构件。本文将介绍如何使用 Akka 和监督机制,提升分布式系统的容错能力。
2. Akka 依赖
要使用 Akka Typed 库,通常需要引入以下依赖:
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.0" % Test
3. 场景设定
为了说明 Akka 的监督机制,我们先设定一个简单的场景:模拟一个提供文件服务的 Web 服务器。
trait Resource
case class File(id: String, content: Array[Byte], mimeType: String) extends Resource
该 Web 服务器只处理 Get
请求,返回内容始终包含原始路径:
case class Get(path: String, replyTo: ActorRef[Response]) extends Request
case class Ok(path: String, resource: Resource) extends Response
case class NotFound(path: String) extends Response
case class BadRequest(path: String) extends Response
case class InternalServerError(path: String, error: String) extends Response
服务器收到请求后,会委托 Filesystem
来查找资源,并将结果返回给调用方:
object WebServer {
def apply(filesystem: ActorRef[Find]): Behavior[Request] =
Behaviors.receive { (context, message) =>
message match {
case Get(path, replyTo) =>
findInTheFilesystem(filesystem, context, replyTo, path)
// More to come ;)
}
Behaviors.same
}
目前我们省略了一些细节,后续会逐步补充完整。
4. 处理 or 不处理:这是个问题
在分布式系统中,我们必须将失败纳入设计考量。那么,代码中到底应该处理哪些错误?
基本上,错误可以分为两类:
✅ 验证错误(Validation Errors):属于可预期的业务逻辑错误,例如输入不合法等。
❌ 失败(Failures):不可预期的系统级错误,例如外部服务不可用、磁盘空间不足等。
验证错误应该作为 Actor 协议的一部分进行处理。例如,如果路径不是合法 URI,服务器应返回 BadRequest
:
Behaviors.receive { (context, message) =>
message match {
case Get(path, replyTo) =>
if (isNotValid(path)) {
replyTo ! BadRequest(path)
}
而失败则不应该混入业务逻辑中。正确的做法是将失败的处理责任交给外部实体,即“监督者(supervisor)”。
5. 监督机制:应对不可预期的失败
我们来完善一下示例。Web 服务器通过 Filesystem
查找资源,我们可以将 Filesystem
建模为一个 Actor:
object Filesystem {
def apply(): Behavior[FsFind] = search
private def search: Behavior[FsFind] = {
Behaviors.receive { (context, message) =>
context.log.info(s"Received a request for path ${message.path}")
message.replyTo !
if (Random.nextBoolean)
FsFound(File("id", "{'result': 'ok'}".getBytes(), "application/json"))
else
FsMiss
Behaviors.same
}
}
}
Akka 的监督机制通过行为装饰器(behavior decoration)实现:
object Filesystem {
def apply(): Behavior[Find] = {
Behaviors
.supervise[Find](search)
.onFailure[Exception](SupervisorStrategy.restart)
}
// ...
上面的代码表示:当 Exception
或其子类抛出时,采用 restart
策略重启 Actor。
5.1. 继续干活:Resume 策略
✅ 当错误未破坏 Actor 状态时,可以选择 resume,即 Actor 继续处理后续消息。
例如,实现一个缓存 Actor:
def cache(filesystem: ActorRef[FsFind],
cacheMap: Map[String, Resource]): Behavior[Request] =
Behaviors.receive { (ctx, message) =>
message match {
case Find(path, replyTo) =>
val maybeAnHit = cacheMap.get(path)
maybeAnHit match {
case Some(r) => replyTo ! Hit(r)
case None => askFilesystemForResource(filesystem, ctx, path, replyTo)
}
Behaviors.same
case AdaptedFsFound(path, res, replyTo) =>
replyTo ! Hit(res)
cache(filesystem, cacheMap + (path -> res))
case AdaptedFsMiss(_, replyTo) =>
replyTo ! Miss
Behaviors.same
}
}
使用 SupervisorStrategy.resume
策略:
object Cache {
def apply(filesystem: ActorRef[FsFind]):
Behaviors
.supervise(cache(filesystem, Map[String, Resource]()))
.onFailure(SupervisorStrategy.resume)
// ...
}
5.2. 重启 Actor:重新开始
❌ 如果状态已损坏,则重启 Actor 是更合理的选择。
Behaviors.supervise[Find](search).onFailure[Exception](SupervisorStrategy.restart)
还可以指定重启次数限制:
SupervisorStrategy.restart.withLimit(maxNrOfRetries = 10, withinTimeRange = 5.minutes)
默认情况下,重启会停止所有子 Actor,避免资源泄露。但可以使用 withStopChildren(false)
保留子 Actor:
def apply(): Behavior[Request] = {
Behaviors.setup { context =>
val filesystem = context.spawn(Filesystem(), "filesystem")
val cache = context.spawn(Cache(filesystem), "cache")
Behaviors.supervise {
serve(context, cache)
}.onFailure[Exception](SupervisorStrategy.restart.withStopChildren(false))
}
}
在重启时,Actor 可以监听 PreRestart
信号来清理资源:
Behaviors.receive[FsFind] { (context, message) =>
// ...
}.receiveSignal {
case (ctx, signal) if signal == PreRestart =>
ctx.log.info("Releasing any dangling resource")
Behaviors.same
}
5.3. 停止 Actor:彻底放弃
🛑 停止(Stop)是默认的监督策略。当状态严重受损,重启也无法恢复时,直接停止 Actor。
Actor 被停止时会收到 PostStop
信号:
Behaviors.receive[Find] { (context, message) =>
// ...
}.receiveSignal {
case (ctx, signal) if signal == PreRestart || signal == PostStop =>
ctx.log.info("Releasing any dangling resource")
Behaviors.same
}
Akka 中 Actor 是有层级结构的。如果子 Actor 停止,父 Actor 默认也会失败。如果父 Actor 希望继续运行,需要 watch
子 Actor:
val webServer = context.spawn(WebServer(), message.id)
context.watch(webServer)
监听 ChildFailed
信号:
object Main {
case class Start(id: String)
def apply: Behavior[Start] = {
Behaviors.receive[Start] { (context, message) =>
val webServer = context.spawn(WebServer(), message.id)
context.watch(webServer)
Behaviors.same
}.receiveSignal {
case (ctx, ChildFailed(ref, cause)) =>
ctx.log.error(s"Child actor ${ref.path} failed with error ${cause.getMessage}")
Behaviors.same
}
}
}
⚠️ 如果不处理 ChildFailed
,父 Actor 会抛出 DeathPactException
,并将失败向上冒泡。
6. 消息会怎样?
💥 引发异常的消息会被丢弃,不会重新入队。
✅ 但 Actor 的邮箱不会丢失,重启或恢复后会继续处理剩余消息。
7. 总结
本文区分了验证错误与失败,介绍了 Akka 中的监督机制和三种策略:
- Resume:继续处理
- Restart:重启 Actor
- Stop:停止 Actor
监督机制是 Akka 实现弹性系统的关键,通过合理使用可以显著提升系统的容错能力。
📌 代码示例:GitHub 链接