Concurrent programming-analysis of the underlying principles of thread pool ThreadPoolExecutor (1)

Question:

How to set the number of core threads and the maximum number of threads in the thread pool?

What is the specific process of thread pool execution tasks?

How do the five states of the thread pool flow?

How are threads in the thread pool closed?

Why does the thread pool have to be a blocking queue?

If an exception occurs to a thread, will it be removed from the thread pool?

How does Tomcat customize the thread pool?

What is the specific process of thread pool execution tasks?

ThreadPoolExecutor provides two methods for executing tasks:

1.void execute(Runnable command)

2.Future submit(Runnable task)

In fact, the execute() method is ultimately called in submit, but it returns a Future object to obtain the task execution result:

public Future<?> submit(Runnable task) {
 if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); 3
 execute(ftask);
 return ftask;
 }

The execution of the execute(Runnable command) method will be divided into three steps:

Note: When submitting a Runnable, regardless of whether the threads in the current thread pool are idle or not, as long as the number is less than the number of core threads, it will be created
New thread.

Note: ThreadPoolExecutor is equivalent to unfairness. For example, the Runnable submitted after the queue is full may be lower than the current one
The queued Runnable is executed first.

How do the five states of the thread pool flow?

The thread pool has five states:

RUNNING:
Will
receive new tasks and
Will
Process tasks in the queue

SHUTDOWN:
No
receive new tasks and
Will
Process tasks in the queue

STOP:
No
receive new tasks and
No
Process tasks in the queue and interrupt processing tasks
(Note: Whether a task can be interrupted depends on the task itself)

TIDYING:
All tasks have been terminated and there are no threads in the thread pool.
, so that the status of the thread pool will change to TIDYING. Once this status is reached, terminated() of the thread pool will be called.

TERMINATED:
After terminated() is executed, it will change to TERMINATED.

These five states cannot be converted arbitrarily, and there will only be the following conversion situations:

1.
RUNNING -> SHUTDOWN: Triggered by manually calling shutdown(), or finalize() will be called during thread pool object GC and shutdown() will be called.

2.
(RUNNING or SHUTDOWN) -> STOP: Triggered by calling shutdownNow(). If shutdown() is called first, call it immediately.

shutdownNow(), SHUTDOWN -> STOP will occur

3.
SHUTDOWN -> TIDYING:
Queue is empty
and
When there are no threads in the thread pool
automatic conversion

4.
STOP -> TIDYING:
When there are no threads in the thread pool
Automatic conversion (there may still be tasks in the queue)

5.
TIDYING -> TERMINATED: It will be automatically converted after terminated() is executed.

How are threads in the thread pool closed?

We usually use the thread.start() method to start a thread, but how to stop a thread?

The Thread class provides a stop(), but it is marked @Deprecated. Why is it not recommended to use the stop() method to stop the thread?

Because the stop() method is too crude, once stop() is called,
The thread will be stopped directly
,
But when you call it, you don’t know what the thread was just doing or what step the task has reached. This is very dangerous
.

One thing to emphasize here is,
stop() will release the synchronized lock occupied by the thread
(The ReentrantLock lock will not be automatically released, which is also a factor why stop() is not recommended).

public class ThreadTest {

    static int count = 0;
    static final Object lock = new Object();
    static final ReentrantLock reentrantLock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(new Runnable() {
            public void run() {
                // synchronized (lock) {
                reentrantLock.lock();
                for (int i = 0; i < 100; i + + ) {
                    count + + ;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                // }
                reentrantLock.unlock();
            }
        });

        thread.start();

        Thread.sleep(5 * 1000);

        thread.stop();
        //
        // Thread.sleep(5*1000);

        reentrantLock.lock();
        System.out.println(count);
        reentrantLock.unlock();

        // synchronized (lock) {
        // System.out.println(count);
        // }


    }
}




So, we recommend stopping a thread by customizing a variable or interrupting, such as:

public class ThreadTest {

    static int count = 0;
    static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(new Runnable() {
            public void run() {

                for (int i = 0; i < 100; i + + ) {
                    if (stop) {
                        break;
                    }

                    count + + ;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });

        thread.start();

        Thread.sleep(5 * 1000);

        stop = true;

        Thread.sleep(5 * 1000);

        System.out.println(count);

        
    }
}

The difference is that when we set stop to true, the thread itself can control whether and when to stop. Similarly, we can call interrupt() of thread to interrupt the thread:

public class ThreadTest {

