[Pitfall]: Who taught you to write a custom rejection policy that doesn't throw an exception?

Overview

I recently wrote a batch business interface. To reduce the response time, I used a custom thread pool with CompletableFuture to run the tasks asynchronously and then block until the results were ready.

But during testing I ran into a very strange problem.

Let’s build a test case that reproduces it, which also makes it easier for everyone to follow~

Problem case

@SpringBootTest
public class OtherTest {

   @Resource
   private ThreadPoolExecutor threadPoolExecutor;

   @Test
   public void test() {
      List<Integer> list = new ArrayList<>(100);
      for (int i = 0; i < 100; i++) {
         list.add(i);
      }

      List<CompletableFuture<Integer>> futureList = list.stream().map(num -> {
         return CompletableFuture.supplyAsync(() -> {
            // Simulate business
            return num + 1;
         }, threadPoolExecutor);
      }).collect(Collectors.toList());

      CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();

      for (CompletableFuture<Integer> completableFuture : futureList) {
         try {
            System.out.println(completableFuture.get());
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
}

After running the test case, we find that the main thread stays blocked and the printing loop at the end never executes. This is the problem I ran into.
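For readers who want to reproduce this without Spring, here is a minimal sketch in plain Java. It assumes the same pool sizing as the article (2 threads, queue of 5) and a handler that only logs. Two things are my own additions and not part of the original test: the class and method names are hypothetical, and the tasks wait on a latch until all 100 have been submitted, so the overflow happens on every run instead of depending on timing. The wait is bounded with `get(2, SECONDS)` so that the program ends instead of hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SilentRejectionRepro {

    /** Returns true if allOf(...) is still incomplete after a bounded wait. */
    static boolean stillBlockedAfterTwoSeconds() {
        // Same sizing as the article's pool; the handler returns without throwing.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                (r, executor) -> { /* log only, no exception */ });
        // Assumption for determinism: tasks wait on this latch until all 100
        // have been submitted, so the pool is guaranteed to overflow.
        CountDownLatch allSubmitted = new CountDownLatch(1);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                final int num = i;
                futures.add(CompletableFuture.supplyAsync(() -> {
                    awaitQuietly(allSubmitted);
                    return num + 1;
                }, pool));
            }
            allSubmitted.countDown();
            CompletableFuture<Void> all =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            try {
                all.get(2, TimeUnit.SECONDS); // bounded stand-in for join()
                return false;
            } catch (TimeoutException e) {
                return true; // futures of rejected tasks never complete
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("still blocked: " + stillBlockedAfterTwoSeconds());
    }
}
```

With an unbounded `join()` in place of the timed `get`, the calling thread would stay parked, which matches the behavior described above.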

Why is it blocked?

If we take a thread dump, we can see that the main thread is in the WAITING state, and the stack trace shows that it is parked in LockSupport.park.

That is because we block on the result with CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();

Following the join method all the way down, we end up in java.util.concurrent.CompletableFuture.Signaller#block, which matches the stack trace in the thread dump.

OK, we have found where it blocks, but why does it keep blocking?

In the earlier run, the error log saying that a task was rejected was printed. Could the rejection be the cause?

With that in mind, I simply increased the maximum thread count and the queue capacity.

Here is the original thread pool configuration:

@Slf4j
@Configuration
public class ThreadPoolConfig {

   @Bean(name = "threadPoolExecutor")
   public ThreadPoolExecutor prepareThreadPoolExecutor() {
      return new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5), new RejectedExecutionHandler() {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.error("ThreadPoolExecutor task is rejected, please check in time!!!");
         }
      });
   }
}

Here is the adjusted configuration, with a larger queue capacity and a higher maximum number of threads:

@Slf4j
@Configuration
public class ThreadPoolConfig {

   @Bean(name = "threadPoolExecutor")
   public ThreadPoolExecutor prepareThreadPoolExecutor() {
      return new ThreadPoolExecutor(2, 10, 1, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500), new RejectedExecutionHandler() {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.error("ThreadPoolExecutor task is rejected, please check in time!!!");
         }
      });
   }
}

Run the test again after the adjustment, and you can see that it completes successfully.

Read the source code to find the problem~

From the above, we have learned that the main thread is parked by LockSupport.park and stays in the WAITING state because tasks were rejected, so it cannot continue.

Since it was parked, something has to unpark it.

So, when does the unpark happen?

When the tasks have all finished executing, the completing thread unparks the main thread.

But the logs showed earlier that some tasks were rejected, and a rejected task never runs. Its future therefore never completes, nothing ever unparks the thread that is waiting for the result, and the main thread stays blocked.
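The park/unpark reasoning can be seen in isolation with a CompletableFuture that is created by hand. This is only an illustrative sketch with hypothetical class and method names: the first method waits on a future that nobody completes (standing in for a rejected task), the second completes the future from another thread, which is what wakes the waiter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NeverCompletedFuture {

    /** Waits briefly on a future that nobody completes; returns true on timeout. */
    static boolean timesOutWhenNobodyCompletes() {
        CompletableFuture<Integer> orphan = new CompletableFuture<>();
        try {
            orphan.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // an untimed join() would have stayed parked
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Completes the future from another thread, which releases the waiter. */
    static int returnsOnceCompleted() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        Thread completer = new Thread(() -> future.complete(42));
        completer.start();
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(timesOutWhenNobodyCompletes()); // prints "true"
        System.out.println(returnsOnceCompleted());        // prints "42"
    }
}
```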

Let’s take a look at the source code of the thread pool:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

I believe everyone has memorized the standard thread pool workflow from interview prep. Looking at the source code above, when the number of threads has reached the core size and the queue is full, execute tries addWorker once more, this time checking against the maximum pool size.

// At this point core is false
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                // core is false here, so the limit is maximumPoolSize: once the
                // worker count has reached it, return false to trigger the rejection policy
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // ...omitted
    return workerStarted;
}

If the worker count has already reached the maximum pool size (core is false on this path), addWorker returns false, which triggers the rejection policy. The task is neither executed nor added to the queue.
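Put as arithmetic: while every accepted task is still busy, the pool can hold at most maximumPoolSize + queue capacity tasks, and everything beyond that goes to the rejection handler. The sketch below counts rejections for the two configurations used in this article. The class name, the counting handler and the latch that keeps tasks busy are assumptions made for the demonstration, not part of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolCapacityDemo {

    /** Submits `tasks` blocked tasks and returns how many the pool rejected. */
    static int countRejected(int maxThreads, int queueCapacity, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, maxThreads, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // The handler runs in the submitting thread, so the count is
                // complete as soon as the loop below finishes.
                (r, executor) -> rejected.incrementAndGet());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep workers busy so no slot frees up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Original config: 2 threads + 5 queue slots = 7 accepted
        System.out.println(countRejected(2, 5, 100));    // prints "93"
        // Adjusted config: 2 core threads + 98 queued, nothing rejected
        System.out.println(countRejected(10, 500, 100)); // prints "0"
    }
}
```

Note that in the second configuration the pool never grows beyond the 2 core threads, because extra threads are only created once the queue is full.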

CompletableFuture doesn’t work, what about Future?

Some readers may say: since CompletableFuture doesn’t work, I’ll just use Future.

In fact the principle is the same, and Future hangs as well:

@SpringBootTest
public class OtherTest {

   @Resource
   private ThreadPoolExecutor threadPoolExecutor;

   @Test
   public void test() {
      List<Integer> list = new ArrayList<>(100);
      for (int i = 0; i < 100; i++) {
         list.add(i);
      }

      List<Future<Integer>> futureList = list.stream().map(num -> {
         return threadPoolExecutor.submit(() -> {
            return num + 1;
         });
      }).collect(Collectors.toList());

      for (Future<Integer> future : futureList) {
         try {
            System.out.println(future.get());
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
}

Using Future also leaves the main thread parked in the WAITING state.

Below is the source of FutureTask's get method, which blocks the caller in LockSupport.park(this);

For a detailed explanation, see -> "Asynchronous timeout interrupt, know it, but also know why~"

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
  long startTime = 0L;
  WaitNode q = null;
  boolean queued = false;
  for (;;) {
    int s = state;
    if (s > COMPLETING) {
      if (q != null)
        q.thread = null;
      return s;
    }
    else if (s == COMPLETING)
      Thread.yield();
    else if (Thread.interrupted()) {
      removeWaiter(q);
      throw new InterruptedException();
    }
    else if (q == null) {
      if (timed && nanos <= 0L)
        return s;
      q = new WaitNode();
    }
    else if (!queued)
      queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    else if (timed) {
      final long parkNanos;
      if (startTime == 0L) {
        startTime = System.nanoTime();
        if (startTime == 0L)
          startTime = 1L;
        parkNanos = nanos;
      } else {
        long elapsed = System.nanoTime() - startTime;
        if (elapsed >= nanos) {
          removeWaiter(q);
          return state;
        }
        parkNanos = nanos - elapsed;
      }
      if (state < COMPLETING)
        LockSupport.parkNanos(this, parkNanos);
    }
    else
      LockSupport.park(this);
  }
}

However, the unpark happens in finishCompletion, which is called once the task has finished executing. This method wakes up every thread that is blocked waiting for the result.

But the task was rejected, so it never runs and the waiting thread is never woken up, which is how the tragedy happens~
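The same effect can be observed on a single Future. In this sketch (hypothetical names, and a deliberately tiny pool of 1 thread plus 1 queue slot so that the third submission is certain to be rejected), the Future returned by submit for the rejected task never reaches a completed state, so only a timed get returns.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RejectedFutureTaskDemo {

    /** Returns true if the silently rejected task's Future is never completed. */
    static boolean rejectedFutureNeverCompletes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                (r, executor) -> { /* swallow the rejection, as in the article */ });
        CountDownLatch release = new CountDownLatch(1);
        try {
            Callable<Integer> blocker = () -> {
                release.await();
                return 0;
            };
            Callable<Integer> quick = () -> 1;
            pool.submit(blocker);                        // occupies the only worker
            pool.submit(blocker);                        // fills the only queue slot
            Future<Integer> rejected = pool.submit(quick); // handed to the handler
            try {
                rejected.get(200, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                // The FutureTask was never run, so its state never changed
                return !rejected.isDone();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedFutureNeverCompletes()); // prints "true"
    }
}
```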

CompletableFuture hangs for the same reason

In essence, the task is rejected and never executed, so nothing ever wakes up the waiting thread.

public void run() {
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
      // ...omitted
    }
}

protected void set(V v) {
  if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    outcome = v;
    STATE.setRelease(this, NORMAL); // final state
    // Wake up the threads that are blocked waiting for the result
    finishCompletion();
  }
}

Solution

Now that you understand the cause of the pit, it is not difficult to find the corresponding solution

  1. Remember to throw an exception in the rejection policy so that the main thread does not wait forever. The built-in DiscardPolicy and DiscardOldestPolicy have the same problem: they drop tasks without throwing any exception to the caller.

  2. Set a timeout and release resources promptly. For example, CompletableFuture supports orTimeout since JDK 9, and Future's get method also accepts a maximum waiting time.
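Both suggestions can be sketched as follows. This is one possible shape under stated assumptions, not the only way to do it: the class and method names are hypothetical, the pool is shrunk to 1 thread and 1 queue slot so the third submission is always rejected, and System.err stands in for the article's log.error call. The orTimeout part requires JDK 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class RejectionFixes {

    /** Fix 1: log the rejection, then throw so the submitter sees it. */
    static ThreadPoolExecutor throwingPool() {
        return new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                (r, executor) -> {
                    System.err.println("ThreadPoolExecutor task is rejected, please check in time!!!");
                    throw new RejectedExecutionException("Task rejected from " + executor);
                });
    }

    /** Returns true if the third submission fails immediately instead of hanging later. */
    static boolean submissionFailsFast() {
        ThreadPoolExecutor pool = throwingPool();
        CountDownLatch release = new CountDownLatch(1);
        Supplier<Integer> blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 0;
        };
        try {
            CompletableFuture.supplyAsync(blocker, pool); // runs on the worker
            CompletableFuture.supplyAsync(blocker, pool); // waits in the queue
            CompletableFuture.supplyAsync(blocker, pool); // rejected: handler throws
            return false;
        } catch (RejectedExecutionException e) {
            return true; // the exception surfaces at the call site
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    /** Fix 2: bound the wait so a lost task cannot block forever (JDK 9+). */
    static boolean orTimeoutCompletesExceptionally() {
        CompletableFuture<Integer> neverCompleted = new CompletableFuture<>();
        try {
            neverCompleted.orTimeout(200, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(submissionFailsFast());             // prints "true"
        System.out.println(orTimeoutCompletesExceptionally()); // prints "true"
    }
}
```

With a throwing handler, the code that submits the batch has to decide what to do with the RejectedExecutionException, for example fail the whole request or run the task in the calling thread. For a plain Future, `get(timeout, unit)` gives the same bounded wait as orTimeout.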

Author: Code Pipi Shrimp

Link: https://juejin.cn/post/7214124426047750203