Investigating “Anomalies” in Periodic Task Scheduling with Java Thread Pools

Background

Performance monitoring involves many tasks that must run periodically. Lag (jank) monitoring, for example, needs to capture the main thread's call stack every 50 ms. Because the task itself may take a noticeable amount of time, the delay before the next run may need to be adjusted so that the actual interval does not drift too far from the target.
Taking a Handler-based implementation as an example, the code might end up looking like this:

import android.os.Handler;
import android.os.Message;

public class MyHandler extends Handler {
    // Target interval between two task starts: 50 ms, in nanoseconds
    private static final long PERIOD_NANOS = 50 * 1_000_000L;

    @Override
    public void handleMessage(Message msg) {
        long startTime = System.nanoTime();

        // Execute the task
        doTask();

        // Subtract the time the task took from the period to get the next delay
        long elapsed = System.nanoTime() - startTime;
        long nextDelayNanos = Math.max(PERIOD_NANOS - elapsed, 0);

        // Send the message for the next run
        sendEmptyMessageDelayed(0, nextDelayNanos / 1_000_000); // Convert to milliseconds
    }

    private void doTask() {
        // Work that needs to be performed
    }
}

A Handler is bound to a specific thread. Creating a separate thread for every periodic task (CPU collection, memory collection, and so on) wastes threads, while attaching every task's Handler to the same thread's Looper means that one slow task delays the scheduling of all the others. We therefore want the different collection tasks to be isolated from each other as much as possible.

This calls for a thread pool, which can allocate and reuse threads dynamically. Java provides the ScheduledExecutorService interface, which extends ExecutorService with support for delayed and periodic task execution.

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
    
}

In the end, I created one global ScheduledExecutorService object in the APM project to serve the periodic execution needs of the various internal monitoring tasks, and used **scheduleAtFixedRate** to register and schedule them.
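
As a rough illustration of that setup, a shared scheduler could be wrapped as follows. This is only a sketch: the class name MonitorScheduler, the pool size of 2, and the thread name are assumptions made for the example, not the actual APM code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for one shared scheduler; all names are illustrative. */
final class MonitorScheduler {
    // A small pool, so that one slow collector does not block the others.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(2, r -> {
                Thread t = new Thread(r, "apm-monitor");
                t.setDaemon(true); // do not keep the process alive
                return t;
            });

    private MonitorScheduler() {}

    /** Registers a collector that should run once per period. */
    static ScheduledFuture<?> register(Runnable collector, long periodMillis) {
        return SCHEDULER.scheduleAtFixedRate(
                collector, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

The returned ScheduledFuture can be kept by the caller and cancelled when a monitor is switched off.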

Problem

After one performance collection feature went live, I happened to notice that the data collected from some users was abnormal. Tracing the performance logs of those users showed that the timestamps in the logs were wrong: a task that should run once every 10 seconds had, according to the logs, run hundreds of times within 1 second. The anomaly is rare, affecting about one device per day on average.
The log timestamps confirm that these are not duplicate log entries, so the only remaining explanation is that the task was scheduled at the wrong interval. A review of the relevant code confirmed that the configured interval could not possibly be a few milliseconds.

Several rounds of offline testing failed to reproduce the problem.

Source code analysis

Since the problem could not be reproduced offline, all I could do was suspect the scheduling logic inside the thread pool, which called for a close look at how the thread pool executes scheduled tasks. It is also a good opportunity to learn how ScheduledThreadPoolExecutor implements delayed and periodic tasks.

First, to support delayed execution, the ScheduledThreadPoolExecutor constructor uses a special queue, DelayedWorkQueue.

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue is a priority queue. To keep insertion cheap, it does not keep its elements in a fully sorted linear order; instead it stores them in an array organized as a binary min-heap.
When a new task is added, DelayedWorkQueue.offer() is called. offer() first grows the backing array if it is full. If the queue is empty, the task is placed directly at the head; otherwise siftUp() is called, which moves the element up to its proper position according to the min-heap property. Priority is determined by calling compareTo().

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                // Compare by priority; stop once the parent is not later than key
                if (key.compareTo(e) >= 0)
                    break;
                // Move the parent down one level
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }
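
The visible effect of this heap ordering can be checked from the outside. The sketch below (DelayOrderDemo is a made-up name, and the delays are arbitrary) submits three one-shot tasks in the order 150 ms, 50 ms, 100 ms to a single-threaded scheduler and records the order in which they actually run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayOrderDemo {
    /** Schedules one task per delay, in the given order, and returns the delays in execution order. */
    static List<Long> executionOrder(long... delaysMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        List<Long> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(delaysMillis.length);
        for (long delay : delaysMillis) {
            pool.schedule(() -> {
                order.add(delay);
                done.countDown();
            }, delay, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        // Submission order is 150, 50, 100; the queue hands tasks out by trigger time.
        System.out.println(executionOrder(150, 50, 100));
    }
}
```

On an otherwise idle machine the tasks should come out as 50, 100, 150, regardless of the order in which they were submitted.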

compareTo() orders tasks by the time field of ScheduledFutureTask. If two tasks have the same time, the one with the smaller sequenceNumber, that is, the one submitted earlier, comes first.

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
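
Note that compareTo() looks at the sign of time - x.time rather than comparing the two values directly. The System.nanoTime() documentation recommends exactly this pattern, because nanoTime values may wrap around numerically. A small sketch of the difference, with NanoTimeOrdering and isBefore as illustrative names:

```java
final class NanoTimeOrdering {
    /** True if timestamp a is earlier than b, tolerating wrap-around of the long value. */
    static boolean isBefore(long a, long b) {
        return a - b < 0;
    }

    public static void main(String[] args) {
        long a = Long.MAX_VALUE - 10; // close to the largest long
        long b = a + 20;              // 20 ns later, but the addition wraps to a negative value

        System.out.println(a < b);          // false: the direct comparison gets it wrong
        System.out.println(isBefore(a, b)); // true: the difference is -20
    }
}
```

The subtraction form stays correct as long as the two timestamps are less than about 292 years apart.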

So next we look at how time is set for tasks scheduled through scheduleAtFixedRate.
When the ScheduledFutureTask object is constructed, the task's first execution time is computed by triggerTime(), which uses System.nanoTime() as the time base and adds the delay to it.

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0L)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                        null,
                        // Call triggerTime to set the next execution time of the task
                        triggerTime(initialDelay, unit),
                        unit.toNanos(period),
                        sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    /**
     * Returns the nanoTime-based trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return System.nanoTime() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

Next, we follow the task from the moment it is inserted into the queue to its first execution. When the task runs, it first checks whether it is periodic. A non-periodic task simply runs once and is finished. A periodic task is run through runAndReset() instead.

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic) // Non-periodic task: run it directly
                super.run();
            else if (super.runAndReset()) { // Periodic task: call runAndReset()
                // Update the time field of the task
                setNextRunTime();
                // Re-enter the queue and wait for the next dispatch
                reExecutePeriodic(outerTask);
            }
        }

        // FutureTask.runAndReset()
        protected boolean runAndReset() {
            if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread()))
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
            return ran && s == NEW;
        }

runAndReset() executes the original task. The "reset" part is mostly about checking and preserving state: if the current state is not NEW, the task has been cancelled and will not be executed. And because the task is periodic, its state is not moved to COMPLETING after a run; it stays NEW so that the task can run again.
When the run completes, setNextRunTime() updates the task's next execution time, and the task is offered to the queue again.

    /**
     * Sets the next time to run for a periodic task.
     */
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }

When period > 0, the next execution time is the task's current time plus period.
When period < 0, triggerTime(-p) is called instead. As described above, triggerTime() uses the current System.nanoTime() as the base for its calculation.
At first sight you will certainly wonder how **period** can be negative. Following the code shows that the only difference between the implementations of scheduleAtFixedRate() and scheduleWithFixedDelay() is that scheduleWithFixedDelay() stores the delay it receives as a negative period. In other words, the implementation uses the sign of period to tell which of the two methods the developer called, which saves one parameter in the shared logic.
At this point in the code analysis, we understand the difference between these two functions in calculating the next scheduling time of the task:

  • scheduleAtFixedRate(): the next execution time is not computed from the current clock but from the time value set for the previous run.
  • scheduleWithFixedDelay(): the next execution time is computed from the current clock, read after the previous run has finished.
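
The two rules can be compared with a small model that uses a virtual clock instead of real threads. This is a simplified sketch of the arithmetic in setNextRunTime(), not the JDK code; NextRunTimeModel and its method names are made up for the example, and a single worker with no other queued tasks is assumed.

```java
import java.util.ArrayList;
import java.util.List;

final class NextRunTimeModel {
    /** Start times under the fixed-rate rule: next trigger = previous trigger + period. */
    static List<Long> fixedRateStarts(long initialDelay, long period, long[] durations) {
        List<Long> starts = new ArrayList<>();
        long now = 0;              // virtual clock
        long time = initialDelay;  // plays the role of the task's time field
        for (long d : durations) {
            now = Math.max(now, time); // wait for the trigger time if it is still in the future
            starts.add(now);
            now += d;                  // the task body runs for d time units
            time += period;            // period > 0 branch of setNextRunTime()
        }
        return starts;
    }

    /** Start times under the fixed-delay rule: next trigger = clock after the run + delay. */
    static List<Long> fixedDelayStarts(long initialDelay, long delay, long[] durations) {
        List<Long> starts = new ArrayList<>();
        long now = 0;
        long time = initialDelay;
        for (long d : durations) {
            now = Math.max(now, time);
            starts.add(now);
            now += d;
            time = now + delay;        // period < 0 branch: based on the current clock
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] durations = {10, 10, 10};
        System.out.println(fixedRateStarts(1000, 1000, durations));  // [1000, 2000, 3000]
        System.out.println(fixedDelayStarts(1000, 1000, durations)); // [1000, 2010, 3020]
    }
}
```

With fixed rate the start times stay on the original grid, while with fixed delay each run's duration pushes all later runs back.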

With this in mind, let's go back to the documentation of scheduleAtFixedRate. The method's Javadoc actually explains this behavior very clearly.

Submits a periodic action that becomes enabled first after the given initial delay, and
subsequently with the given period; that is, executions will commence after initialDelay,
then initialDelay + period, then initialDelay + 2 * period, and so on.

The first execution time is determined by the initial delay, and every later execution is scheduled relative to that first time.
For example, if the initial time is 1000 and the period is 1000, subsequent executions should normally happen at **2000, 3000, 4000**.
scheduleWithFixedDelay(), by contrast, recalculates the next scheduling time from System.nanoTime() after each run completes.
So what happens when scheduleAtFixedRate() uses the task's initial time as the basis for every later scheduling time?

Reproducing the problem

From the implementation above we can see that, for a periodic task in the Java thread pool, the next run is only scheduled after the previous run has finished. If a run takes longer than the configured interval, the effective interval between runs becomes the task's execution time.

Here is a demo test: the task takes 5 s to execute and the execution interval is set to 2 s.
The timestamps show that the task does not actually run every 2 seconds.
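
The original demo is only available as a screenshot, so here is a comparable sketch, scaled down to a 500 ms task and a 200 ms period so that it finishes quickly. SlowTaskDemo and its parameters are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SlowTaskDemo {
    /** Runs a slow task at a fixed rate and returns the start times (ms) of the first runs. */
    static List<Long> startTimes(long taskMillis, long periodMillis, int runs) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        long origin = System.nanoTime();
        pool.scheduleAtFixedRate(() -> {
            if (starts.size() >= runs) {
                return; // enough samples; ignore runs that slip in before shutdown
            }
            starts.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - origin));
            try {
                Thread.sleep(taskMillis); // simulate a task body slower than the period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new ArrayList<>(starts);
    }

    public static void main(String[] args) {
        // Scaled down from the 5 s task / 2 s period described in the text.
        System.out.println(startTimes(500, 200, 4));
    }
}
```

The printed start times should be roughly 500 ms apart, not 200 ms apart: each run starts as soon as the previous one has finished.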

This case is different from what I saw in production, though. There, a run normally takes only tens of milliseconds and the interval is 10 s. But the production environment can differ from the offline one. A run that takes tens of milliseconds offline might, under abnormal conditions in production, stall for many times longer, for example because the thread does not get enough CPU time, or because the thread is suspended by GC. If that happens, then in theory a single stalled run leaves the task's time field far behind System.nanoTime(), even if later runs take a normal amount of time again. The task is then rescheduled immediately, over and over, with time = time + period after each run, and this only stops once the task's time field is greater than System.nanoTime().

In the following demo the scheduling interval is 1 s, and the first run is made to stall for 15 s. Fourteen abnormal back-to-back runs follow, and that count is approximately the stall time divided by the scheduling interval.
Seen from another angle, this guarantees that with **scheduleAtFixedRate** the total number of executions stays equal to the elapsed time divided by the scheduling interval; one slow run does not reduce the total count.
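
The screenshot of that demo is not reproduced here, so the following is a comparable sketch with the numbers scaled down by a factor of ten (100 ms period, 1500 ms stall on the first run). CatchUpBurstDemo, shortGaps, and the 50 ms threshold are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class CatchUpBurstDemo {
    /** Fixed-rate task whose first run stalls; returns the start times (ms) of the first runs. */
    static List<Long> startTimes(long stallMillis, long periodMillis, int runs) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        long origin = System.nanoTime();
        pool.scheduleAtFixedRate(() -> {
            if (starts.size() >= runs) {
                return; // enough samples collected
            }
            boolean firstRun = starts.isEmpty();
            starts.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - origin));
            if (firstRun) {
                try {
                    Thread.sleep(stallMillis); // simulate one abnormal stall (GC pause, CPU starvation)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new ArrayList<>(starts);
    }

    /** Counts consecutive starts that are closer together than thresholdMillis. */
    static int shortGaps(List<Long> starts, long thresholdMillis) {
        int count = 0;
        for (int i = 1; i < starts.size(); i++) {
            if (starts.get(i) - starts.get(i - 1) < thresholdMillis) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Long> starts = startTimes(1500, 100, 20);
        System.out.println(starts);
        System.out.println("gaps under 50 ms: " + shortGaps(starts, 50));
    }
}
```

After the stalled first run, roughly stall / period runs should start within a few milliseconds of each other before the schedule settles back to one run per period.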

Solution

For my periodic performance sampling tasks, an occasional abnormal interval is acceptable, but a burst of scheduled runs is not, because it can consume a lot of CPU or other resources. I therefore switched the scheduling to scheduleWithFixedDelay() and record the actual interval between runs. If the interval exceeds a threshold, the sample can be discarded or the data calibrated, depending on the nature of the task.
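
One possible shape for this, as a sketch only: IntervalGuard, its threshold parameter, and the policy of skipping the sample when the gap is too large are all assumptions, not the actual production code.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that tracks the real interval between two samples. */
final class IntervalGuard {
    private final long maxIntervalNanos;
    private boolean hasPrevious;
    private long previousNanos;

    IntervalGuard(long maxInterval, TimeUnit unit) {
        this.maxIntervalNanos = unit.toNanos(maxInterval);
    }

    /**
     * Records a sample taken at nowNanos. Returns false if the gap since the
     * previous sample exceeds the threshold, so the caller can discard or calibrate.
     */
    synchronized boolean record(long nowNanos) {
        boolean keep = !hasPrevious || nowNanos - previousNanos <= maxIntervalNanos;
        hasPrevious = true;
        previousNanos = nowNanos;
        return keep;
    }

    /** Schedules a sampler with fixed delay and skips samples whose interval is too large. */
    static ScheduledFuture<?> schedule(ScheduledExecutorService pool, Runnable sampler,
                                       long delayMillis, long maxIntervalMillis) {
        IntervalGuard guard = new IntervalGuard(maxIntervalMillis, TimeUnit.MILLISECONDS);
        return pool.scheduleWithFixedDelay(() -> {
            if (guard.record(System.nanoTime())) {
                sampler.run();
            }
            // else: the interval was abnormal; skip (or calibrate) this sample
        }, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Because the delay is measured from the end of the previous run, a stalled run can no longer trigger a burst; the guard only flags the one interval that was stretched.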