    static int count = 0;
    static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(new Runnable() {
            public void run() {

                for (int i = 0; i < 100; i + + ) {
                    if (Thread.currentThread().isInterrupted()) {
                        break;
                    }

                    count + + ;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        });

        thread.start();

        Thread.sleep(5 * 1000);

        thread.interrupt();

        Thread.sleep(5 * 1000);


        System.out.println(count);

    }
}

The difference is that if the thread is interrupted during sleep, an exception will be received.

Having said so much, in fact, interrupt() is used to stop threads in the thread pool. For example, the shutdownNow() method will call:

 void interruptIfStarted() {
         Thread t;
         if (getState() >= 0 & amp; & amp; (t = thread) != null & amp; & amp; !t.isInterrupted()) {
             try {
                 t.interrupt();
                 } catch (SecurityException ignore) {
                 }
             }
         }

Why does the thread pool have to be a blocking queue?

During the running process, the threads in the thread pool will continue to obtain tasks from the queue and execute them after executing the first task bound when creating the thread. Then if there are no tasks in the queue, the thread will not die naturally. It will be blocked when acquiring the queue task, and when there is a task in the queue, it will get the task and execute the task. This method can ultimately ensure that a specified number of core threads can be reserved in the thread pool. The key code is:

 try {
            Runnable r = timed?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):
            workQueue.take();
            if (r != null)
            return r;
            timedOut = true;

            } catch (InterruptedException retry) {
            timedOut = false;
            }

When a thread obtains a task from the queue, it will determine whether to use timeout blocking to obtain it. We can think that the non-core thread will poll(), the core thread will take(), and the non-core thread will naturally fail to obtain the task after the time expires. Died.

If a thread encounters an exception, will it be removed from the thread pool?

Answer:
Yes,
Is it possible that the number of core threads is wrong when executing tasks, causing all core threads to be removed from the thread pool?

In the source code, when an exception occurs when executing a task, processWorkerExit() will eventually be executed. After executing this method, the current thread will die naturally, but! An additional thread will be added in the processWorkerExit() method, so that the fixed number of core threads can be maintained.

How does Tomcat customize the thread pool?

The thread pool used in Tomcat is org.apache.tomcat.util.threads.ThreadPoolExecutor. Pay attention to the class name and JUC.

The same, but the package name is different.

Tomcat will create this thread pool:

 public void createExecutor() {
       
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon,
                getThreadPriority());

        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,
                TimeUnit.SECONDS, taskqueue, tf);

        taskqueue.setParent((ThreadPoolExecutor) executor);
    }

//Inject the incoming queue as TaskQueue, and its enqueue logic is:

 public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent == null) {
            return super.offer(o);
        }

        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }

        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount() <= (parent.getPoolSize())) {
            return super.offer(o);
        }

        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }

        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

Special in:

When joining the queue, only if the number of threads in the thread pool is equal to the maximum number of thread pools will it join the queue.

When joining the queue, if the number of threads in the thread pool is less than the maximum number of thread pools, false will be returned, indicating that joining the queue failed.

This controls the thread pool of Tomcat when submitting a task:

1.
It will still first determine whether the number of threads is less than the number of core threads. If it is less than the number of core threads, create a thread.

2.
If it is equal to the number of core threads, it will be added to the queue. However, if the number of threads is less than the maximum number of threads, joining the queue will fail and threads will be created.

Therefore, as the task is submitted, threads will be created first and will not join the queue until the number of threads equals the maximum number of threads.

Of course, there is a relatively detailed logic: when submitting a task, if the number of tasks being processed is less than the number of threads in the thread pool, it will be directly added to the queue without creating a thread, which is the getSubmittedCount in the source code above. role.

How to set the number of core threads and the maximum number of threads in the thread pool?

We all know that there are two very important parameters in the thread pool:

1. corePoolSize:
The number of core threads indicates the number of resident threads in the thread pool.

2. maximumPoolSize:
Maximum number of threads, indicating the maximum number of threads that can be opened in the thread pool

How to set these two parameters?

The tasks we are responsible for executing by the thread pool are divided into three situations:

1. CPU-intensive tasks, such as finding prime numbers from 1-1000000

2. IO-intensive tasks, such as file IO and network IO

