ThreadPoolExecutor source code analysis

Foreword

I wrote this article to analyze the design ideas and key source code of the thread pool, partly to deepen my own understanding and partly as a memory aid. There are already plenty of articles online about how to use thread pools, so basic concepts are not repeated here. The source code of ThreadPoolExecutor is not complicated overall. Doug Lea's code is elegant, with clear ideas and ingenious design. Without further ado, let's begin.

Inheritance structure of ThreadPoolExecutor

  • Executor is at the top. It declares a single method, execute.
  • ExecutorService extends the Executor interface and offers a wider range of features, such as submit, which hands a task to the thread pool and returns a Future. A related point: FutureTask is an important implementation of the RunnableFuture interface, which in turn extends both Runnable and Future. When a task is submitted through submit, it is wrapped in a FutureTask before being handed to the thread pool. Semantically, a FutureTask is a task whose result can be retrieved at some point in the future.
  • The AbstractExecutorService abstract class provides common implementations for subclasses, including the FutureTask wrapping mentioned above, invokeAny (run the tasks and return the result of one that completes successfully), and invokeAll (run all the tasks).
  • The ThreadPoolExecutor class provides an extensible thread pool implementation, including starting and shutting down the pool, monitoring its state, and, most importantly, submitting tasks to it: the execute method of the top-level Executor interface is implemented in this class.

Note the difference in usage between the execute and submit methods of ThreadPoolExecutor:
if you need the result, use submit, which returns a Future (a FutureTask under the hood). If you do not need the result, use execute.
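
The difference between the two calls can be sketched as follows. This is a minimal illustration, not code from the JDK: the class name SubmitVsExecuteDemo, the helper runBoth, and the pool size are assumptions made for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names come from ThreadPoolExecutor itself.
class SubmitVsExecuteDemo {

    // Runs one task through execute and one through submit,
    // then returns the value read from the Future.
    static int runBoth(AtomicInteger sideEffect) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // execute: fire-and-forget, nothing comes back to the caller.
            pool.execute(sideEffect::incrementAndGet);
            // submit: the Callable is wrapped in a FutureTask, and get() blocks for the result.
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            awaitQuietly(pool);
        }
    }

    // Waits for the pool to finish so the execute task is guaranteed to have run.
    private static void awaitQuietly(ExecutorService pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        System.out.println("submit result: " + runBoth(counter));
        System.out.println("execute side effect: " + counter.get());
    }
}
```

With execute, the only way to observe the task is through its side effect (the counter here), while submit hands back a value through the Future.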

Constructor of ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

The constructor parameters are:

  • corePoolSize: the number of core threads
  • maximumPoolSize: the maximum number of threads the thread pool is allowed to create
  • workQueue: the task queue, an implementation of the BlockingQueue interface
  • keepAliveTime: the keep-alive time of idle threads. By default, the idle threads meant here are those beyond the number of core threads. When such an extra thread receives no new task within this period, it is reclaimed.
  • unit: the time unit of keepAliveTime
  • threadFactory: the factory used to create new threads
  • handler: the rejection policy. When the thread pool is saturated and a new task is submitted, the rejection policy runs: for example, throw an exception, hand the task to the submitting thread to run, or discard the oldest task in the waiting queue. (What "saturated" means is covered later.)
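
As a rough illustration of how these parameters fit together, the sketch below builds a pool with every argument spelled out. The class name PoolConfigDemo and all the sizes are assumptions picked for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sizes below are illustrative only.
class PoolConfigDemo {

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30, TimeUnit.SECONDS,                   // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100),  // bounded workQueue
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (rejection policy)
    }

    // The constructor validates its arguments: maximumPoolSize < corePoolSize is refused.
    static boolean rejectsMaxBelowCore() {
        try {
            new ThreadPoolExecutor(4, 2, 30, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        System.out.println(pool.getKeepAliveTime(TimeUnit.MILLISECONDS) + " ms");
        System.out.println("invalid sizes rejected: " + rejectsMaxBelowCore());
        pool.shutdown();
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue the pool never grows past corePoolSize, because the queue never reports itself as full.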

Besides the fields set by the constructor, let's look at several internal variables used in this class.

// A single int packs two fields: the low 29 bits (Integer.SIZE - 3) store the number of worker threads, and the high 3 bits store the state of the thread pool
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// The state of the thread pool is stored in the high-order bits
// Normal state
// 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;

// After shutdown() is called, the pool is in this state: it accepts no new tasks but continues to process tasks in the queue
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;

// After shutdownNow() is called, the pool is in this state: it accepts no new tasks, does not process queued tasks, and interrupts the threads that are running tasks
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;

// All tasks have terminated and workerCount is 0. The thread that moves the pool to TIDYING then runs the terminated() hook method
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;

// After terminated() completes, the pool enters this state
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;

// Clear the low 29 bits to get the state of the thread pool
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Clear the high 3 bits to get the number of threads in the thread pool
private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }
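
The packing can be tried out in isolation. The sketch below copies the constants and the three helper methods from the excerpt above into a standalone class; the class name CtlDemo and the sample worker count of 5 are assumptions for the example.

```java
// Hypothetical standalone copy of the bit-packing helpers shown above.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                       // RUNNING pool with 5 workers
        System.out.println(Integer.toBinaryString(c));   // 32-bit pattern of the packed value
        System.out.println(runStateOf(c) == RUNNING);    // state survives the round trip
        System.out.println(workerCountOf(c));            // worker count survives too
        System.out.println(c < SHUTDOWN);                // RUNNING is the only negative state
    }
}
```

Because the state sits in the high bits, states can be compared numerically (RUNNING < SHUTDOWN < STOP), and incrementing the packed value by one changes only the worker count.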

execute method

That gives a basic picture of the thread pool's state. There are five states, and only RUNNING is negative, so the pool is operating normally exactly when its state is less than 0 (SHUTDOWN). Now let's look at the execute method of ThreadPoolExecutor. Keep in mind that submit wraps the task and then ends up calling execute.

public void execute(Runnable command) {
    // command is the task to run
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // Fewer workers than corePoolSize: create a new thread to run this task
        if (addWorker(command, true))
            // The thread was created and given the task, so return directly
            return;
        // addWorker failed (for example, another thread won the race or the pool was shut down), so re-read ctl
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        // The pool is running and the task was enqueued
        int recheck = ctl.get();
        // Even after enqueueing, recheck the state: a worker may have died or the pool may have been shut down in the meantime
        if (!isRunning(recheck) && remove(command))
            // The pool is no longer RUNNING: remove the task and run the rejection policy
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // The pool is still fine but has no workers: create one to process the queued task
            addWorker(null, false);
    }
    // Reached when the queue is full (or the pool is not running): try to create a new thread, bounded by maximumPoolSize.
    // If the worker count has already reached maximumPoolSize, run the rejection policy
    else if (!addWorker(command, false))
        reject(command);
}
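
The three branches of execute can be observed from the outside. The sketch below is an illustration under assumed settings (one core thread, at most two threads, a queue of capacity one, the default AbortPolicy); the class name ExecuteFlowDemo and the helper trace are hypothetical. Every task blocks on a latch so the workers stay busy while the next task is submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows where each of four tasks ends up.
class ExecuteFlowDemo {

    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                      // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown();                      // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Under these settings the first task starts the core thread, the second waits in the queue, the third finds the queue full and starts a second thread, and the fourth is rejected because both the queue and the pool are full.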

In the code above, addWorker creates a Worker to process tasks. The Worker class deserves an explanation. A Worker is like a worker on an assembly line: it is the pool's abstraction of a worker thread. When a Worker is constructed, it can be given a firstTask, the first task to run once the thread starts. If none is given, the worker fetches tasks from the queue. Its other field, thread, is the thread that actually executes tasks. Strictly speaking a Worker is not a thread, but for ease of understanding the rest of this article uses "thread" and "Worker" interchangeably. Here is the Worker code:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // The rest is the AQS-based lock implementation
    ...

}

addWorker method

With the role of Worker explained, let's return to the addWorker method.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // A simple way to read the nested loops: the outer loop checks the state of the thread pool, and the inner loop checks the number of worker threads. Only when both checks pass can a new thread be created
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // If the pool is not RUNNING (rs >= SHUTDOWN), the request is refused unless all three of the following hold:
        // 1. The state is exactly SHUTDOWN (a higher state means the pool is stopping, so no new threads are allowed)
        // 2. firstTask is null (a non-null firstTask is a newly submitted task, which must be refused)
        // 3. The queue is not empty (there are still queued tasks waiting to be processed)
        // Why create a thread at all when the pool is not RUNNING?
        // In SHUTDOWN, new tasks may not be submitted, but tasks already in the queue must still run, so a worker without a first task may be added to drain them
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Check the worker count: refuse if it has reached the pool's hard capacity (practically impossible) or the relevant bound
            // The bound is corePoolSize when core is true and maximumPoolSize when core is false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // All conditions for creating a thread are met. If the CAS succeeds, break out of both loops and actually create the thread
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // The CAS failed because of a concurrent update. Re-read ctl, then decide which loop to retry
            c = ctl.get();  // Re-read ctl
            // If the state of the thread pool has changed, go around the outer loop again
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // Thread creation starts here
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // The pool state is checked again here. The worker is added to workers if either:
                // 1. The thread pool is in the normal (RUNNING) state
                // 2. The thread pool is in SHUTDOWN and a thread is needed to process queued tasks (firstTask == null)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of workers the pool has ever had; workers is the set of current workers
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the worker was added successfully, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // The thread did not start successfully
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
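
The inner loop of addWorker is a check-then-CAS retry loop. The sketch below shows the same pattern on a plain AtomicInteger. It is a simplified stand-in, not the JDK code: the class BoundedCounter and its methods are hypothetical, and there is no pool state to recheck.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter that can never exceed its limit, even under contention.
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Same shape as the inner loop of addWorker: check the bound, then CAS, retry on contention.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                    // bound reached, refuse
            if (count.compareAndSet(c, c + 1))
                return true;                     // CAS won, slot reserved
            // CAS lost a race with another thread: re-read and try again
        }
    }

    int get() {
        return count.get();
    }

    // Starts several threads that all try to increment, and returns the final count.
    static int hammer(int limit, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(limit);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++)
                    counter.tryIncrement();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(hammer(10, 8, 100));
    }
}
```

Reserving the slot with a CAS before creating the thread is what lets addWorker enforce corePoolSize and maximumPoolSize without taking mainLock for the count check.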

The addWorker code above creates a new Worker and starts its internal thread. That thread was created by passing this (the Worker itself) to the thread factory in the Worker constructor, so once the thread starts it calls the Worker's run method, which in turn enters runWorker.

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}

runWorker method

Before reading runWorker, keep one thing in mind: when this method returns, the worker thread terminates.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If a first task was given when the Worker was created, run it directly; otherwise fetch one from the queue
        // Note the while loop: unless something goes wrong, our "worker" keeps fetching and running tasks until getTask returns null
        while (task != null || (task = getTask()) != null) {
            // Holding the worker lock marks this worker as busy, so an interrupt meant to wake up idle workers (for example from shutdown()) does not hit a task that is running
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the pool state is STOP or higher, the pool must stop immediately, so this thread should be interrupted too
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method, implemented by subclasses
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task. Think about why run() is called directly here rather than new Thread(task).start()
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // The catch blocks above exist to pass the exception to this hook method
                    // Overriding it lets us customize the pool's exception handling
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
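
The afterExecute hook mentioned in the comments can be used roughly as follows. This is a sketch under assumptions: the subclass LoggingPool, its lastFailure field, and the helper runFailingTask are hypothetical names, and the thread factory installs a no-op uncaught-exception handler only to keep the demo output quiet.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical subclass that records the last exception thrown by a task.
class LoggingPool extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    LoggingPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
              r -> {
                  Thread t = new Thread(r);
                  // Demo-only assumption: silence the stack trace printed when the worker dies.
                  t.setUncaughtExceptionHandler((thread, error) -> { });
                  return t;
              });
    }

    // Called by runWorker after every task, with the exception that escaped task.run(), if any.
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null)
            lastFailure.set(t);
    }

    // Runs one failing task through execute and returns what afterExecute saw.
    static Throwable runFailingTask() {
        LoggingPool pool = new LoggingPool();
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return pool.lastFailure.get();
    }

    public static void main(String[] args) {
        System.out.println("afterExecute saw: " + runFailingTask());
    }
}
```

This only works for tasks passed to execute. A task passed to submit is wrapped in a FutureTask, which catches the exception itself, so afterExecute receives null and the failure surfaces through Future.get instead.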

Next, let's see how the getTask method fetches tasks from the queue.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // In these cases no task may be taken from the queue:
        // 1. The state is STOP or higher: the pool must stop immediately and refuses to process queued tasks, so return null directly
        // 2. The state is SHUTDOWN and all queued tasks have been processed
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed means: is this worker subject to the idle timeout?
        // allowCoreThreadTimeOut lets core threads be reclaimed as well; the default is false
        // Otherwise only workers beyond corePoolSize are reclaimed
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If the worker count exceeds maximumPoolSize, or the last fetch timed out, return null
        // What does returning null mean? Look at runWorker: the while loop ends and the thread finishes, so it means closing this thread
        // A question to consider: the worker count was checked before the thread was created, so how can it exceed maximumPoolSize?
        // Because developers can shrink the thread pool through setMaximumPoolSize
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch a task from the queue
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // The fetch timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            // If setMaximumPoolSize shrinks the pool, the surplus threads must be closed.
            // setMaximumPoolSize interrupts idle workers, so a worker blocked on the queue lands here, re-enters the for loop, and returns null at the wc > maximumPoolSize check above, which closes the surplus thread
            timedOut = false;
        }
    }
}
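
The effect of the timed branch can be seen by letting core threads time out as well. The sketch below is illustrative only: the class IdleTimeoutDemo, the 200 ms keep-alive, and the 5 second polling deadline are assumptions chosen for the example, and the check relies on timing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: watches an idle worker leave the pool.
class IdleTimeoutDemo {

    // Returns the pool size right after submitting a task, and again once the idle worker has timed out.
    static int[] poolSizeBeforeAndAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Makes "timed" true in getTask even for core threads.
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> { });
        int before = pool.getPoolSize();

        // Poll until getTask has returned null and the worker has exited (or give up after 5 s).
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] sizes = poolSizeBeforeAndAfterIdle();
        System.out.println("before idle: " + sizes[0] + ", after idle: " + sizes[1]);
    }
}
```

Without allowCoreThreadTimeOut(true), the single core thread would block in workQueue.take() indefinitely and the pool size would stay at 1.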

Reject policy

That largely completes the analysis of the execute method. The rejection policy mentioned earlier is what runs when the thread pool refuses to accept a new task. Here it is in detail.

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

The handler here is the RejectedExecutionHandler instance passed in when the thread pool is constructed.
ThreadPoolExecutor defines four implementations of RejectedExecutionHandler that can be used directly. You can also implement your own policy, although that is usually unnecessary. The four built-in rejection policy classes are:

  1. CallerRunsPolicy – the thread that submitted the task runs the task itself
  2. AbortPolicy – the default policy, which throws RejectedExecutionException
  3. DiscardPolicy – silently drops the task without doing anything
  4. DiscardOldestPolicy – discards the oldest task in the queue, then retries the submission

Their source code is fairly simple, and interested readers can read it for themselves.
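
As one example of the built-in policies, the sketch below forces a rejection under CallerRunsPolicy and checks which thread ran the rejected task. The class PolicyDemo, the helper name, and the pool sizes (one thread, queue capacity one) are assumptions for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for CallerRunsPolicy.
class PolicyDemo {

    // Returns true if the rejected task was run by the thread that called execute.
    static boolean rejectedTaskRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker);   // occupies the only worker
            pool.execute(blocker);   // fills the queue
            // Queue full and pool at maximumPoolSize: rejected, so the caller runs it inline.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + rejectedTaskRunsOnCaller());
    }
}
```

Running the task on the submitting thread slows down the producer, which acts as a simple form of back-pressure. With AbortPolicy the third execute call would throw RejectedExecutionException instead.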

Summary

  1. When we call the thread pool's submit method, the task is wrapped in a FutureTask object, which is returned. FutureTask implements both the Runnable and Future interfaces. Semantically, it is an asynchronous task whose result can be retrieved: calling get blocks until the result is available. Note that submit ultimately calls ThreadPoolExecutor.execute.
  2. The execute method does two main things: it enqueues tasks and starts worker threads.
  • If the current number of threads is less than corePoolSize, a new thread is created when the task is submitted, and that thread runs the task;
  • If the current number of threads has reached corePoolSize, the submitted task is added to the queue and waits for a pool thread to fetch it;
  • If the queue is full, a new thread is created to run the task, provided the number of threads in the pool stays within maximumPoolSize. If the number of threads has already reached maximumPoolSize, the rejection policy is executed.
  3. The runWorker method is executed by every newly created worker thread. In it, the worker repeatedly fetches tasks from the queue and runs them. Our custom exception handling (the afterExecute hook) is called here, and so is the cleanup performed when the thread exits.

Reference text
https://www.javadoop.com/post/java-thread-pool#toc_5