Kafka TimingWheel principle is worth learning from

In Kafka, many requests do not return immediately. They return only after some asynchronous operation has finished or a certain condition has been met. These requests generally carry a timeout parameter: if the server has not met the return condition by the time the timeout elapses, the request is considered timed out, and Kafka returns a timeout response so that the client knows. For example, a producer request with acks=-1 must wait until all ISR replicas have written the message before a response is returned to the client; if the timeout is reached first, a timeout response is returned instead.

This scenario can be implemented with delayed tasks: define a task that runs when the timeout elapses. The task first checks whether the return condition has been met. If it has, it returns the response the client expects; if not, it sends a timeout response to the client.

For delayed operations, Java's built-in implementations include Timer and ScheduledThreadPoolExecutor. Both are backed by a delay queue: when a delayed task is scheduled, it is inserted into the queue. These delay queues are priority queues implemented as a min-heap, so inserting a task costs O(log N), and re-adjusting the heap after removing a task also costs O(log N).
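The heap-backed behaviour described above can be observed with java.util.concurrent.DelayQueue directly. The following is only a minimal sketch: DelayedTask and the manual clock nowMs are hypothetical names introduced for illustration (real code would read the system clock), and the example is not taken from Kafka.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

object DelayQueueSketch {
  // Hypothetical manual clock so the example is deterministic.
  @volatile var nowMs: Long = 0L

  // A queue element that becomes available once nowMs reaches expirationMs.
  final class DelayedTask(val name: String, val expirationMs: Long) extends Delayed {
    override def getDelay(unit: TimeUnit): Long =
      unit.convert(expirationMs - nowMs, TimeUnit.MILLISECONDS)

    override def compareTo(other: Delayed): Int =
      java.lang.Long.compare(
        getDelay(TimeUnit.MILLISECONDS),
        other.getDelay(TimeUnit.MILLISECONDS))
  }

  // Inserts three tasks out of order, moves the clock to 25ms, and drains
  // whatever is due. Each offer and each successful poll is a heap operation.
  def demo(): List[String] = {
    nowMs = 0L
    val queue = new DelayQueue[DelayedTask]()
    queue.offer(new DelayedTask("c", 30))
    queue.offer(new DelayedTask("a", 10))
    queue.offer(new DelayedTask("b", 20))
    nowMs = 25L // tasks a and b are now due, c is not
    // poll() without a timeout returns null when the head is not yet due
    Iterator.continually(queue.poll()).takeWhile(_ != null).map(_.name).toList
  }
}
```

Tasks come out in expiration order regardless of insertion order, and the task that is not yet due stays in the queue.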

When there are not many delayed tasks, O(log N) is fast enough. For a high-throughput system like Kafka, however, it is not. In pursuit of more speed, Kafka's designers used the Timing Wheel data structure, which brings the time complexity of inserting a task down to O(1).

Timing Wheel

[Figure: structural diagram of a time wheel]

The above is a structural diagram of a time wheel. The time wheel has 8 slots, and the current time points to slot 0.

Let’s take another look at the data structure of TimingWheel in Kafka

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
}

tickMs: the time span represented by one slot. Kafka's default is 1ms.

wheelSize: the number of slots in the time wheel. Kafka's default is 20.

startMs: the start time of this time wheel.

taskCounter: the total number of tasks in the time wheel.

queue: a delay queue of TimerTaskList. Each slot has a corresponding TimerTaskList, which is a doubly linked list with an expireTime value. These TimerTaskLists are added to the delay queue, and the slot with the smallest expireTime sits at the front of the queue.

interval: the total time span that the time wheel can represent, that is, tickMs * wheelSize.

buckets: an array of TimerTaskList, one per slot.

currentTime: the current time, that is, the time that the time wheel pointer points to.

Principle of operation

When adding a delayed task, buckets[expiration / tickMs % wheelSize] is used to compute which slot it belongs to. For example, if the task's delayMs is 2ms and the current time is 0ms, then expiration = delayMs + current time = 2ms, and by the formula above it falls into slot 2. The task is then wrapped in a TimerTaskEntry and appended to that slot's TimerTaskList.
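The slot formula can be checked with a few lines of Scala. This is a small sketch, not Kafka code: the object and the function name slotIndex are made up for illustration, and only the arithmetic comes from the formula above.

```scala
object SlotIndexSketch {
  // Same arithmetic as buckets[expiration / tickMs % wheelSize].
  def slotIndex(expirationMs: Long, tickMs: Long, wheelSize: Int): Int =
    ((expirationMs / tickMs) % wheelSize).toInt
}
```

With tickMs = 1 and wheelSize = 20, an expiration of 2ms maps to slot 2, and an expiration of 22ms maps to slot 2 as well, because the index wraps around once the wheel has made a full rotation.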

Kafka then starts a thread that drives the time wheel pointer forward. It does so by taking the TimerTaskList of the earliest slot from the delay queue with a timed queue.poll(). Because queue is a delay queue, the call blocks until the expireTime of the slot at the head of the queue arrives or the timeout elapses. If queue.poll() returns a TimerTaskList, the tasks in that slot are due, so the thread traverses the list and performs the corresponding operations.

In the example above there is a task in slot 2. When the TimerTaskList of slot 2 is taken out, the pointer is first updated with currentTime = timeMs - (timeMs % tickMs), where timeMs is the expireTime of the TimerTaskList, which is 2ms. So currentTime = 2ms, that is, the time wheel pointer points to 2ms.

Time overflow processing

In Kafka's default implementation, tickMs=1ms and wheelSize=20, so the delay range that the time wheel can represent is 0~20ms. What happens when the delay exceeds 20ms? Kafka extends the time wheel into a hierarchical time wheel.

At the beginning, the first-layer time wheel covers the range 0~20ms. Suppose a task now arrives with a delay of 200ms. Kafka then creates another layer, which we call the second-layer time wheel.

The code for creating the second layer of time wheel is as follows

overflowWheel = new TimingWheel(
  tickMs = interval,
  wheelSize = wheelSize,
  startMs = currentTime,
  taskCounter = taskCounter,
  queue
)

That is, each slot of the second-layer time wheel represents the full range of the first-layer time wheel, which is 20ms. The number of slots stays the same, and the other attributes are inherited from the first layer. The second-layer time wheel can therefore represent the range 0~400ms.

Then buckets[expiration / tickMs % wheelSize] places the task with a 200ms delay in slot 10 of the second-layer time wheel (200 / 20 % 20 = 10).

In the same way, if the second-layer time wheel cannot hold a new delayed task, a third layer, a fourth layer, and so on are created.

Note that a higher-level time wheel is created only when the current time wheel cannot hold the expiration time of the new task. If the higher-level time wheel already exists, the task is simply added to it.
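To see which layer and slot a task ends up in, the layer-by-layer rule can be written as one loop. The sketch below is illustrative only: the locate function is a hypothetical helper, not part of Kafka, and it assumes every layer's clock equals the current time rounded down to a multiple of that layer's tick.

```scala
object LevelSketch {
  // Returns Some((layer, slot)), with layers counted from 1,
  // or None when the task is already due.
  def locate(expirationMs: Long, currentTimeMs: Long,
             tickMs: Long, wheelSize: Int): Option[(Int, Int)] =
    if (expirationMs < currentTimeMs + tickMs) None
    else {
      var level = 1
      var tick = tickMs
      // Clock of the layer currently being examined.
      def clock: Long = currentTimeMs - (currentTimeMs % tick)
      // Move up while the task does not fit into this layer's interval.
      while (expirationMs >= clock + tick * wheelSize) {
        tick *= wheelSize // one slot of the next layer spans this whole layer
        level += 1
      }
      Some((level, ((expirationMs / tick) % wheelSize).toInt))
    }
}
```

With tickMs = 1, wheelSize = 20 and the clock at 0ms, a 200ms task lands in layer 2, slot 10, while a 500ms task needs a third layer whose slots each span 400ms.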

Some details

  1. When the time wheel pointer points to slot 1, that is, currentTime=1ms, the tasks in slot 0 have expired. Slot 0 is then free for reuse and can hold tasks that expire in the range 20~21ms. In other words, if a task expiring at 20ms arrives while currentTime=0ms, a higher-level time wheel has to be created, but if it arrives while currentTime=1ms, it can be put directly into slot 0. When currentTime=20ms, the pointer points to slot 0 again.

  2. Careful readers may notice that slot 0 on the first layer covers expiration times 0~1ms, and the expireTime of its TimerTaskList is 0ms. Slot 0 on the second layer covers 0~20ms, and the expireTime of its TimerTaskList is also 0ms. Both TimerTaskLists are placed in the same delay queue, so queue.poll() takes out both of them. When the linked list is traversed, each task is checked to see whether it has reached its execution time; tasks that have not are reinserted into the time wheel. Because the first-layer pointer has advanced by then, tasks that were originally in the second-layer time wheel may fall back into the first-layer time wheel.
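The slot-reuse behaviour in the first point follows from the two range checks a wheel applies to an incoming task: expiration < currentTime + tickMs and expiration < currentTime + interval. The sketch below reproduces just those checks for a single layer. Placement, Expired, Slot, Overflow and place are hypothetical names used for illustration, and the defaults mirror tickMs=1 and wheelSize=20.

```scala
object SlotReuseSketch {
  sealed trait Placement
  case object Expired extends Placement                 // run the task right away
  final case class Slot(index: Int) extends Placement   // fits into this wheel
  case object Overflow extends Placement                // needs a higher-level wheel

  def place(expirationMs: Long, currentTimeMs: Long,
            tickMs: Long = 1L, wheelSize: Int = 20): Placement = {
    val interval = tickMs * wheelSize
    if (expirationMs < currentTimeMs + tickMs) Expired
    else if (expirationMs < currentTimeMs + interval)
      Slot(((expirationMs / tickMs) % wheelSize).toInt)
    else Overflow
  }
}
```

A task expiring at 20ms overflows while the pointer is at 0ms, but fits into the reused slot 0 once the pointer has moved to 1ms.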

Source code analysis

Add new deferred task

//SystemTimer.scala
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

Add new tasks to the Time Wheel

//TimingWheel
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  // Get the expiration time of the task
  val expiration = timerTaskEntry.expirationMs
  // First check whether the task has been cancelled
  if (timerTaskEntry.cancelled) {
    false
  // The task has already expired
  } else if (expiration < currentTime + tickMs) {
    false
  // Check whether the range of the current time wheel can hold the task
  } else if (expiration < currentTime + interval) {
    // Compute the slot from the expiration time of the task
    val virtualId = expiration / tickMs
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)
    // Set the expireTime of the TimerTaskList
    if (bucket.setExpiration(virtualId * tickMs)) {
      // Add the TimerTaskList to the delay queue
      queue.offer(bucket)
    }
    true
  } else {
    // The expiration is beyond the range this wheel can represent:
    // create the higher-level time wheel if needed and add the task to it
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}

private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}

As the code above shows, whether the current time wheel can hold the target task is decided by expiration < currentTime + interval. In other words, the range the time wheel can represent runs from the pointer's current position forward by one interval.

Advancement of the time wheel pointer

//SystemTimer.scala
def advanceClock(timeoutMs: Long): Boolean = {
  // Take the earliest slot from the delay queue. If its expireTime has not
  // arrived yet, this call blocks for up to timeoutMs
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        // Advance the pointer of the time wheel
        timingWheel.advanceClock(bucket.getExpiration())
        // Take the tasks out of the TimerTaskList and add them again.
        // Adding checks whether each task has expired
        bucket.flush(reinsert)
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}

//TimingWheel
def advanceClock(timeMs: Long): Unit = {
  if (timeMs >= currentTime + tickMs) {
    // Advance the pointer of the time wheel
    currentTime = timeMs - (timeMs % tickMs)
    // Advance the pointer of the upper time wheel
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
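To watch a task move from the second layer down to the first, the add and advanceClock logic can be exercised in a stripped-down model. The sketch below is an assumption-laden simplification, not Kafka's implementation: MiniTimer, Task, Bucket, Wheel, Timer, advanceTo and pendingBucketExpirations are hypothetical names, there is no locking or cancellation, and a plain scala.collection.mutable.PriorityQueue driven by an explicit nowMs argument stands in for the blocking DelayQueue.

```scala
import scala.collection.mutable

object MiniTimer {
  final case class Task(name: String, expirationMs: Long)

  // One slot: its tasks plus the start of the time range it currently covers.
  final class Bucket {
    var expiration: Long = -1L
    val tasks: mutable.ListBuffer[Task] = mutable.ListBuffer.empty
  }

  final class Wheel(tickMs: Long, wheelSize: Int, startMs: Long,
                    queue: mutable.PriorityQueue[Bucket]) {
    private val interval = tickMs * wheelSize
    private val buckets = Array.fill(wheelSize)(new Bucket)
    private var currentTime = startMs - (startMs % tickMs)
    private var overflow: Option[Wheel] = None

    // Returns false when the task is already due and should run right away.
    def add(task: Task): Boolean =
      if (task.expirationMs < currentTime + tickMs) false
      else if (task.expirationMs < currentTime + interval) {
        val virtualId = task.expirationMs / tickMs
        val bucket = buckets((virtualId % wheelSize).toInt)
        bucket.tasks += task
        if (bucket.expiration != virtualId * tickMs) {
          bucket.expiration = virtualId * tickMs
          queue.enqueue(bucket) // a bucket enters the queue once per rotation
        }
        true
      } else {
        if (overflow.isEmpty)
          overflow = Some(new Wheel(interval, wheelSize, currentTime, queue))
        overflow.get.add(task)
      }

    def advanceClock(timeMs: Long): Unit =
      if (timeMs >= currentTime + tickMs) {
        currentTime = timeMs - (timeMs % tickMs)
        overflow.foreach(_.advanceClock(currentTime))
      }
  }

  final class Timer(tickMs: Long, wheelSize: Int, startMs: Long) {
    // Earliest bucket first (mutable.PriorityQueue is a max-heap by default).
    private implicit val earliestFirst: Ordering[Bucket] =
      Ordering.by[Bucket, Long](_.expiration).reverse
    private val queue = mutable.PriorityQueue.empty[Bucket]
    private val wheel = new Wheel(tickMs, wheelSize, startMs, queue)
    val fired: mutable.ListBuffer[String] = mutable.ListBuffer.empty

    def add(task: Task): Unit = if (!wheel.add(task)) fired += task.name

    def pendingBucketExpirations: List[Long] =
      queue.toList.map(_.expiration).sorted

    // Stand-in for the poll loop: handle every bucket that is due at nowMs.
    def advanceTo(nowMs: Long): Unit =
      while (queue.nonEmpty && queue.head.expiration <= nowMs) {
        val bucket = queue.dequeue()
        wheel.advanceClock(bucket.expiration)
        val tasks = bucket.tasks.toList
        bucket.tasks.clear()
        bucket.expiration = -1L
        tasks.foreach(add) // reinsert: fires the task or demotes it
      }
  }
}
```

For example, with new MiniTimer.Timer(1, 20, 0) and a task expiring at 25ms, the task first sits in a second-layer bucket that expires at 20ms. Calling advanceTo(20) flushes that bucket and reinserts the task into a first-layer bucket that expires at 25ms, and advanceTo(25) finally fires it.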

Summary

Compared with the commonly used DelayQueue, whose insertion cost is O(log N), TimingWheel needs only O(1) to insert a task, and the cost of retrieving due tasks is also well below O(log N). In addition, Kafka's TimingWheel checks whether a task has already been cancelled before inserting it. In scenarios where most operations complete before their timeout and never need to fire, TimingWheel performs even better.
