Akka scheduled task schedule() method

Article directory

  • Akka scheduled task schedule() method
    • What is the Akka scheduled task schedule() method?
    • How to use the Akka scheduled task schedule() method?
      • How to get the Scheduler object outside the actor
        • Why do you need to provide an implicit ExecutionContext object for executing scheduled tasks?
      • How to get the Scheduler object inside the actor
      • Format of the schedule() method
    • What types of delays does the Akka scheduled task schedule() method have?
      • Fixed delay
      • Fixed frequency
    • Duration class
    • Summary

What is the Akka scheduled task schedule() method?

The Akka schedule() method is a way to manage the periodic execution of tasks in the Akka actor system. It allows us to send messages to actors or execute functions after a specified delay and then at regular intervals. It returns a Cancellable object, whose cancel() method cancels the scheduled task.

The schedule() method is implemented on top of a hashed wheel timer, an efficient data structure and algorithm for handling a large number of timer events, such as actor receive timeouts, Future timeouts, and circuit breakers. It does not execute tasks at exactly the specified point in time; instead, at each tick it executes all tasks that are due. We can adjust the accuracy of the scheduler by configuring the akka.scheduler.tick-duration property (10 ms by default).
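The effect of the tick on accuracy can be sketched with a small calculation. This is a simplified model, not Akka's actual implementation: it assumes a task fires at the first tick boundary at or after its requested delay. The names TickRounding and firingTime are hypothetical.

```scala
import scala.concurrent.duration._

// Simplified model (an assumption, not Akka's real code): a task can only
// fire on a tick boundary, so its delay is rounded up to a multiple of the tick.
object TickRounding {
  /** Rounds a requested delay up to the next multiple of the tick duration. */
  def firingTime(delay: FiniteDuration, tick: FiniteDuration): FiniteDuration = {
    require(tick > Duration.Zero, "tick must be positive")
    // Ceiling division on nanoseconds gives the number of ticks to wait
    val ticks = (delay.toNanos + tick.toNanos - 1) / tick.toNanos
    (ticks * tick.toNanos).nanos
  }
}
```

Under this model, with a 10 ms tick a task requested after 15 ms would run at the 20 ms tick, which is why a smaller tick-duration gives finer accuracy at the cost of more frequent wake-ups.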

How to use the Akka scheduled task schedule() method?

How to get the Scheduler object outside the actor

To use the Akka scheduled task schedule() method, we need to obtain a Scheduler object first, which is unique to each ActorSystem and is used for internal scheduling of various events. We can get it by calling the ActorSystem’s scheduler method:

// Get an ActorSystem object
val system = ActorSystem("akka-scheduler-system")
// Get a Scheduler object
val scheduler = system.scheduler

Then, we need to provide an implicit ExecutionContext object for executing timed tasks. We can use the dispatcher that comes with ActorSystem as the ExecutionContext:

// Import the dispatcher that comes with ActorSystem as ExecutionContext
import system.dispatcher
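Putting these two steps together, a complete round trip might look like the following minimal sketch. It assumes Akka 2.6 classic on the classpath and uses scheduleWithFixedDelay(), the non-deprecated fixed-delay method; the object name SchedulerOutsideActor, the method countTicks, and the chosen delays are hypothetical.

```scala
import akka.actor.{ActorSystem, Cancellable}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object SchedulerOutsideActor {
  /** Runs a repeating task for `runFor`, cancels it, and returns how often it ran. */
  def countTicks(runFor: FiniteDuration): Int = {
    val system = ActorSystem("akka-scheduler-system")
    import system.dispatcher // implicit ExecutionContext for the scheduler
    val counter = new AtomicInteger(0)

    // The lambda is converted to a Runnable; it runs every 200 ms after a 100 ms delay
    val task: Cancellable =
      system.scheduler.scheduleWithFixedDelay(100.millis, 200.millis) { () =>
        counter.incrementAndGet()
        ()
      }

    Thread.sleep(runFor.toMillis)
    task.cancel() // stop further executions
    Await.result(system.terminate(), 10.seconds)
    counter.get()
  }
}
```

Calling SchedulerOutsideActor.countTicks(1.second) should report a handful of executions; the exact number depends on timing and is not guaranteed.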

Why do you need to provide an implicit ExecutionContext object for executing scheduled tasks?

ExecutionContext is a trait that represents a context in which program logic is executed asynchronously, usually but not necessarily on a thread pool. It is similar to Java’s Executor interface and has two abstract methods: execute and reportFailure. The execute method runs a Runnable object, and the reportFailure method reports the cause of a failed asynchronous computation.

When we use the Akka scheduled task schedule() method to create a scheduled task, we need to provide an ExecutionContext object for executing the function or Runnable object we pass in. In this way, we can control the execution strategy of the scheduled task, such as which thread pool to use, how to handle exceptions, etc. We can use the dispatcher that comes with the ActorSystem as the ExecutionContext, or we can customize an ExecutionContext.
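As an illustration of a custom ExecutionContext, the sketch below wraps a dedicated thread pool using only the Scala and Java standard libraries. The pool size, the failure reporter, and the names CustomEc and runOnce are assumptions made for this example.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext

object CustomEc {
  /** Runs one task on a dedicated pool and returns the name of the thread it ran on. */
  def runOnce(): String = {
    val pool = Executors.newFixedThreadPool(2)
    // The second argument is the reporter that backs reportFailure
    val ec: ExecutionContext =
      ExecutionContext.fromExecutor(pool, t => println(s"task failed: ${t.getMessage}"))

    val threadName = new AtomicReference[String]("")
    val done = new CountDownLatch(1)

    // execute() hands the Runnable to the underlying pool
    ec.execute { () =>
      threadName.set(Thread.currentThread().getName)
      done.countDown()
    }

    done.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    threadName.get()
  }
}
```

An ExecutionContext built this way could be passed to the scheduler in place of the ActorSystem dispatcher, which keeps slow scheduled tasks off the threads that run the actors.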

For ease of use, we usually pass the ExecutionContext object into the schedule() method as an implicit parameter. In this way, we don’t need to explicitly specify the ExecutionContext object, but only need to import or define an implicit ExecutionContext object in the scope. For example:

// Import the dispatcher that comes with ActorSystem as the implicit ExecutionContext
import system.dispatcher
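// Create a timed task without explicitly passing in ExecutionContext.
// schedule() takes a by-name block (f: => Unit), so we pass the expression itself;
// a () => ... lambda would only be created and discarded, and no message would be sent.
scheduler.schedule(100.millis, 1.second)(greeter ! greet)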

or

// Define a custom ExecutionContext as the implicit ExecutionContext
implicit val ec: ExecutionContext = ...
// Create a timed task without explicitly passing in ExecutionContext
scheduler.schedule(100.millis, 1.second)(greeter ! greet)

In this way, we can simplify the writing and understanding of code, while retaining the flexibility of timing task execution strategy.

If we don’t want to rely on an implicit parameter, we can pass the ExecutionContext explicitly by adding it as the last parameter list of the schedule() method. For example:

// Define a custom ExecutionContext object
val ec: ExecutionContext = ...
// Create a timed task and explicitly pass in the ExecutionContext object
scheduler.schedule(100.millis, 1.second)(greeter ! greet)(ec)

This way, the call does not depend on whichever implicit value happens to be in scope. Writing it out may be clearer and safer, but it also makes the code more verbose. We should therefore choose the implicit or explicit style according to the specific scenario and requirements.

How to get the Scheduler object inside the actor

context is an ActorContext object that represents the contextual information of an actor, including its self reference, its child actors, and its supervision strategy. context also has a system attribute, an ActorSystem object that identifies the actor system to which the actor belongs. Therefore, context.system.scheduler.schedule calls the schedule() method on the scheduler of that actor system to create a scheduled task.

This form usually appears inside an actor: when we want to create a scheduled task there, we can use context.system.scheduler to get the Scheduler object. For example:

// Create a timed task inside the actor and send a HeartBeat message to itself
// (an implicit ExecutionContext is still required, e.g. import context.dispatcher)
context.system.scheduler.schedule(100.millis, 1.second)(self ! HeartBeat)

This way, we don’t need to explicitly get or pass in the ActorSystem object, just use context.system.
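A fuller version of such an actor might look like the sketch below. It assumes Akka 2.6 classic and uses scheduleWithFixedDelay() with the receiver/message overload; the HeartBeat message, the HeartBeatStats counter, and the class name HeartBeatActor are hypothetical names introduced for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._

// Hypothetical message and counter used only for this illustration
case object HeartBeat
object HeartBeatStats { val received = new AtomicInteger(0) }

class HeartBeatActor extends Actor {
  import context.dispatcher // implicit ExecutionContext for the scheduler

  private var task: Option[Cancellable] = None

  // Start the periodic message when the actor starts
  override def preStart(): Unit =
    task = Some(
      context.system.scheduler.scheduleWithFixedDelay(100.millis, 1.second, self, HeartBeat)
    )

  // Cancel the task when the actor stops, so no further HeartBeat messages are scheduled
  override def postStop(): Unit = task.foreach(_.cancel())

  def receive: Receive = {
    case HeartBeat => HeartBeatStats.received.incrementAndGet()
  }
}
```

Keeping the Cancellable and cancelling it in postStop() ties the lifetime of the scheduled task to the lifetime of the actor.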

Format of the schedule() method

Next, we can use the schedule() method of the Scheduler object to create a recurring scheduled task. The two most commonly used overloads are shown below (the trailing implicit ExecutionContext parameter list is omitted):

  • def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable: This method creates a recurring scheduled task. It accepts three parameters: the initial delay (initialDelay), which is the delay before the first execution of the task; the interval (interval), which is the time between executions; and the by-name block (f), which is the task to be executed. It returns a Cancellable object for canceling the scheduled task.
  • def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, message: Any): Cancellable: This method creates a scheduled task that repeatedly sends a message to an actor. It accepts four parameters: the initial delay (initialDelay), which is the delay before the first message is sent; the interval (interval), which is the time between messages; the receiver (receiver), which is the actor to send the message to; and the message (message), which is the message to send. It returns a Cancellable object for canceling the scheduled task.

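If we only want to create a scheduled task that executes once or sends a message once, we should use the Scheduler’s scheduleOnce() method, which takes a single delay instead of an initial delay and an interval. Setting the interval of schedule() to Duration.Zero does not turn it into a one-shot task.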
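For a task that should run exactly once, the Scheduler also offers scheduleOnce(). The following is a minimal sketch, assuming Akka 2.6 classic; the object name OneShot, the method ranOnce, and the 100 ms delay are hypothetical.

```scala
import akka.actor.ActorSystem
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Await
import scala.concurrent.duration._

object OneShot {
  /** Schedules a single execution and reports whether it ran within the timeout. */
  def ranOnce(): Boolean = {
    val system = ActorSystem("one-shot-demo")
    import system.dispatcher // implicit ExecutionContext for the scheduler
    val latch = new CountDownLatch(1)

    // scheduleOnce takes a by-name block, so the expression is passed directly
    system.scheduler.scheduleOnce(100.millis)(latch.countDown())

    val ran = latch.await(5, TimeUnit.SECONDS)
    Await.result(system.terminate(), 10.seconds)
    ran
  }
}
```

scheduleOnce() also returns a Cancellable, so a one-shot task can still be cancelled before it fires.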

What types of delays does the Akka scheduled task schedule() method have?

The Akka scheduled task schedule() method has two types of delays: fixed-delay and fixed-rate.

Fixed delay

Fixed delay means that the delay time between two consecutive task executions is always at least equal to the given interval time. The next task execution time will only be calculated after the current task execution is completed. If the current task is a long-running task, the next task execution will be delayed. Therefore, the time difference between two consecutive task executions may not be constant.

Example:

// Create a timed task that repeatedly sends messages to the actor, with an initial delay of 100 milliseconds and an interval of 1 second
scheduler.schedule(100.millis, 1.second, greeter, greet)

or

// Create a timed task that repeatedly executes the block, with an initial delay of 10 milliseconds and an interval of 250 milliseconds
// (schedule() takes a by-name block, so the expression is passed directly rather than as a () => ... lambda)
scheduler.schedule(10.millis, 250.millis)(greeter ! greet)

Note that schedule() itself does not let us choose between the two behaviors. Its deprecation notice in Akka 2.6 describes it as having the same semantics as scheduleAtFixedRate(), so the calls above only illustrate the call shape. To get fixed-delay behavior explicitly, we need the scheduleWithFixedDelay() method.

Starting from Akka 2.6, the schedule() method is deprecated, and Akka recommends scheduleWithFixedDelay() for fixed-delay scheduled tasks. We can rewrite the previous examples as follows:

// Create a timed task that repeatedly sends messages to the actor, with an initial delay of 100 milliseconds and an interval of 1 second
scheduler.scheduleWithFixedDelay(100.millis, 1.second, greeter, greet)

or

// Create a timed task that repeatedly executes the function, with an initial delay of 10 milliseconds and an interval of 250 milliseconds
scheduler.scheduleWithFixedDelay(10.millis, 250.millis)(() => greeter ! greet)

Fixed frequency

Fixed frequency (fixed rate) means that the scheduler tries to keep the number of executions per unit of time constant. The planned time of each execution is computed from the initial delay and the interval, not from when the previous execution finished. If a task runs longer than the interval, executions do not overlap: the next one starts immediately after the current one completes. As a result, the gap between two consecutive executions may be shorter than the interval, and after a long pause (for example, a garbage collection pause) several delayed executions may run in quick succession.

Starting from the Akka 2.6 version, Akka recommends that we use the scheduleAtFixedRate() method to create fixed-frequency timing tasks. So, we can rewrite the previous example to use the scheduleAtFixedRate() method:

// Create a timed task that repeatedly sends messages to the actor, with an initial delay of 100 milliseconds and an interval of 1 second
scheduler.scheduleAtFixedRate(100.millis, 1.second, greeter, greet)

or

// Create a timed task that repeatedly executes the function, with an initial delay of 10 milliseconds and an interval of 250 milliseconds
scheduler.scheduleAtFixedRate(10.millis, 250.millis)(() => greeter ! greet)
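The difference between the two policies is easiest to see with numbers. The sketch below is a simplified model in plain Scala, not Akka code: it computes the start times of successive executions from their run times, assuming fixed delay waits for the previous run to end and fixed rate keeps planned slots without overlapping executions. The names ScheduleModel, fixedDelayStarts, and fixedRateStarts are hypothetical.

```scala
// Simplified timing model (an assumption for illustration); all values in milliseconds
object ScheduleModel {
  /** Fixed delay: next start = previous end + delay. */
  def fixedDelayStarts(initial: Long, delay: Long, durations: Seq[Long]): Seq[Long] =
    durations.scanLeft(initial)((start, d) => start + d + delay).init

  /** Fixed rate without overlap: next start = max(planned slot, previous end). */
  def fixedRateStarts(initial: Long, interval: Long, durations: Seq[Long]): Seq[Long] =
    durations.zipWithIndex
      .foldLeft((Vector.empty[Long], initial)) { case ((starts, prevEnd), (d, i)) =>
        val start = math.max(initial + i * interval, prevEnd)
        (starts :+ start, start + d)
      }
      ._1
}
```

With run times of 50, 300, and 50 ms and a 100 ms interval, the fixed-delay model starts the tasks at 0, 150, and 550 ms, while the fixed-rate model starts them at 0, 100, and 400 ms: the long second run pushes everything back under fixed delay, but under fixed rate the third run starts as soon as the second finishes.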

Duration class

The previous article, which used Akka to simulate the communication between Spark’s Master and Worker, passed Duration(3, TimeUnit.SECONDS) as the parameter rather than the 3.seconds form used in this article. Why the difference?

Duration is a class that represents a length of time. Its companion object offers several factory methods that accept different types of parameters. The form used in this article relies on an implicit conversion that combines a number and a time unit into a FiniteDuration (a subtype of Duration). For example:

// Use implicit conversion to convert 3 seconds into a Duration object
3.seconds
// Equivalent to
Duration(3, TimeUnit.SECONDS)

This implicit conversion is provided by the scala.concurrent.duration package, which allows us to express the duration of time in a more concise and intuitive way. We need to import this package to use this implicit conversion:

// import scala.concurrent.duration package
import scala.concurrent.duration._

So the parameter can be written as either Duration(3, TimeUnit.SECONDS) or 3.seconds.

The previous article may have used Duration(3, TimeUnit.SECONDS) because the package was not imported, to avoid ambiguity, or to stay consistent with the surrounding code. Both forms have the same effect; only the notation differs.
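The equivalence can be checked directly. The short sketch below uses only the Scala standard library; the object name DurationForms and its field names are hypothetical, and the string form is included as a third notation for comparison.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

object DurationForms {
  // DSL form, enabled by importing scala.concurrent.duration._
  val viaDsl: FiniteDuration = 3.seconds
  // Factory form, which works without the DSL import
  val viaFactory: FiniteDuration = Duration(3, TimeUnit.SECONDS)
  // String form, parsed at runtime
  val viaString: Duration = Duration("3 seconds")
}
```

All three values compare equal with ==, so any of them can be passed wherever the scheduler expects a delay of three seconds (the string form would first need to be narrowed to a FiniteDuration).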

Summary

This article introduced the Akka schedule() method, a way to manage periodically executed tasks in the Akka actor system. We saw how to obtain a Scheduler, how to create one-shot and recurring scheduled tasks, and how fixed-delay and fixed-rate scheduling differ. We also looked at the implementation principle and accuracy of the scheduler.