1. 简介

Akka Actor 系统提供了一个调度器(Akka Scheduler),用于管理任务的周期性执行。在本篇文章中,我们将介绍如何使用 Akka Scheduler 来安排任务的执行。

2. 依赖配置

首先,在项目中添加 Akka Actor 的依赖:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.8"

3. 单次执行调度器

单次执行调度器允许我们延迟执行某个任务,任务会在指定延迟时间后执行一次。

我们先创建一个简单的 Actor:

case class Greet(to: String, by: String) 
case class Greeted(msg: String)
class Greetings extends Actor {
  override def receive: Receive = {
    case greet: Greet =>
      sender() ! Greeted(s"${greet.by}: Hello, ${greet.to}")
  }
}

接着,初始化一个 ActorSystem:

val schedulerActorSystem = ActorSystem("akka-scheduler-system")

然后创建一个 ActorRef:

val greeter = schedulerActorSystem.actorOf(Props(classOf[Greetings]))
val greeting = Greet("Detective","Lucifer")
schedulerActorSystem.scheduler.scheduleOnce(5.seconds, greeter, greeting)

scheduleOnce 方法接受三个参数:

  • duration:延迟时间
  • actor:要发送消息的目标 Actor
  • message:要发送的消息

在上述例子中,调度器将在 5 秒后向 greeter 发送 greeting 消息。注意:需要提供一个隐式的 ExecutionContext

✅ 另一种方式是使用 Runnable 接口:

system.scheduler.scheduleOnce(5.seconds, new Runnable {
  override def run(): Unit = greeter ! greet
})

4. 周期性执行

Akka Scheduler 也支持周期性任务调度。

例如,我们可以在初始延迟 100 毫秒后,每秒发送一次 greet 消息:

system.scheduler.schedule(100.millis, 1.seconds, greeter, greet)

或者使用 Runnable

system.scheduler.schedule(10.millis, 250.millis, new Runnable {
  override def run(): Unit = greeter ! greet
})

⚠️ 注意:从 Akka 2.6 开始,schedule 方法已被标记为废弃。

5. 延迟类型

Akka Scheduler 提供了两种类型的延迟执行方式:

5.1. 固定延迟执行(Fixed-Delay)

固定延迟意味着每次任务执行完成后,才会计算下一次的执行时间。如果当前任务执行时间较长,那么后续任务会被推迟。

✅ 推荐使用 scheduleWithFixedDelay 替代已废弃的 schedule 方法:

system.scheduler.scheduleWithFixedDelay(10.millis, 250.millis, greeter, greet)

5.2. 固定频率执行(Fixed-Rate)

固定频率会根据任务执行时间调整下一次调度的时间。例如,如果任务耗时 200ms,而设定周期为 500ms,则下次任务将在 300ms 后执行。

使用 scheduleAtFixedRate 实现:

system.scheduler.scheduleAtFixedRate(10.millis, 500.millis)(new Runnable {
  override def run(): Unit = greeter ! greet
})

6. 取消调度任务

每次创建调度任务时,都会返回一个 Cancellable 实例。我们可以通过它来取消任务:

val schedulerInstance: Cancellable = system.scheduler.schedule(
  100.millis, 1.seconds, greeter, greet)
schedulerInstance.cancel()

7. Actor 定时器(Timers)

如果调度逻辑在 Actor 内部,Akka 推荐使用 Actor Timers 而不是全局的 Scheduler。Actor Timers 会在 Actor 重启时自动管理调度状态。

实现方式是混入 akka.actor.Timers trait:

class TimerActor(replyTo: ActorRef) extends Actor with Timers {
  override def preStart(): Unit = {
    timers.startTimerWithFixedDelay(PeriodicTimerKey, PeriodicTick, 200.millis)
    super.preStart()
  }
}

8. 调度精度

Akka Scheduler 是为高吞吐量设计的,不是高精度调度器。不能保证任务会在精确的时间点执行,可能会有几毫秒的延迟。

❌ 如果你需要精确时间点执行任务,建议使用 Quartz SchedulerAkka Quartz Scheduler

9. 总结

本文介绍了 Akka Scheduler 的基本使用方式,包括单次调度、周期性调度、任务取消和 Actor 内部定时器等内容。

示例代码和测试用例可以在 GitHub 上找到。


原始标题:Introduction to Akka Scheduler

« 上一篇: 同步处理Futures