Flink consumes kafka and reports akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@xxx]]after x

Article directory

  • background:
  • Phenomena and Analysis
  • solution
  • Summarize:
  • expand
  • refer to

Background:

Recently, the company where the author works is researching and using flink, because the company only has the author as a big data developer. The author is fortunate to lead this research, but we are also newbies, and we will share some pitfalls encountered in the process with you later. Of course, we are still struggling at the stage of DataStream Api, and strive to launch flink sql as soon as possible. This time the error was encountered when consuming Kafka during the development process. I hereby record a memo, and I hope it will be helpful to everyone. Let’s look at the errors below.

Phenomenon and analysis

The phenomenon we see in the data task here is that the task has not generated new data. After checking the TaskManager and JobManager logs, it is found that the following error is reported in the taskmanager log:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://[email protected]:6123/user/rpc/jobmanager_2#360860634]] after [10000 ms ].
Message of type [org.apache.flink.runtime.rpc.messages.RemoteFencedMessage].
A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
2023-05-22 15:56:36,474 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor[]
- Cannot find task to fail for execution d0748c664c42f1de87b6785c9b37cf49_687d766627c4a5a1e88148795d8315c0_0_1 with exception:
java.util.concurrent.TimeoutException: Invocation of
[RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://[email protected]:6123/user/rpc/jobmanager_2] timed out.
This is usually caused by:
 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs.
 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.

Seeing the error report, we must know to some extent that Flink actually uses the communication framework of akka for asynchronous communication. So we first chose to restart the task directly, and found that the task reported an error directly after the task reached a certain time node. We know that Akka was adopted as early as version 0.9 of Flink as the implementation of distributed communication. With Akka in Flink, all remote procedure calls (RPC) are implemented as asynchronous messages. This mainly affects the three components of JobManager, TaskManager and JobClient. So the analysis here is that the data communication between JobManager and TaskManager of the task timed out. Reading the error message tells us that the error is usually due to:

  • Akka cannot send messages silently due to issues such as heavy load or serialization failures. In this case, you should find detailed error information in the logs.
  • Recipients take more time to respond due to issues such as slow machines or network jitter. In this case, you can try increasing akka.ask.timeout.
    After analysis, it is found that our traffic is about 1000 pieces/min, and the data traffic is very small, so it is unlikely to be a problem of high load, and besides these errors, no other errors have been found. So here we focus on adjusting the akka.ask.timeout parameter.
    We checked the parameter introduction on the official website to find the part about akka and found this parameter:

Solution

We found that the parameter defaults to 10s, because we use Flinkonk8s, so we directly configure the relevant parameters in the job:

akka.ask.timeout: 60 s

If it is another way, it can be modified in $FLINK_HOME/conf/flink-conf.yaml.
If you are using a version below Flink1.9, you need to modify the following parameters:

web.timeout="1000000"

Because in the flink1.8 version, it is found that the default web.timeout of flink is only 10s

We are looking at the flink1.9 version and found that the default web.timeout is only 600s

Summary:

When we use flink, we still need to understand the underlying principles.

Expand

An actor is a container for its own state and behavior. Its actor threads process incoming messages continuously. This relieves users from the error-prone task of lock and thread management, since only one thread is active per actor at a time. However, it must be guaranteed that an actor’s internal state is only accessed by the actor thread. The behavior of an actor is defined by a receive function, which contains a logic to be executed for each message received.

The Flink system consists of three distributed communication components: JobClient, JobManager, and TaskManager. The JobClient receives a Flink job from the user and submits it to the JobManager. The JobManager is then responsible for coordinating the execution of jobs. First, it allocates the required resources. This mainly includes running slots on TaskManagers. After resource allocation, the JobManager deploys each task of the job to each TaskManagers. Upon receipt of a task, the TaskManager creates a thread to execute the task. If there is a state change, such as starting a computation or completing a computation, it is sent back to the JobManager. The JobManager then controls job execution based on status updates until completion. Once the job completes, the results are sent to the JobClient, which notifies the user of the results of the run.

  • JobManager & TaskManager

    The JobManager is the central control unit responsible for executing a Flink job. As such, it manages resource allocation, task scheduling, and status reporting. Before any Flink job can be executed, a JobManager and at least one TaskManager must be started. The TaskManager then sends a RegisterTaskManager message to the JobManager to register itself. JobManager sends a confirmation message of successful registration. If the TaskManager is already registered with the JobManager, the JobManager returns an AlreadyRegistered message because multiple RegisterTaskManager messages were sent. If the registration is rejected, the JobManager will send a RefuseRegistration message.

    The JobClient submits a job to the JobManager by sending a SubmitJob message with the corresponding JobGraph attached. Upon receiving the JobGraph, the JobManager creates an ExecutionGraph based on the JobGraph, which is a logical representation of distributed execution. The ExecutionGraph contains information about the tasks that will be deployed to the TaskManagers for execution.

    The JobManager’s scheduler is responsible for allocating running slots among the available TaskManagers. After allocating an execution slot on a TaskManager, a SubmitTask message is sent to the TaskManager with all necessary information to execute the task. The TaskManager sends a TaskOperationResult message to confirm that the task deployment was successful. Once the source code of the submitted job is deployed and executed, the job submission is successful. The JobManager sends a Success message with the corresponding job ID to notify the JobClient that the job has been submitted successfully.

    Status updates for each task running on TaskManagers are sent back to the JobManager via the UpdateTaskExecutionState message. With these update messages, the ExecutionGraph can be updated to reflect the current state of the execution.

    The JobManager also acts as an input split assigner for the data source. It is responsible for assigning tasks to all TaskMangers in order to ensure data locality as much as possible. In order to dynamically balance the load, tasks request a new data split after processing the previous data split (input split). This request is implemented by sending a RequestNextInputSplit to the JobManager. The JobManager responds with a NextInputSplit message. If there are no more data shards, the data shard included in the JobManager return message is null.

    Tasks are lazily deployed on TaskManagers. This means that tasks that consume data are deployed after one of its data producers produces data. Once the producer is finished generating data, it sends a ScheduleOrUpdateConsumers message to the JobManager. This message indicates that the consumer (consumer) can now read the newly produced data. If the task consuming the data has not started yet, it will be deployed to a TaskManager.

  • JobClient

    JobClient represents a user-facing component in a distributed system. It is used to communicate with the JobManager and is responsible for submitting Flink jobs, querying the status of submitted jobs, and receiving status information about running jobs.

    JobClient is also an actor that communicates via messages. There are two messages related to job submission: SubmitJobDetached and SubmitJobWait. The first message submits a job and unregisters for receiving any status messages and final job results. The detached mode is useful if you want to submit your jobs to the Flink cluster in a fire and forget manner. The second type of message submits a job and registers to receive status messages for the job. Internally, this is achieved by creating a helper actor as the recipient of status messages. Once the job terminates, the JobManager sends a JobResultSuccess message to the helper actor with the elapsed time and the accumulated result. When receiving this message, the helper actor forwards the message to the JobClient that sent the SubmitJobWait message, and then terminates.

  • Asynchronous VS. Synchronous Messages

    Where possible, Flink tries to use asynchronous messages and handle responses as Futures. Futures and rarely existing blocking calls have a timeout after which operations are considered failed. This avoids deadlocks in the system if a message is lost or a distributed component crashes. However, if you happen to have a very large cluster or a slow network, timeouts may be triggered incorrectly. Therefore, the timeout of these operations can be modified in the configuration “akka.ask.timeout”.

    Before an actor can communicate with another actor, it must retrieve an ActorRef. The search for this operation also requires a timeout. The lookup timeout is set to a smaller value than the regular timeout in order to make the system fail fast if an actor is not started. In the case of lookup timeout, you can increase the lookup timeout in the configuration “akka.lookup.timeout”.

    Another feature of Akka is that it sets a limit on the maximum message size it can send. The reason is that it keeps a serialization buffer of the same size and it doesn’t want to waste memory. If you encounter a transmission error with a message exceeding the maximum value, you can increase the frame size (framesize) in the configuration “akka.framesize”.

  • Failure Detection

    Failure detection in a distributed system is important for its robustness. When running on a commodity cluster, a distributed system will always experience some component failure or unreachability. The reasons for such a failure are varied and can range from hardware failures to network outages. A robust (robust) distributed system should be able to detect failed components and restore it.

    Flink detects failed components through Akka’s DeathWatch mechanism. DeathWatch allows actors to monitor other actors even if they are not supervised by this actor or even if they belong to another actor system. Once a monitored actor dies or becomes unreachable, a terminate message is sent to the actor’s monitor. Therefore, upon receiving this message, the system can take appropriate action on this actor. Internally, DeathWatch is implemented as a heartbeat and a failure detector based on heartbeat interval, heartbeat pause, and heartbeat threshold to determine when an actor is likely to be dead. The heartbeat interval can be set in the configuration “akka.watch.heartbeat.interval”. Acceptable heartbeat pauses can be determined by configuring “akka.watch.heartbeat.pause”. The heartbeat pause should be several times the heartbeat interval, otherwise a missing heartbeat will directly trigger DeathWatch. The failure (heartbeat) threshold can be determined by configuring “akka.watch.threshold”, and it effectively controls the sensitivity of the failure detector. More details about DeathWatch mechanics and failure detectors can be found here.

    In Flink, the JobManager monitors all registered TaskManagers and all TaskManagers monitor the JobManager. This way, both types of components know when the other is unreachable. When a TaskManager is unreachable, the JobManager will mark the TaskManager that cannot deploy tasks as dead. In addition, the JobManager fails all tasks running on this TaskManager and reschedules them on another TaskManager. A TaskManager that is marked dead due to a temporary connection loss can re-register itself with the JobManager when the connection is re-established. TaskManager also monitors JobManager. This monitoring allows the TaskManager to enter a clean state by failing all running tasks when it detects that the JobManager has failed. Also, in case of death triggered only due to network congestion or loss of connection, the TaskManager will attempt to reconnect to the JobManager.

  • Future Development

    Currently only three components: JobClient, JobManager and TaskManager are implemented as actors. In order to better achieve concurrency and improve scalability, more components can be implemented as actors. A promising candidate is ExecutionGraph, whose ExecutionVertices or its associated Execution objects can also be implemented as an actor. Such a fine-grained actor model will facilitate state updates sent directly to the respective Execution objects. In this way, the JobManger will be significantly freed from being a single communication node.

  • Configuration

    akka.ask.timeout: timeout for all Futures and blocking Akka calls. If Flink fails due to timeouts, then you should increase this value. Timeouts can be caused by slow machines or congested networks. timeout value requires a time unit identifier (ms/s/min/h/d) (default: 10s)

    akka.lookup.timeout: timeout for JobManager lookup. The timeout value requires a time unit specifier (ms/s/min/h/d) (default: 10s ), and generally needs to be set to a value smaller than akka.ask.timeout.

    akka.framesize: The maximum size of messages sent between JobManager and TaskManager. If Flink fails due to message size exceeding this limit, then you should increase this value. The message size requires a message unit identifier. (default: 10485760b)

    akka.watch.heartbeat.interval: The time interval for Akka to detect dead TaskManager’s DeathWatch mechanism. You should increase this value if TaskManagers are incorrectly marked as dead due to missing or delayed heartbeat messages. A detailed introduction to Akka’s DeathWatch can be found here. (default: akka.ask.timeout/10)

    akka.watch.heartbeat.pause: Acceptable heartbeat pause value for Akka’s DeathWatch mechanism. A lower value does not allow an irregular heartbeat. A detailed introduction to Akka’s DeathWatch mechanism can be found here. (default: akka.ask.timeout)

    akka.watch.threshold: Threshold for DeathWatch failure detector. A lower value is prone to misjudgment, conversely, a larger value increases the time to detect a dead TaskManager. A detailed introduction to Akka’s DeathWatch mechanism can be found here. (default: 12)

Note: The parameters may change with the version, please refer to the official website.

Reference

[1] [Akka Series] Application of Akka and Actors in Flink
[2] [Akka Series] Akka and Actors