Overview
I recently wrote a batch business interface. In order to reduce the response time, I used a custom thread pool + CompletableFuture to execute tasks asynchronously and block the results.
But during the test, I found a very strange problem
Let’s create a test case to reproduce the problem, and it’s also convenient for everyone to understand~
Problem case
@SpringBootTest public classOtherTest { ? @Resource private ThreadPoolExecutor threadPoolExecutor; ? @Test public voidtest() { List<Integer> list = new ArrayList<>(100); for (int i = 0; i < 100; i ++ ) { list. add(i); } ? List<CompletableFuture<Integer>> futureList = list. stream(). map(num -> { return CompletableFuture. supplyAsync(() -> { // Simulate business return num + 1; }, threadPoolExecutor); }).collect(Collectors.toList()); ? CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); ? for (CompletableFuture<Integer> completableFuture : futureList) { try { System.out.println(completableFuture.get()); } catch (Exception e) { e.printStackTrace(); } } } ? } Copy Code
After executing the case, we will find that the main thread has been blocked, and the following printing is not executed at all! This is the problem I’m having
Why is it blocked?
We directly Dump Threads, we can see that the main thread is in the WAITING state, and through the stack trace, we can see that it was locked by LockSupport.park
Because we will eventually CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); block the result
Track all the way according to the join method, and finally reach java.util.concurrent.CompletableFuture.Signaller#block, which also conforms to the stack information in the above figure
ok, the problem is found, but why does it keep blocking?
According to our previous running results, we can find that the error log that the task was rejected was printed out. Could it be because of the rejection? ? ?
Thinking of this, I directly increased the thread and queue capacity
Here are the previous thread pool configuration parameters
@Slf4j@ConfigurationpublicclassThreadPoolConfig { ? @Bean(name = "threadPoolExecutor") publicThreadPoolExecutor prepareThreadPoolExecutor() { return newThreadPoolExecutor(2, 2, 1, TimeUnit. SECONDS, newLinkedBlockingQueue<>(5), newRejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("ThreadPoolExecutor task is rejected, please check in time!!!"); } }); } ? } Copy Code
The following are the adjusted thread pool parameters, increasing the queue capacity and the maximum number of threads
@Slf4j@ConfigurationpublicclassThreadPoolConfig { ? @Bean(name = "threadPoolExecutor") publicThreadPoolExecutor prepareThreadPoolExecutor() { return newThreadPoolExecutor(2, 10, 1, TimeUnit. SECONDS, newLinkedBlockingQueue<>(500), newRejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("ThreadPoolExecutor task is rejected, please check in time!!!"); } }); } ? } Copy Code
Execute the task again after adjustment, you can see that the execution is successful
Look at the source code, find problems~
Through the above, we have learned that the main thread LockSupport.park is always in the WAITING state because the task is rejected, and cannot continue to execute.
Since it is parked, it must be unparked
So, when will it unpark?
Of course, after the tasks are all executed, go back to unpark the main thread
But before we saw through the logs that some tasks were rejected, and the tasks would not be executed if they were rejected. Then the thread that blocked the acquisition of the task would not be unparked, and subsequent tasks would not be executed, so it could not be unparked. The main thread, so the main thread has been blocked.
Let’s take a look at the source code of the thread pool:
public void execute(Runnable command) { if (command == null) throw new nullPointerException(); ? int c = ctl. get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl. get(); } if (isRunning(c) & amp; & amp; workQueue. offer(command)) { int recheck = ctl. get(); if (! isRunning(recheck) & amp; & amp; remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } Copy Code
I believe everyone knows the stereotype of the thread pool. Looking at the source code above, we know that when the number of threads is not less than the number of core threads and the queue capacity is full, we will try addWorker again.
// At this point core is falseprivate boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl. get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) & amp; & amp; (runStateAtLeast(c, STOP) || firstTask != null || workQueue. isEmpty())) return false; ? for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // The current number of threads exceeds the number of core threads, directly return false to trigger the rejection strategy return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // ...omitted return workerStarted; } Copy Code
If the current number of threads in addWorker exceeds the number of core threads, return false directly to trigger the rejection strategy, and will not add tasks to the queue
CompletableFuture doesn’t work, what about Future?
Some friends may say that since CompletableFuture does not work, then I use Future.
In fact, the principle is the same, even Future
@SpringBootTest public classOtherTest { ? @Resource private ThreadPoolExecutor threadPoolExecutor; ? @Test public voidtest() { List<Integer> list = new ArrayList<>(100); for (int i = 0; i < 100; i ++ ) { list. add(i); } ? List<Future<Integer>> futureList = list. stream(). map(num -> { return threadPoolExecutor. submit(() -> { return num + 1; }); }).collect(Collectors.toList()); ? for (Future<Integer> future : futureList) { try { System.out.println(future.get()); } catch (Exception e) { e.printStackTrace(); } } ? } ? } Copy Code
Using Future will also cause the main thread to park and be in the WAITING state
The following is the get source code of FutureTask, which will be blocked by LockSupport.park(this);
For detailed explanation, see ->
Asynchronous timeout interrupt, know it, but also know why~
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } ? private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L; WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { if (q != null) q. thread = null; return s; } else if (s == COMPLETING) Thread. yield(); else if (Thread. interrupted()) { removeWaiter(q); throw new InterruptedException(); } else if (q == null) { if (timed & amp; & amp; nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) queued = WAITERS. weakCompareAndSet(this, q. next = waiters, q); else if (timed) { final long park Nanos; if (startTime == 0L) { startTime = System. nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } if (state < COMPLETING) LockSupport. parkNanos(this, parkNanos); } else LockSupport. park(this); } } Copy Code
However, the unpark timing is finishCompletion after the normal execution of the task, this method will wake up all threads that are blocked to obtain the task
But the task was rejected, so it was impossible to wake up the thread that blocked the acquisition of the task, which caused a tragedy~
CompletableFuture is actually the reason
In essence, it is because the task is rejected and cannot be executed, resulting in the inability to wake up the thread
public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null & amp; & amp; state == NEW) { V result; boolean ran; try { result = c. call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // ...omitted } } ? ? protected void set(V v) { if (STATE. compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE. setRelease(this, NORMAL); // final state // Wake up the thread that is blocked to get the task finishCompletion(); } } Copy Code
Solution
Now that you understand the cause of the pit, it is not difficult to find the corresponding solution
-
Remember to throw an exception in the rejection policy to avoid the main thread waiting all the time. For example, DiscardPolicy and DiscardOldestPolicy will have certain problems, and no exception is thrown externally.
-
Set the timeout time and release resources in time. For example, CompletableFuture can set the timeout time through orTimeout after jdk9. For Future, its get method also supports setting the waiting time.
Author: Code Pipi Shrimp
Link: https://juejin.cn/post/7214124426047750203