A bloody lesson: falling into the scheduled thread pool trap

ScheduledExecutorService

1. Background

Hello everyone. Last week our company had an outage caused by improper use of a scheduled thread pool, and tens of millions of orders may have been lost.

I am sharing this pitfall here in the hope that you will not fall into it yourselves!

Our business code makes heavy use of scheduled thread pools (ScheduledExecutorService) to run tasks, and sometimes the try/catch exception handling inside a task is overlooked.

When such a task throws an exception during execution, the scheduled work silently stops, which looks as if the entire scheduled thread pool had hung, and normal business is affected.

2. Questions

Let’s imitate a production example:

  • Partner data is modified infrequently, and partners accept eventual consistency

  • A scheduled task queries MySQL every 60 seconds, pulls all the partner data, and puts it into the partner cache (a local cache)

  • When a customer request arrives, we read the partner from the cache

Most companies probably have something like this in production. The code is as follows:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on newer stacks

public class Demo {

    // Create a scheduled thread pool with a single worker thread
    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    private List<String> partnerCache = new ArrayList<>();

    @PostConstruct
    public void init() {
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // After a 3-second initial delay, query the database every 60 seconds
                // and cache the data locally
                loadPartner();
            }
        }, 3, 60, TimeUnit.SECONDS);
    }

    public void loadPartner() {
        System.out.println("Load start!");

        // Query the latest partner data in the database
        List<String> partnerList = queryPartners();

        // Put the partner data into the cache
        partnerCache.clear();
        partnerCache.addAll(partnerList);
    }

    public List<String> queryPartners() {
        // The database is down!
        throw new RuntimeException();
    }

}

Running the above example, we find that the program does not exit. It prints Load start! once and keeps running, but never prints Load start! again.

At this point we can confirm that the exception really does cause the task to stop being executed.
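The behavior can also be reproduced in isolation. The following is a minimal sketch, not the production code: the class name SilentFailureDemo, the 50 ms period, and the 500 ms observation window are assumptions chosen only to make the effect visible quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SilentFailureDemo {

    /** Schedules a task that always throws, waits a while, and returns how often it ran. */
    static int countRunsOfFailingTask() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            // Nominally every 50 ms, but the first exception ends the schedule
            pool.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new RuntimeException("The database is down!");
            }, 0, 50, TimeUnit.MILLISECONDS);

            // Room for roughly ten runs if the task were rescheduled
            Thread.sleep(500);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return runs.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRunsOfFailingTask()); // prints "runs = 1"
    }
}
```

No stack trace is printed and nothing is logged: the task runs once and then simply disappears from the schedule.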

1. Why does an exception in a task affect the scheduled thread pool?

2. Is the scheduled thread pool really dead?

3. How does the scheduled thread pool work internally?

With these three questions in mind, let's look at how ScheduledExecutorService works.

3. Principle analysis

In essence, ScheduledExecutorService is a delay queue plus a thread pool.

1. Introduction to delay queue

DelayQueue is an unbounded BlockingQueue that holds objects implementing the Delayed interface. An object can only be taken from the queue once its delay has expired.

The queue is ordered: the head is the element whose delay expires first, that is, the one whose delay expired furthest in the past.

Let's look at what an object in a DelayQueue looks like:


import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class MyDelayedTask implements Delayed {
    // Time at which the task was created (milliseconds)
    private final long start = System.currentTimeMillis();
    // Delay in milliseconds
    private final long time;

    public MyDelayedTask(long time) {
        this.time = time;
    }

    /**
     * Remaining delay: expiration time minus current time.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start + time) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Used by the delay queue for ordering: the task with the shorter remaining delay sorts first.
     */
    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the overflow that casting the difference to int could cause
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

So the principle behind a delay queue is simple:

  • Producer side: when an element is added, it is stamped with an expiration timestamp (current time + delay)
  • Consumer side: compare the current time with the timestamp. If the current time has reached or passed the timestamp, the element is consumed. Otherwise, the consumer waits and checks again.
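The producer and consumer sides described above can be tried out directly with java.util.concurrent.DelayQueue. This is a small sketch under assumptions: the class DelayQueueOrderDemo, the nested Item type, and the delays of 100, 200, and 300 ms are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueOrderDemo {

    /** Hypothetical element type: a name plus an absolute expiration time. */
    static final class Item implements Delayed {
        final String name;
        final long expireAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            // Producer side: stamp the element with current time + delay
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds three items out of order and returns their names in the order take() hands them out. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("slow", 300));
        queue.put(new Item("fast", 100));
        queue.put(new Item("medium", 200));

        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                // Consumer side: take() blocks until the head element has expired
                order.add(queue.take().name);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder()); // prints "[fast, medium, slow]"
    }
}
```

The insertion order does not matter. The elements come out in the order in which their delays expire.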

2. Introduction to thread pool principles

  • If the number of threads in the pool is below the core pool size, a new core thread is created to run the task.
  • If the number of threads has reached the core pool size, the task is added to the blocking queue.
  • If the queue is full and the task cannot be added, a non-core thread is created to run the task.
  • If a non-core thread cannot be created either (the maximum pool size has been reached), the rejection policy is executed.
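The four steps above can be observed with a small experiment. The sketch below is only an illustration: the class name ThreadPoolFlowDemo, the pool configuration (1 core thread, 2 threads maximum, queue capacity 1), and the blocking task are assumptions chosen so that each step is hit exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolFlowDemo {

    /** Submits four tasks that block until released and records what the pool did with each. */
    static List<String> submitFourBlockingTasks() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());

        // Every task waits on the latch, so threads and queue slots stay occupied
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        submitFourBlockingTasks().forEach(System.out::println);
    }
}
```

With this configuration the first task starts the core thread, the second waits in the queue, the third forces a non-core thread, and the fourth is rejected.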

3. The principle of the scheduled thread pool

Let's look at how a task is submitted to the scheduled thread pool: scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);

// Simplified from ScheduledThreadPoolExecutor
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    // Wrap our task
    // triggerTime: the absolute time of the first run, which implements the initial delay
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // Put the task into the delay queue
    super.getQueue().add(task);
    // Make sure a worker thread has been started
    ensurePrestart();
}

// Absolute trigger time in nanoseconds: current time plus the delay
private long triggerTime(long delay, TimeUnit unit) {
    return now() + unit.toNanos(delay);
}

From this we can conclude that the scheduled thread pool implements its timing through a delay queue.

One question remains: we only put the task into the queue once, so how is it executed multiple times?

With this question in mind, let's look at the code with which a worker thread pulls tasks:
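That the task really sits in a queue with a remaining delay can be checked from the outside through public API. This is a sketch under assumptions: the class name ScheduledQueueDemo and the empty task are invented for the example, and the 3-second initial delay and 60-second period simply mirror the partner cache demo.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduledQueueDemo {

    /** Returns { number of queued tasks, remaining delay in ms } right after scheduling. */
    static long[] inspectAfterScheduling() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<?> future =
                    pool.scheduleAtFixedRate(() -> { }, 3, 60, TimeUnit.SECONDS);

            // The task waits in the pool's work queue until its delay has elapsed
            long queued = pool.getQueue().size();
            // The returned future is itself a Delayed object
            long remainingMs = future.getDelay(TimeUnit.MILLISECONDS);
            return new long[] { queued, remainingMs };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long[] result = inspectAfterScheduling();
        System.out.println("queued tasks = " + result[0] + ", remaining delay (ms) = " + result[1]);
    }
}
```

Immediately after scheduling, the queue holds one task and the remaining delay is slightly under the 3 seconds that were requested.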

// Worker loop (simplified): keep pulling tasks from the delay queue
for (;;) {
    Runnable r = workQueue.take();
}

public RunnableScheduledFuture<?> take() throws InterruptedException {
    for (;;) {
        // Look at the first task in the queue
        RunnableScheduledFuture<?> first = queue[0];

        // [Important] If the queue is empty, wait until a task is added
        if (first == null) {
            available.await();
        } else {
            // Remaining delay of the head task
            long delay = first.getDelay(NANOSECONDS);

            if (delay <= 0) {
                // The task is due: remove it from the queue and return it
                return finishPoll(first);
            }

            // Not due yet: wait for the remaining delay, then check again
            available.awaitNanos(delay);
        }
    }
}

// Trigger timestamp minus current time
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

After a worker gets the task (a ScheduledFutureTask), it executes it by calling task.run():

public void run() {
    // Execute the current task
    if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

protected boolean runAndReset() {
    if (state != NEW) {
        return false;
    }
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // Execute the task
                c.call();
                // [Key point!] If the task completes normally, ran is set to true here.
                // If the task throws, control jumps to the catch block below and ran stays false.
                ran = true;
            } catch (Throwable ex) {
                // On an exception the state is set to EXCEPTIONAL:
                // the task is marked as failed and the exception is stored as its result
                setException(ex);
            }
        }
    } finally {
        s = state;
    }
    // ran: whether the task completed without throwing
    // s: the task state after the run
    // ran == false: the task execution failed
    // s != NEW: the task ended in an exceptional (or cancelled) state
    return ran && s == NEW;
}

If runAndReset returns false, the branch that calls setNextRunTime is never entered:

if (ScheduledFutureTask.super.runAndReset()) {
    //Modify the Time of the current task
    setNextRunTime();
    // Put the task back into the queue
    reExecutePeriodic(outerTask);
}

As a result, the task is never put back into the queue. The worker thread has no task to take and simply keeps waiting.
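Because setException stores the exception in the task, it is not lost completely: calling get() on the ScheduledFuture returned by scheduleAtFixedRate rethrows it wrapped in an ExecutionException. A minimal sketch, assuming a hypothetical class FutureExceptionDemo and a 2-second timeout chosen only to keep the example from blocking forever:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureExceptionDemo {

    /** Returns the message of the exception that ended the periodic task. */
    static String causeMessageOfFailedTask() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                throw new IllegalStateException("The database is down!");
            }, 0, 50, TimeUnit.MILLISECONDS);

            // For a periodic task, get() only returns by throwing:
            // on cancellation or when a run ended with an exception
            future.get(2, TimeUnit.SECONDS);
            return "no exception";
        } catch (ExecutionException e) {
            // The original exception is available as the cause
            return e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessageOfFailedTask()); // prints "The database is down!"
    }
}
```

In practice nobody calls get() on a periodic task's future, which is why the failure goes unnoticed unless the task itself handles the exception.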

4. Conclusion

With the above analysis, let's return to the three questions from the beginning:

1. Why does an exception in a task affect the scheduled thread pool?

  • An exception in a task does not affect the thread pool itself, but the pool drops the failed task and never puts it back into the queue.

2. Is the scheduled thread pool really dead?

  • The scheduled thread pool is not dead. Only the task that threw the exception stops running.

3. How does the scheduled thread pool work internally?

  • Thread pool + delay queue

So the takeaway is clear: the body of a scheduled task must be wrapped in try/catch. Otherwise, once an exception escapes, the task is never scheduled again, and you will end up like the author, with a failure that cost the company tens of millions. A bloody lesson!
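One way to apply this consistently is to wrap every scheduled task before submitting it. The following is only a sketch of that idea, not the fix that was actually deployed: the class GuardedTaskDemo, the helper guarded, and the use of System.err in place of a real logger are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedTaskDemo {

    /** Wraps a task so that an exception cannot escape and end the schedule. */
    static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                // Report the failure and carry on; the next run happens as planned.
                // Errors (for example OutOfMemoryError) are deliberately not swallowed.
                System.err.println("Scheduled task failed: " + e);
            }
        };
    }

    /** Schedules a guarded task that always throws and returns how often it ran. */
    static int countRunsOfGuardedFailingTask() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            pool.scheduleAtFixedRate(guarded(() -> {
                runs.incrementAndGet();
                throw new RuntimeException("The database is down!");
            }), 0, 50, TimeUnit.MILLISECONDS);

            Thread.sleep(500);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return runs.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRunsOfGuardedFailingTask());
    }
}
```

Unlike the unguarded version, the task keeps firing on every period even though each run fails, so it recovers on its own as soon as the database is back.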

Alibaba's development guidelines say the same thing.

The experience summed up by our predecessors really was paid for in blood, and we should heed their advice.
