1. 概述

在本教程中,我们将介绍如何在使用 Play Framework 编写的路由处理器中调度一个异步任务。

首先,我们会说明哪些场景适合使用异步任务;接着,介绍两种在 Play Framework 中调度任务的方法。

2. 为什么需要异步任务?

在 Play Framework 应用中,使用异步任务至少有以下三个原因:

✅ 启动一个涉及多个应用的长时间运行任务
✅ 触发应用内部状态的更新
✅ 记录用户行为而不增加响应时间

有时候,我们希望在接收到 REST API 请求后,启动一个耗时任务。例如,某个任务需要从数据库获取数据、生成 PDF 报告并发送给用户。对于调用我们 REST API 的客户端来说,等待邮件发送完成显然不合理。更好的做法是立即返回 HTTP 202 Accepted 状态码,然后在后台处理任务。

我们可能还需要清除缓存、重新加载数据并预热缓存以提升响应速度。这种情况下,也不应该让客户端等待缓存刷新完成,因为 HTTP 连接很可能会超时。

此外,我们可能需要将用户行为事件发送到消息队列。如果这些事件主要用于监控,并不是业务流程的核心部分,可以选择在后台任务中发送。

3. 将函数作为异步任务调度

Play Framework 3.0 使用 Apache Pekko 作为底层的 Actor 系统。而 Play 2.x 则使用 Akka。

在控制器代码中,我们可以访问 Actor 系统及其调度器。当我们拿到 Scheduler 后,就可以将任意函数作为后台任务运行。

首先,我们将 ActorSystem 添加为控制器的依赖项,并将 ExecutionContext 作为隐式依赖:

class AsyncTaskController @Inject()(val controllerComponents: ControllerComponents, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) extends BaseController {
  def runAsync(): Action[AnyContent] = Action {
    ...
  }
}

为了访问该接口,我们需要在 routes 文件中添加配置:

GET /async controllers.AsyncTaskController.runAsync()

现在,我们可以 获取 ActorSystem 的调度器,并用它调度一个匿名函数在 30 秒后运行

import scala.concurrent.duration._
import scala.language.postfixOps

def runAsync(): Action[AnyContent] = Action {
  Console.println(s"In route handler: ${DateTime.now()}")
  actorSystem.scheduler.scheduleOnce(30 seconds) {
    Console.println(s"30 seconds later: ${DateTime.now()}")
  }
  ...
}

注意,30 seconds 是一个有效的 FiniteDuration 对象,因为我们导入了 scala.concurrent.duration._ 的隐式转换,并通过 import scala.language.postfixOps 启用了后缀表达式。

为了验证该匿名函数确实是在后台延迟运行的,我们可以启动 Play 应用并访问 http://localhost:9000/async。在日志中,你将看到如下输出:

In route handler: 2021-02-10T18:09:22.639+01:00
30 seconds later: 2021-02-10T18:09:52.794+01:00

4. 使用 Actor

使用 ActorSystem 有两个主要优势:

✅ Actor 可以是有状态的,同时我们无需担心并发访问问题,因为 Actor 一次只处理一个消息
✅ Actor 可以返回响应

要在 Play Framework 中使用 Actor,我们需要定义一个 Guice 模块,将 Actor 添加到 Actor 系统中。此外,我们还需要向控制器注入一个 Actor 引用。我们先从 注入 Actor 引用并向其发送消息 开始:

class AsyncTaskController @Inject()(..., @Named("async-job-actor") actor: ActorRef)(implicit ec: ExecutionContext) extends BaseController {
  def runAsync(): Action[AnyContent] = Action {
    actor ! "THIS IS THE MESSAGE SENT TO THE ACTOR"
    ...
  }
}

@Named("async-job-actor") 注解告诉 Play Framework 应该将哪个 Actor 注入到控制器中。在 runAsync 函数中,我们调用 ! 方法将对象发送给 Actor。我们可以传递任何可序列化的对象作为消息。

为了让这段代码运行起来,我们还需要做几件事。首先,定义一个接收消息的 Actor

class AsyncTaskInActor extends Actor {
  override def receive: Receive = {
    case msg: String =>
      Console.println(s"Message ${msg} received at ${DateTime.now()}")
  }
}

接着,在 Guice 依赖注入模块中添加绑定

class ActorsModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindActor(classOf[AsyncTaskInActor], "async-job-actor")
  }
}

最后,打开 application.conf 文件,将新模块添加到 Play Framework 使用的模块列表中

play.modules.enabled += "actors.ActorsModule"

4.1. 延迟发送消息给 Actor

如果我们想延迟发送消息给 Actor,可以把前面两种方法结合起来,使用 scheduleOnce 函数,它可以接受一个 Actor 引用作为参数:

actorSystem.scheduler.scheduleOnce(
  30 seconds,
  actor,
  "A TEST MESSAGE!!"
)

4.2. 调度周期性任务

有时我们需要执行周期性任务,这时可以使用 scheduleAtFixedRate 函数,它接受初始延迟、后续运行间隔、Actor 引用和消息作为参数:

val cancellable = actorSystem.scheduler.scheduleAtFixedRate(
  10 seconds,
  5 minutes,
  actor,
  "recurring task message"
)

该函数返回一个 Cancellable 实例,我们可以调用 cancel() 方法来停止周期性任务:

cancellable.cancel()

5. 总结

本教程演示了如何在 Play Framework 中通过直接使用 Scheduler 来异步运行匿名函数,以及如何访问 ActorSystem 来执行异步任务。我们还展示了如何定义 Guice 模块,将自定义 Actor 添加到 ActorSystem 中,并注入 Actor 引用。

完整示例代码可以在 GitHub 上找到。


原始标题:Scheduling Asynchronous Tasks in Play Framework