3. Mixed tasks

Due to the characteristics of CPU-intensive tasks, the thread will always utilize the CPU when executing the task, so in this case, try to avoid thread context switching as much as possible.

For example, my computer now only has one CPU. If two threads are executing the task of finding prime numbers at the same time, then the CPU will need to perform additional thread context switching to achieve the effect of thread parallelism. At this time, the two threads are executing The total time is:

Task execution time*2 + thread context switching time

And if there is only one thread and this thread performs two tasks, then the time is:

Task execution time*2

Therefore, for CPU-intensive tasks, the number of threads is best equal to the number of CPU cores. You can get the number of cores of your computer through the following API:

 Runtime.getRuntime().availableProcessors()

However, in order to respond to thread blocking requests caused by page faults or other exceptions during thread execution, we can set up an additional thread, so that when a thread temporarily does not need the CPU, there can be a substitute thread to continue to utilize the CPU.

So, for CPU-intensive tasks, we can set the number of threads to:
Number of CPU cores + 1

Let’s look at IO-type tasks. When threads perform IO-type tasks, they may be blocked on IO most of the time. If there are 10

CPU, if we only set up 10 threads to perform IO-type tasks, then it is very likely that these 10 threads are blocked on IO, so these 10 CPUs will have no work to do. Therefore, for IO-type tasks, We usually set the number of threads to:
2*Number of CPU cores

However, even if it is set to
2*Number of CPU cores
, is not necessarily the best. For example, if there are 10 CPUs and the number of threads is 20, it is possible that these 20 threads are blocked on IO at the same time, so you can add more threads to squeeze the CPU utilization.

Generally, if the IO-type task takes longer to execute, the more threads may be blocked on IO at the same time, and we can set more
Threads, but the more threads, the better
, we can calculate it through the following formula:

Number of threads = Number of CPU cores * (1 + thread waiting time / total thread running time)

Thread waiting time:
Refers to the time when the thread is not using the CPU, such as blocking in IO

Total thread running time:
Refers to the total time it takes for a thread to complete a task

We can estimate these two times using jvisualvm sampling:

The figure shows that during the sampling process just now, the total execution time of run() was 538948ms, and the CPU time used was

86873ms, so the time not utilizing the CPU is 538948ms-86873ms.

So we can calculate:

Thread waiting time = 538948ms-86873ms

Total thread running time = 538948ms

so:
Number of threads = 8 * (1 + (538948ms-86873ms) / 538948ms) = 14.xxx, so the number of threads calculated according to the formula is about 14 or 15 threads.

According to the above formula, if the task we perform is an IO-intensive task, then: thread waiting time = total thread running time, so:

Number of threads = Number of CPU cores * (1 + thread waiting time / total thread running time)

= Number of CPU cores * (1 + 1)

= Number of CPU cores * 2

The above is just a theory. In actual work, the situation will be more complicated. For example, in an application, there may be multiple thread pools. In addition to the threads in the thread pool, there may be many other threads, or in addition to this application, some other applications are also running, so If you want to determine the number of threads in actual work, it is best to perform a stress test.

For example, write:

@RestController
 public class ZhouyuController {

 @GetMapping("/test")
 public String test() throws InterruptedException {
 Thread.sleep(1000);
 return "zhouyu";
 }

 }

This interface will execute for 1s. I now use apipost to press it:

This is the stress test result under Tomcat’s default maximum request of 200 threads.

When we adjust the number of threads to 500:

server.tomcat.threads.max=500

It is found that the execution efficiency has doubled. If the number of threads is increased to 1000:

Performance is reduced.

Summary, we are working on:

1. CPU-intensive tasks:
Number of CPU cores + 1, which can fully utilize the CPU without causing too much context switching cost

2. IO tasks
: It is recommended to perform a pressure test, or first use a formula to calculate a theoretical value (theoretical values are usually relatively small)

3. For core business (high access frequency):
The number of core threads can be set to the result of our stress test. The maximum number of threads can be equal to the number of core threads, or a little larger. For example, during our stress test, we may find that 500 threads are optimal, but 600 threads are also okay. , at this time 600 can be the maximum number of threads

4. For non-core business (access frequency is not high)
, the number of core threads can be relatively small to avoid the operating system maintaining unnecessary threads. The maximum number of threads can be set to the results of our calculation or stress test.