Thread pool shutdown throws TimeoutException

Problem Description

I'd like to share a service error we hit during releases. The problem appeared every time a new version was deployed and the old instance was about to go offline. The error occurred where tasks are submitted to a thread pool: Future.get() threw a TimeoutException, which was logged as the "error" entry shown in the pseudo code below. The pseudo code is roughly as follows:

List<Future<Result<List<InfoVO>>>> futures = new ArrayList<>();
lists.forEach(item -> {
    futures.add(enhanceExecutor.submit(() -> feignClient.getTimeList(ids)));
});
futures.forEach(item -> {
    try {
        Result<List<InfoVO>> result = item.get(10, TimeUnit.SECONDS);
    } catch (InterruptedException | TimeoutException | ExecutionException e) {
        log.error("error", e);
    }
});

The logic is very simple: each Feign interface call is submitted to the thread pool to run concurrently, and the results are then collected synchronously via Future.get(), waiting at most 10 seconds per task.
The thread pool is configured with 16 core threads, a maximum of 32 threads, a queue of 100, and CallerRunsPolicy as the rejection policy, which means that when the pool cannot accept a task, the task is handed back to the calling thread to execute.
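For reference, here is a minimal sketch of a pool built with these parameters; the keep-alive time and queue implementation are assumptions, since our real pool is a custom wrapper:

// Illustrative only: keep-alive and queue type are assumed, not our actual wrapper
ThreadPoolExecutor enhanceExecutor = new ThreadPoolExecutor(
        16,                                        // core pool size
        32,                                        // maximum pool size
        60, TimeUnit.SECONDS,                      // keep-alive for non-core threads (assumed)
        new LinkedBlockingQueue<>(100),            // bounded queue of 100 tasks (assumed type)
        new ThreadPoolExecutor.CallerRunsPolicy()  // rejected tasks run on the calling thread
);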

Problem Analysis

The analysis started with some detours. The most intuitive reading of a TimeoutException is that the downstream interface timed out, and this interface does occasionally time out, so we used Arthas to measure its execution time and found that it is not slow; with the thread pool parameters above it basically never exceeds the timeout. Monitoring the interface's QPS and latency in Grafana confirmed this, so an interface timeout could essentially be ruled out.

Next I wondered whether the downstream service was also going offline at the same time, since most of our services are released together, which could make Feign calls fail. We also use resilience4j, which has its own timeouts and thread pools; could the problem be happening in that layer?
This was another detour: after plenty of googling, searching GitHub, and asking ChatGPT, no relevant information turned up. It also left me with a lesson: before suspecting third-party components, check your own code first, and don't burrow into a direction when you have no real clue.

Later, I went back over the logs on a timeline and reorganized the facts. The thread pool above is a pool we wrapped ourselves: it supports monitoring, dynamic parameter changes via Apollo, traceId propagation for log tracking, task execution statistics, thread exit when the service goes offline, and so on. It is very similar to the thread pool described by the Meituan technical team, but packaged around our own needs, so it is simpler and lighter to use.
In our earlier article on graceful service shutdown, we wrote:

Before the service goes offline, the thread pool responds to an event-bus message and executes its shutdown method. The original intention was that once the service is going offline, the pool should stop accepting new tasks and trigger the rejection policy. Could that be the problem here?
Combined with the code above, our usual understanding was: when the queue is full, the pool keeps creating threads up to maximumPoolSize; once the thread count has reached maximumPoolSize and the queue is still full, the rejection policy is triggered, and with CallerRunsPolicy the task runs on the caller, so submit should block the calling thread.
The following code blocks on the third submit, which matches that understanding.

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
// first task occupies the single worker thread
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});
// second task fills the queue
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});
// third task is rejected -> CallerRunsPolicy runs it on the calling thread, so this blocks
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});

So why did the pool get shut down in the meantime? Many articles online say that if the thread pool has been shut down and a task is submitted, the rejection policy is triggered. That sentence is not wrong, but it is not the whole story, and that is exactly where the pitfall lies. If you run the following code, you will find the behavior differs from the above: the third submit no longer blocks.

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});

// add this line
threadPoolExecutor.shutdown();

// won't block here...
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});

Why is that? Tracing the source code shows that the rejection policy is indeed invoked, but CallerRunsPolicy contains a check: if the pool is not shut down, it calls the Runnable's run method directly on the caller thread, which is why the calling thread blocks; if the pool is shut down, it does nothing, which means the task is silently discarded.
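The relevant check, paraphrased from the JDK's ThreadPoolExecutor.CallerRunsPolicy:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();   // run the task on the calling thread
        }
        // if the pool has been shut down, the task is silently dropped
    }
}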

Given that, what happens if I keep the Future returned by the last submit and then call Future.get() on it?

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});
threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});

// add this line
threadPoolExecutor.shutdown();

// won't block here...
Future<?> future = threadPoolExecutor.submit(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
});

// what will happen here?
future.get(10, TimeUnit.SECONDS);

The result is a timeout: future.get() throws a TimeoutException after 10 seconds.

So our problem can be reproduced. But why does future.get() time out? To understand how it normally blocks the calling thread and how that thread is resumed once the result is available, we need to look at how Future works.

Future Principle

Future literally means "the future", which is quite descriptive: when a task runs asynchronously, a Future lets us check at some point in the future whether the task has completed, obtain its result, or even cancel it.
Future is an interface, and FutureTask is one of its implementation classes. FutureTask implements both Future and Runnable, which means it can be executed as a Runnable by the pool and its result can be retrieved through it as a Future.
The source code of ThreadPoolExecutor.submit (inherited from AbstractExecutorService) is shown below; newTaskFor simply creates a FutureTask.
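Paraphrased from AbstractExecutorService in the JDK:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);   // wraps the Callable in a FutureTask
    execute(ftask);                               // hands it to the pool as a Runnable
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}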

If the task has not finished by the time we call get(), how does it block the caller? Let's look at the timed get() method.
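Paraphrased from FutureTask in JDK 8 (later versions differ only in low-level details):

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();   // still not done after waiting: time out
    return report(s);                   // done: return the result or rethrow the exception
}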

If state > COMPLETING, report(s) is called and the method returns immediately with the result (or rethrows the exception). state is a private member variable that can take the following values; any value greater than COMPLETING (1) is a final state, in which case the result is returned directly.
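The state constants defined in FutureTask:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;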

Otherwise, the call enters the awaitDone() method.
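Paraphrased from the JDK 8 source (newer JDKs replace Unsafe with VarHandles, but the logic is the same); the three positions discussed below are marked (1), (2) and (3):

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {            // task reached a final state: return it
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)        // almost done, just yield briefly
            Thread.yield();
        else if (q == null)
            q = new WaitNode();          // (1) create a wait node for the current thread
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);   // (2) CAS onto head of waiters list
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);   // (3) park the calling thread
        }
        else
            LockSupport.park(this);
    }
}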

This method is an unbounded for loop, but it does not busy-spin checking the state to obtain the result, which would be far too wasteful of CPU.
On the "normal" path (considering only the simplest scenario and ignoring abnormal or unimportant branches, so we don't go down a rabbit hole), the loop is entered three times, corresponding to the three positions marked (1), (2) and (3) in the code above.
At position (1) a WaitNode is created. A WaitNode holds a Thread and a next reference, so the nodes obviously form a linked list.
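The WaitNode itself is tiny (from the FutureTask source):

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}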

At position (2) the node is CASed onto the head of the waiters list. If the CAS fails, the for loop simply tries again until it succeeds; once it succeeds, the next iteration reaches position (3).
At position (3), LockSupport.parkNanos(this, nanos) is called to block the current thread.

Why a linked list? The reason is simple: after a task is submitted, several threads may wait for its result at the same time, i.e. call get() from multiple threads. Each call creates its own WaitNode, and together they form the list.

OK, now that we know how Future.get() blocks, let's see how the blocked thread is woken up and gets its result once the task has executed.
Going back to the thread pool's submit method above: the FutureTask is passed to execute as a Runnable, so eventually its run() method is executed.
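A simplified paraphrase of FutureTask.run(); cancellation handling and the CAS guarding the runner field are omitted here:

public void run() {
    if (state != NEW)                 // already run or cancelled
        return;
    try {
        V result = callable.call();   // execute the submitted task body
        set(result);                  // publish the result and wake up waiters
    } catch (Throwable ex) {
        setException(ex);             // publish the exception instead
    }
}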
Again focusing on the "normal" execution path: once the task body has run, run() calls the set() method, which does two things (see the sketch after this list):
1. Set the state to NORMAL, indicating that the task completed normally.

2. Execute the finishCompletion method: traverse all nodes in the waiters list, each of which corresponds to a waiting thread, take out the thread, and call LockSupport.unpark(t) to resume it.
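Paraphrased from the JDK 8 FutureTask source (newer JDKs use VarHandles instead of Unsafe, but the logic is unchanged):

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 1. final state: NORMAL
        finishCompletion();                              // 2. wake up all waiters
    }
}

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);   // resume a thread blocked in get()
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;               // unlink to help GC
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;                         // reduce footprint
}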

Summary

Through the source code analysis we now know that a thread blocked in Future.get() is woken up by FutureTask.run(), in other words the wake-up happens only after the submitted task has actually executed.
When our thread pool has been shut down, submitting a task does trigger the rejection policy, but CallerRunsPolicy first checks whether the pool is shut down. If it is not, it calls Runnable.run() directly, which is equivalent to executing the task on the calling thread. If the pool is shut down, it does nothing, and that is exactly our problem: we rely on the task executing to unblock the waiting get(), and if nothing runs, nothing ever wakes it, so get() simply times out. DiscardPolicy and DiscardOldestPolicy have the same problem. AbortPolicy throws an exception in the calling thread at submit time, so execution never even reaches Future.get().
But why does Java behave this way? The original intention of this rejection policy is to run the task on the caller thread, yet in this case the task is discarded. I checked the JDK 17 source and the logic has not changed, so it is evidently considered correct behavior.
The reason lies in the semantics of shutdown: once the pool has been shut down it must not accept new tasks. If it still ran rejected tasks on the calling thread after shutdown, it would in essence still be accepting new work, which would violate the guarantee that no new tasks are accepted after shutdown.
In short, be careful when using shutdown. In our scenario, the service should first stop receiving traffic and wait for in-flight requests to finish before shutting the pool down, rather than shutting it down at the start of the offline process while requests are still being handled. Alternatively, during graceful offline don't shut the pool down at all, wait for the in-flight tasks to drain, and let the caller guarantee that no new tasks are submitted; after making that change the error no longer appears.
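As a rough sketch of the safer ordering (the 30-second grace period is just an illustrative value):

// Only after traffic has stopped and in-flight requests have drained:
executor.shutdown();                                   // stop accepting new tasks
try {
    if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
        executor.shutdownNow();                        // interrupt anything still running
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
}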