1. 简介
Akka Actor 系统提供了一个调度器(Akka Scheduler),用于管理任务的周期性执行。在本篇文章中,我们将介绍如何使用 Akka Scheduler 来安排任务的执行。
2. 依赖配置
首先,在项目中添加 Akka Actor 的依赖:
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.8"
3. 单次执行调度器
单次执行调度器允许我们延迟执行某个任务,任务会在指定延迟时间后执行一次。
我们先创建一个简单的 Actor:
case class Greet(to: String, by: String)
case class Greeted(msg: String)
class Greetings extends Actor {
override def receive: Receive = {
case greet: Greet =>
sender() ! Greeted(s"${greet.by}: Hello, ${greet.to}")
}
}
接着,初始化一个 ActorSystem:
val schedulerActorSystem = ActorSystem("akka-scheduler-system")
然后创建一个 ActorRef:
val greeter = schedulerActorSystem.actorOf(Props(classOf[Greetings]))
val greeting = Greet("Detective","Lucifer")
schedulerActorSystem.scheduler.scheduleOnce(5.seconds, greeter, greeting)
scheduleOnce
方法接受三个参数:
duration
:延迟时间actor
:要发送消息的目标 Actormessage
:要发送的消息
在上述例子中,调度器将在 5 秒后向 greeter
发送 greeting
消息。注意:需要提供一个隐式的 ExecutionContext
。
✅ 另一种方式是使用 Runnable
接口:
system.scheduler.scheduleOnce(5.seconds, new Runnable {
override def run(): Unit = greeter ! greet
})
4. 周期性执行
Akka Scheduler 也支持周期性任务调度。
例如,我们可以在初始延迟 100 毫秒后,每秒发送一次 greet
消息:
system.scheduler.schedule(100.millis, 1.seconds, greeter, greet)
或者使用 Runnable
:
system.scheduler.schedule(10.millis, 250.millis, new Runnable {
override def run(): Unit = greeter ! greet
})
⚠️ 注意:从 Akka 2.6 开始,schedule
方法已被标记为废弃。
5. 延迟类型
Akka Scheduler 提供了两种类型的延迟执行方式:
5.1. 固定延迟执行(Fixed-Delay)
固定延迟意味着每次任务执行完成后,才会计算下一次的执行时间。如果当前任务执行时间较长,那么后续任务会被推迟。
✅ 推荐使用 scheduleWithFixedDelay
替代已废弃的 schedule
方法:
system.scheduler.scheduleWithFixedDelay(10.millis, 250.millis, greeter, greet)
5.2. 固定频率执行(Fixed-Rate)
固定频率会根据任务执行时间调整下一次调度的时间。例如,如果任务耗时 200ms,而设定周期为 500ms,则下次任务将在 300ms 后执行。
使用 scheduleAtFixedRate
实现:
system.scheduler.scheduleAtFixedRate(10.millis, 500.millis)(new Runnable {
override def run(): Unit = greeter ! greet
})
6. 取消调度任务
每次创建调度任务时,都会返回一个 Cancellable
实例。我们可以通过它来取消任务:
val schedulerInstance: Cancellable = system.scheduler.schedule(
100.millis, 1.seconds, greeter, greet)
schedulerInstance.cancel()
7. Actor 定时器(Timers)
如果调度逻辑在 Actor 内部,Akka 推荐使用 Actor Timers 而不是全局的 Scheduler。Actor Timers 会在 Actor 重启时自动管理调度状态。
实现方式是混入 akka.actor.Timers
trait:
class TimerActor(replyTo: ActorRef) extends Actor with Timers {
override def preStart(): Unit = {
timers.startTimerWithFixedDelay(PeriodicTimerKey, PeriodicTick, 200.millis)
super.preStart()
}
}
8. 调度精度
Akka Scheduler 是为高吞吐量设计的,不是高精度调度器。不能保证任务会在精确的时间点执行,可能会有几毫秒的延迟。
❌ 如果你需要精确时间点执行任务,建议使用 Quartz Scheduler 或 Akka Quartz Scheduler。
9. 总结
本文介绍了 Akka Scheduler 的基本使用方式,包括单次调度、周期性调度、任务取消和 Actor 内部定时器等内容。
示例代码和测试用例可以在 GitHub 上找到。