How does the Java thread pool ensure that core threads are not destroyed?

Source: How does the Java thread pool ensure that core threads are not destroyed_Chao Hua Shi Xi’s blog-CSDN blog

In Java, once a Thread object's start method has been called and its run method has returned, the thread moves to the TERMINATED state. A Thread object therefore cannot be restarted: calling start a second time does not execute run again, it throws IllegalThreadStateException.

Details can be found through this link: Can the same thread object in Java call the start method multiple times?
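The restriction can be checked with a minimal sketch. The class and method names below (RestartDemo, secondStartFails) are made up for illustration; only Thread.start, Thread.join and IllegalThreadStateException come from the JDK.

```java
public class RestartDemo {
    /** Starts a thread, waits for it to finish, then tries to start it again. */
    static boolean secondStartFails() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.join(); // after join returns, the thread is TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            t.start(); // a Thread object can only be started once
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start rejected: " + secondStartFails());
    }
}
```

Since a finished thread cannot be started again, a pool can only reuse a thread by never letting its run method return, which is what the rest of this article traces.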

Question: How does the Java thread pool ensure that the core thread will not terminate?

Next, we will analyze how the thread pool ensures that the core thread is not terminated through the source code. Before the analysis, you need to understand several important member variables and methods in ThreadPoolExecutor to facilitate reading of the following source code:

Introduction to ThreadPoolExecutor member variables and methods

  1. ctl atomic integer variable
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  • ctl packs two fields into one int:
    • workerCount: the effective number of worker threads in the pool;
    • runState: the run state of the pool (not to be confused with the state of an individual thread). The pool can be in one of the following states:
      • RUNNING: accepts new tasks and processes queued tasks;
      • SHUTDOWN: does not accept new tasks, but still processes queued tasks;
      • STOP: does not accept new tasks, does not process queued tasks, and interrupts tasks in progress;
      • TIDYING: all tasks have terminated and workerCount is 0; the terminated() hook method will be run;
      • TERMINATED: terminated() has completed.
  2. The maximum effective number of threads in the pool
//Integer.SIZE equals 32
private static final int COUNT_BITS = Integer.SIZE - 3;
//The maximum capacity of actual effective threads
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  • The actual maximum number of threads in effect is $2^{29} - 1$.
  • An int is used because it is faster than a long. If int ever becomes too small, ctl can be changed to an AtomicLong.
  3. Thread pool run states
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

Here we only need to know that only the RUNNING state is less than 0, and all other states are greater than or equal to 0.
  4. Get the thread pool run state

private static int runStateOf(int c) { return c & ~CAPACITY; }

The parameter c is the value of ctl. Masking off the low 29 bits leaves the run state of the pool, which can then be compared with the constants in item 3.
  5. Get the current effective number of threads in the pool

private static int workerCountOf(int c) { return c & CAPACITY; }

As above, the parameter c is the value of ctl. Keeping only the low 29 bits yields workerCount, the effective number of threads in the pool.
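The bit packing in items 1 to 5 can be tried out in isolation. The sketch below copies the constants and helper expressions quoted above into a standalone class; the class name CtlDemo is made up, and ctlOf is written here as a bitwise OR of the two fields, which is an assumption about the helper used in the ctl initializer.

```java
public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;        // high 3 bits: 111
    static final int SHUTDOWN = 0 << COUNT_BITS;        // high 3 bits: 000
    static final int STOP = 1 << COUNT_BITS;            // high 3 bits: 001

    // Assumed packing: run state in the high bits, worker count in the low bits
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 5
        System.out.println(CAPACITY);                 // 536870911
        System.out.println(RUNNING < 0);              // true
    }
}
```

Because the run state sits in the high bits, comparisons such as rs >= SHUTDOWN in the code below work directly on the packed values.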
  6. Worker thread collection

private final HashSet<Worker> workers = new HashSet<Worker>();

Each effective thread in the pool is wrapped in a Worker object.
  7. Worker inner class

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
  • The Worker class mainly maintains the interrupt control state of the thread running tasks;
  • It extends AQS to implement a simple non-reentrant mutex rather than using a reentrant lock, because a worker task must not be able to reacquire the lock when it calls a pool control method such as setCorePoolSize;
  • To suppress interrupts until the thread actually starts running tasks, the lock state is initialized to a negative value and cleared on startup (in runWorker).
  8. Other members such as corePoolSize, maximumPoolSize, threadFactory, and workQueue are not described in detail here.
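To make the "non-reentrant mutex on top of AQS" idea from item 7 concrete, here is a minimal sketch. SimpleMutex and NonReentrantLockDemo are hypothetical names and this is not the Worker source; it only illustrates that a lock built on a 0/1 AQS state refuses a second acquisition by the thread that already holds it.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class NonReentrantLockDemo {
    /** Hypothetical mutex: state 0 means unlocked, state 1 means locked. */
    static final class SimpleMutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            // Succeeds only on the 0 -> 1 transition, so the owner cannot re-enter
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock() { release(1); }
    }

    /** Returns the results of: first tryLock, nested tryLock, tryLock after unlock. */
    static boolean[] demo() {
        SimpleMutex mutex = new SimpleMutex();
        boolean first = mutex.tryLock();
        boolean nested = mutex.tryLock(); // same thread, already held
        mutex.unlock();
        boolean afterUnlock = mutex.tryLock();
        mutex.unlock();
        return new boolean[] {first, nested, afterUnlock};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```

A ReentrantLock would have returned true for the nested attempt, which is exactly the behavior the Worker design wants to avoid.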

Case analysis

This case does not execute the shutdown method, which ensures that the thread pool is always running (RUNNING)

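import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// The class name is arbitrary; it only makes the example compilable
public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 2 core threads, at most 2 threads, bounded queue of 6 tasks
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2,
                0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(6),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 8; i++) {
            int num = i;
            threadPoolExecutor.execute(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + " - " + num);
                System.out.println(threadName + " Start sleeping...");
                try {
                    // Suspend thread execution
                    TimeUnit.MILLISECONDS.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadName + " End sleep...");
            });
        }

        // shutdown is deliberately not called, so the pool stays RUNNING
        //threadPoolExecutor.shutdown();
    }
}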

Trace the source code of the execute method to see how the core thread is added to the pool:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //Get thread pool control status
    int c = ctl.get();
    //Calculate the actual number of threads through workerCountOf
    if (workerCountOf(c) < corePoolSize) {
        //If the number of core threads is not exceeded, add a new Worker object, true means core threads
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // The core threads are full: if the pool is running, add the task to the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Recheck: if the pool stopped running in the meantime, remove the task and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker is left, start one without a first task so the queue gets drained
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false)) // Could not queue the task: try to add a non-core thread
        reject(command);
}

Based on the method content and breakpoint tracking, the following conclusions can be drawn:

  1. While the number of threads is below corePoolSize, each newly submitted task (command) causes a new thread to be created (inside a Worker), even if existing threads are idle;
  2. Once the number of threads reaches corePoolSize, newly submitted tasks are added to the blocking queue workQueue and wait to be scheduled;
  3. If the task cannot be queued (the queue is full), the pool checks whether the number of threads is below maximumPoolSize. If it is, a new thread is created to run the task, until the total number of threads reaches maximumPoolSize;
  4. When the number of threads equals maximumPoolSize and the queue is full, further tasks trigger the saturation (rejection) policy.
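The four conclusions above can be observed with a deliberately tiny pool. This is a sketch under assumed parameters (1 core thread, at most 2 threads, a queue of capacity 1, the default AbortPolicy); the class name SubmissionOrderDemo and the blocker task are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionOrderDemo {
    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes idle during the test
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);            // below corePoolSize: new core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);            // core threads full: task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);            // queue full: new non-core thread
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);        // queue full and maximumPoolSize reached
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [1, 1, 2, 1]
    }
}
```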

In the code above, the method to focus on is addWorker. It takes two parameters: the first is the Runnable to run first, and the second selects the bound that applies to the new thread. true means the thread count is checked against corePoolSize (a core thread), false means it is checked against maximumPoolSize. Next, look at its source code:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Omit some code
        ...
        
        for (;;) {
            // core selects which bound limits the creation of a new thread
            int wc = workerCountOf(c);
            // Fail if workerCount has reached CAPACITY or the bound selected by core
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment workerCount by 1 through CAS on ctl
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Create new thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            //Omit some code
            ...
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
            workerAdded = true;
            ...
            
            if (workerAdded) {
                //Start thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The addWorker method obtains the thread object t from the Worker object and calls its start method to start the thread. So where does this thread t come from?
Extension: java retry: detailed explanation
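The retry: label in addWorker is an ordinary Java labeled statement: break retry leaves both loops at once, and continue retry restarts the outer loop. The sketch below reproduces only that control flow around a CAS increment on a plain AtomicInteger; RetryLabelDemo and tryIncrement are hypothetical names, and the pool state checks of the real method are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetryLabelDemo {
    /** Increments count by CAS unless it has already reached limit. */
    static boolean tryIncrement(AtomicInteger count, int limit) {
        retry:
        for (;;) {
            for (;;) {
                int c = count.get();
                if (c >= limit)
                    return false;            // bound reached: give up
                if (count.compareAndSet(c, c + 1))
                    break retry;             // success: leave both loops
                // CAS lost a race with another thread; retry the inner loop
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        System.out.println(tryIncrement(count, 2)); // true
        System.out.println(tryIncrement(count, 2)); // true
        System.out.println(tryIncrement(count, 2)); // false
    }
}
```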

Next, let’s take a look at how Worker creates threads:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    /** Initial task to run; may be null */
    Runnable firstTask;

    /** Creates a Worker with the given first task and a thread from the thread factory */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /**Delegate the run method to runWorker for execution*/
    public void run() {
        runWorker(this);
    }
}

The Worker class implements the Runnable interface. The key line in its constructor is this.thread = getThreadFactory().newThread(this): the Worker passes itself as the Runnable to the thread factory, so the run method executed by the new thread is the run method of this Worker. We can therefore conclude that t.start() in addWorker ends up invoking run in the Worker class, which delegates to runWorker.
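The "a Runnable that creates its own thread" pattern can be shown in a few lines. MiniWorker and SelfRunningWorkerDemo are hypothetical names, and the sketch uses Executors.defaultThreadFactory() in place of the pool's configured factory.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class SelfRunningWorkerDemo {
    /** Hypothetical stand-in for Worker: it hands itself to the thread factory. */
    static final class MiniWorker implements Runnable {
        final Thread thread;
        volatile String ranOn; // name of the thread that executed run()

        MiniWorker(ThreadFactory factory) {
            this.thread = factory.newThread(this);
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
        }
    }

    /** Returns true if starting worker.thread executed worker.run() on that thread. */
    static boolean runsOnOwnThread() {
        MiniWorker worker = new MiniWorker(Executors.defaultThreadFactory());
        worker.thread.start();
        try {
            worker.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return worker.thread.getName().equals(worker.ranOn);
    }

    public static void main(String[] args) {
        System.out.println(runsOnOwnThread()); // true
    }
}
```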

So how does runWorker run?

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //Get the task to be executed
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    
    //Polling call getTask to get the task
    while (task != null || (task = getTask()) != null) {
        w.lock();
        //Omit some code
            ...
        //Execute run method
        task.run();
        //Omit some code
            ...
    }
}

runWorker uses a while loop to keep calling getTask for new tasks. The loop, and with it the worker thread, ends normally only when getTask returns null.

Finally, let’s look at what the getTask method does:

private Runnable getTask() {
    boolean timedOut = false;
    //Infinite loop
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Exit if the pool is stopping, or if it is shut down and the queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        
        // timed decides whether this worker may time out while waiting:
        // either core threads may time out, or there are more workers than corePoolSize
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If there are too many workers, or this worker timed out on its last poll,
        // and it is not the last worker needed for a non-empty queue,
        // decrement workerCount through CAS and let the worker exit
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If this worker may time out, wait at most keepAliveTime with poll;
            // otherwise block in take until a task arrives
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            // A task was obtained: return it
            if (r != null)
                return r;
            // poll timed out: set the flag and loop again
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

It can be seen from the above source code:

  1. Inside the infinite for loop, the worker repeatedly checks the pool state and the queue, and tries to obtain a task to run;
  2. The statement Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); covers two cases:
    • If timed is true, this Worker (that is, the running thread) may be retired. workQueue.poll waits for a task for at most keepAliveTime. If a task arrives in time, it is returned. Otherwise timedOut is set to true, and on the next iteration getTask normally decrements workerCount and returns null, which ends the worker thread;
    • If timed is false, workQueue.take is called. take returns the head of the queue immediately when a task is available; when the queue is empty, it blocks the current thread until a new task arrives.
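The difference between the two queue calls can be seen without a thread pool at all. The sketch below uses a LinkedBlockingQueue directly; the class and method names (PollVsTakeDemo, pollTimesOut, takeBlocksUntilOffer) and the chosen timeouts are assumptions for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollVsTakeDemo {
    /** poll with a timeout gives up and returns null when the queue stays empty. */
    static boolean pollTimesOut() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** take keeps its caller blocked on an empty queue until an element is offered. */
    static boolean takeBlocksUntilOffer() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch taken = new CountDownLatch(1);
        Thread taker = new Thread(() -> {
            try {
                queue.take();
                taken.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taker.start();
        try {
            // Nothing has been offered yet, so take cannot have returned
            boolean returnedWhileEmpty = taken.await(100, TimeUnit.MILLISECONDS);
            queue.offer("task");
            boolean returnedAfterOffer = taken.await(5, TimeUnit.SECONDS);
            return !returnedWhileEmpty && returnedAfterOffer;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("poll timed out: " + pollTimesOut());
        System.out.println("take blocked until offer: " + takeBlocksUntilOffer());
    }
}
```

In getTask, the poll branch is what lets surplus threads exit after keepAliveTime, while the take branch is what parks the remaining threads indefinitely.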

Below is a simple sequence diagram of the core thread:
[Figure: core thread sequence diagram]

Conclusion

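When shutdown has not been called and allowCoreThreadTimeOut is false (the default), an idle worker in a pool with no more than corePoolSize threads blocks in workQueue.take() inside getTask. Its run method therefore never returns, and the thread is not destroyed. Note that the pool does not mark individual threads as core or non-core: whichever workers remain once the count has dropped to corePoolSize are the ones that block in take.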
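The conclusion can be checked from the outside by watching getPoolSize after the pool has gone idle. This is a sketch under assumed parameters (2 core threads, a keepAliveTime of 50 ms, an observation window of about one second); CoreThreadKeepAliveDemo and poolSizeAfterIdle are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreThreadKeepAliveDemo {
    /** Runs two trivial tasks, lets the pool idle, and returns the pool size seen afterwards. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
                50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // With true, idle core threads use the timed poll branch of getTask
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            // Idle for up to one second, far longer than keepAliveTime
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            while (System.nanoTime() < deadline) {
                if (allowCoreTimeout && pool.getPoolSize() == 0)
                    break; // all workers have already timed out
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow(); // release the blocked workers so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("default: " + poolSizeAfterIdle(false));
        System.out.println("allowCoreThreadTimeOut(true): " + poolSizeAfterIdle(true));
    }
}
```

With the default setting both workers are still alive after the idle period because they are parked in take; with allowCoreThreadTimeOut(true) they time out in poll and the pool shrinks to zero.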