Hand-Writing a ThreadPoolExecutor-Style Thread Pool

Many people advise against reinventing the wheel, but I don’t. I don’t rebuild wheels to use them in production. I rebuild them to deepen my understanding. Why? I think building a wheel is one of the best ways to learn, if not the best, because it forces you to do at least these two things:

  • Understand the design ideas (design level)
  • Read at least some of the source code (code level)

If you don’t understand design, you can’t grasp the whole. If you haven’t seen the code, you can’t perfect the details.

Also, from a writer’s point of view, analyzing the source code directly is sometimes too hard: there is too much code, and the abstractions go too deep. If we rebuild the wheel with fewer layers of abstraction and present it plainly, readers will find it much easier to follow.

Since building wheels is so worthwhile, let’s get started. Today I will show you how to build a thread pool.

Thread pool design ideas

Pooling technology

Everyone has heard of “pooling”: database connection pools, constant pools, thread pools, object pools and so on. Pooling is a common and effective optimization technique in computing. So let me ask you: what does the “pool” in “thread pool” actually refer to?

Setting minor details aside, a thread pool holds two important things: the tasks we submit to the Executor and the Threads the Executor maintains itself. The thread “pool” clearly refers to the collection of Threads.

Unlike general pooling technologies such as database connection pools, a ThreadPool does more than “pool”. Its more important job is to “do work”, that is, to execute tasks. When we use a database connection pool, we take a Connection out of the pool and execute the SQL. We then call the overridden close(), which returns the Connection to the pool instead of really closing it. But have you ever seen anyone ask a ThreadPool for a Thread? Would it even give you one? What ThreadPool says is:

Want to take a Thread out of the pool? No way! Don’t you know how many pitfalls multithreading has? Play with fire and you’ll get burned. If you want a task done, just hand it to me and I’ll take care of it.

In other words, ThreadPool never meant to let you take a Thread away. But what if you need a result back? Then submit() returns a Future (a FutureTask in the JDK), and you call Future#get() yourself when you need the result. The initiative still stays with ThreadPool. It decides when the result is ready, and so it also decides whether your get() call blocks!
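Here is a minimal sketch of “hand over the task, not the thread” using the JDK’s own executor. It assumes a fixed pool of one thread and a hypothetical FutureDemo class. The caller only gets a Future back and blocks in get() until the pool has run the task. In the JDK, AbstractExecutorService#submit wraps the Callable in a FutureTask.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {

    /** Submits a Callable that sums 1..n and blocks on Future#get() for the result. */
    static int sumInPool(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // We hand over a task, never a thread: the pool decides which thread runs it
            Future<Integer> future = pool.submit(() -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            });
            // get() blocks the caller until the pool has finished the task
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInPool(100)); // prints 5050
    }
}
```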

Most people find thread pools difficult not because they don’t understand the thread “pool”, but because they don’t understand how it “does work”. In other words: how does a thread pool execute tasks? This is the biggest difference between thread pools and general pooling technology. Execution happens inside the pool, and tasks are executed through a producer-consumer model.

Producer-consumer model

If you keep submitting tasks to a thread pool, it generally goes through four stages:

  • Core threads process the tasks
  • Tasks enter the task queue and wait
  • Non-core threads process the tasks
  • The rejection policy kicks in
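You can watch the four stages happen in a minimal sketch using the JDK’s ThreadPoolExecutor with its default AbortPolicy. It uses 1 core thread, at most 2 threads and a queue of capacity 1. The latch-blocked tasks and the StagesDemo class are illustrative assumptions. The tasks stay busy, so each stage can be observed deterministically.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class StagesDemo {

    /** Returns {pool size after 3 tasks, queued tasks after 3 tasks, 1 if the 4th was rejected}. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free during the demo
        Runnable busyTask = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); // default AbortPolicy
        try {
            pool.execute(busyTask); // stage 1: a core thread is created for it
            pool.execute(busyTask); // stage 2: core thread busy, so the task waits in the queue
            pool.execute(busyTask); // stage 3: queue full, so a non-core thread is created
            int poolSize = pool.getPoolSize();
            int queued = pool.getQueue().size();
            int rejected = 0;
            try {
                pool.execute(busyTask); // stage 4: max threads reached and queue full
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[]{poolSize, queued, rejected};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // prints [2, 1, 1]
    }
}
```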

The second stage is the key one. Tasks that cannot be processed right away are stored temporarily in the workQueue (task queue), which gives rise to a classic producer-consumer model.

The caller delivers the Task ====> ThreadPool.workQueue ====> workerThread blocks to obtain Task execution
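The pipeline above can be sketched with a single consumer thread. This is a simplified assumption: a real pool has many workers, and the ProducerConsumerDemo class and its run method are hypothetical. The caller only puts tasks into the queue. The worker blocks on take() and runs whatever it receives.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {

    /** Pushes `count` tasks through a work queue drained by one worker; returns the execution order. */
    static List<Integer> run(int count) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        List<Integer> executed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);

        // Consumer: the worker thread blocks on take() and runs each task it receives
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    workQueue.take().run();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: leave the loop so run() returns
            }
        });
        worker.start();

        try {
            // Producer: the caller only enqueues tasks and never touches the worker thread
            for (int i = 0; i < count; i++) {
                final int id = i;
                workQueue.put(() -> {
                    executed.add(id);
                    done.countDown();
                });
            }
            done.await();       // wait until every task has been consumed
            worker.interrupt(); // stop the idle worker so the demo terminates
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [0, 1, 2]
    }
}
```

With a single consumer the queue’s FIFO order is preserved. With several workers, tasks would still be taken in FIFO order, but they could finish in any order.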

Several important concepts

How does the thread pool reuse threads?

Sometimes a problem is easier to solve from the opposite direction. Let’s set aside how threads are reused for now and ask instead: how are threads recycled or destroyed? (Once you know when a thread gets destroyed, you can reuse it just by avoiding those conditions.)

The word “thread” actually refers to two things: the Thread object and the JVM thread resource (which is really an operating system thread). The Thread object is bound to the thread resource. Once the resource is allocated, Thread#run() serves as the entry point for the code it executes.

When is a thread destroyed? Normally, after new Thread(target).start(), the operating system allocates a thread resource. When the thread finishes executing the code in Thread#run(), it dies naturally. The Thread object is reclaimed by the GC once nothing references it.

By now the idea should be clear: as long as the task never finishes, the thread never dies. How can a task never finish? Either keep it looping or keep it blocked.

A thread pool is still built on Thread. The only difference is that it manages a collection of threads instead of a single one. Thread’s nature of “die once the task is finished” is built in, and the thread pool cannot change it. So if the pool wants to keep its threads alive, it must keep them working. And what if there really is no work? Then block them (a blocking queue does exactly this)! In short, a thread must never be allowed to “finish executing”, or it will be destroyed.
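The following minimal sketch compares the two cases described above. The ThreadLifetimeDemo class is hypothetical. A plain thread is dead once run() returns. A thread that loops and blocks on an empty BlockingQueue stays alive until something makes it leave the loop. Here that something is an interrupt, chosen only so the demo terminates.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadLifetimeDemo {

    /** Returns {plain alive after run(), looping alive while idle, looping alive after it exits}. */
    static boolean[] check() {
        try {
            // 1. A plain thread: once run() returns, the thread is finished
            Thread plain = new Thread(() -> { });
            plain.start();
            plain.join();
            boolean plainAlive = plain.isAlive();

            // 2. A thread that loops and blocks on an empty queue never "finishes"
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
            Thread looping = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run(); // blocks while there is no work
                    }
                } catch (InterruptedException e) {
                    // Leaving the loop lets run() return, and the thread dies
                }
            });
            looping.start();
            Thread.sleep(100);
            boolean loopingAlive = looping.isAlive();

            looping.interrupt();
            looping.join();
            return new boolean[]{plainAlive, loopingAlive, looping.isAlive()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(check())); // prints [false, true, false]
    }
}
```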

How to ensure that only “non-core threads” are destroyed

You have probably heard the standard interview answer: “When the pool is idle, a non-core thread that stays idle longer than keepAliveTime is recycled.” How is this actually achieved?

First, a common misconception: many people believe the thread pool tags each thread when it creates it. For example, core threads would be marked coreThread and non-core threads nonCoreThread, and the nonCoreThreads would then be recycled when idle.

However, Doug Lea, the author of the JDK’s java.util.concurrent package, doesn’t do it that way. His solution is simpler and blunter:

  • If the current number of threads <= corePoolSize, every thread counts as a core thread and none is recycled.
  • If the current number of threads > corePoolSize, idle threads are recycled until the count is back down to corePoolSize.

Note that it recycles “some” threads without caring which ones they are.
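Here is a minimal sketch of the “count, don’t tag” rule. It uses an AtomicInteger with compareAndSet, similar in spirit to how the JDK’s ThreadPoolExecutor decrements its worker count. The IdleShrinkDemo class and its parameters are hypothetical. Each idle thread compares the current count with corePoolSize. If the count is above it, the thread uses poll(keepAliveTime) instead of take(). A thread that times out only exits if it wins the CAS, so the pool never shrinks below corePoolSize.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class IdleShrinkDemo {

    /** Starts `threads` idle workers and returns how many remain after they had time to time out. */
    static int survivors(int threads, int corePoolSize, long keepAliveMillis) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        AtomicInteger workerCount = new AtomicInteger(threads);

        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        // No thread is tagged "core": each one only compares the count with corePoolSize
                        boolean timed = workerCount.get() > corePoolSize;
                        Runnable task = timed
                                ? workQueue.poll(keepAliveMillis, TimeUnit.MILLISECONDS) // null on timeout
                                : workQueue.take();                                      // waits indefinitely
                        if (task != null) {
                            task.run();
                            continue;
                        }
                        // Idle for keepAliveTime while above corePoolSize: try to claim one exit slot.
                        // The CAS ensures threads timing out together never go below corePoolSize.
                        int c = workerCount.get();
                        if (c > corePoolSize && workerCount.compareAndSet(c, c - 1)) {
                            return; // run() returns, so this thread dies
                        }
                    }
                } catch (InterruptedException e) {
                    // Not expected in this demo; just let the thread exit
                }
            });
            worker.setDaemon(true); // survivors block in take() forever; don't keep the JVM alive
            worker.start();
        }

        try {
            Thread.sleep(keepAliveMillis * 20); // give idle threads plenty of time to time out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerCount.get();
    }

    public static void main(String[] args) {
        System.out.println(survivors(4, 2, 50)); // prints 2
    }
}
```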

OK, enough talk. Let’s start coding.

Copycat Thread Pool Demo

I recommend copying the code to your machine and stepping through it in a debugger. The code is already very streamlined, so it is easy to trace.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPool {

    /**
     * Guards every access to workers (ArrayList is not thread-safe)
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Worker threads
     */
    private final List<Worker> workers = new ArrayList<>();
    /**
     * Task queue
     */
    private final BlockingQueue<Runnable> workQueue;
    /**
     * Number of core threads
     */
    private final int corePoolSize;
    /**
     * Maximum number of threads
     */
    private final int maximumPoolSize;
    /**
     * Maximum idle time (in nanoseconds) before a thread beyond corePoolSize is destroyed
     */
    private final long keepAliveTime;

    public ThreadPool(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit timeUnit,
                      BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
    }

    public void execute(Runnable task) {
        Objects.requireNonNull(task, "task is null");

        // 1. Create a core thread to process the task (fall through if another caller filled the core slots first)
        if (workerCount() < corePoolSize && this.addWorker(task, true)) {
            return;
        }

        // 2. Try to put the task into the task queue
        if (workQueue.offer(task)) {
            // Make sure at least one thread is alive to drain the queue (e.g. when corePoolSize == 0)
            if (workerCount() == 0) {
                this.addWorker(null, false);
            }
            return;
        }

        // 3. Queue is full: create a non-core thread to process the task
        if (!this.addWorker(task, false)) {
            // 4. The thread count has reached maximumPoolSize: trigger the rejection policy
            throw new RuntimeException("deny policy");
        }
    }

    private int workerCount() {
        mainLock.lock();
        try {
            return workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        // Check the limit and add the worker under the same lock so concurrent callers cannot overshoot it
        mainLock.lock();
        try {
            if (workers.size() >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }
            Worker worker = new Worker(firstTask);
            workers.add(worker);
            worker.thread.start();
            return true;
        } finally {
            mainLock.unlock();
        }
    }

    private void runWorker(Worker worker) {
        Runnable task = worker.firstTask;
        worker.firstTask = null;

        try {
            // Loop: run the first task, then keep pulling tasks from the queue
            while (task != null || (task = getTask(worker)) != null) {
                task.run();
                task = null;
            }
        } finally {
            // Leaving the loop means getTask() returned null (an idle thread above corePoolSize)
            // or a task threw an exception.
            // A Java "thread" is both a Thread object and a JVM thread bound to it, so destruction has two sides:
            // 1. Remove the Worker (and its Thread object) from workers so it can be garbage-collected
            // 2. The JVM thread dies naturally once run() returns
            removeWorker(worker);
        }
    }

    private void removeWorker(Worker worker) {
        mainLock.lock();
        try {
            workers.remove(worker); // no-op if getTask() already removed it
        } finally {
            mainLock.unlock();
        }
    }

    private Runnable getTask(Worker worker) {
        boolean timedOut = false;

        // Loop until a task is obtained or this thread is allowed to exit
        for (; ; ) {

            // Timeout detection is needed only when the thread count exceeds corePoolSize
            boolean timed = workerCount() > corePoolSize;

            // Timeout detection is needed && the last poll timed out
            if (timed && timedOut) {
                mainLock.lock();
                try {
                    // Re-check under the lock so idle threads timing out together never shrink the pool below corePoolSize
                    if (workers.size() > corePoolSize) {
                        workers.remove(worker);
                        return null;
                    }
                } finally {
                    mainLock.unlock();
                }
                timedOut = false;
                continue;
            }

            try {
                // 1. Timed: poll waits at most keepAliveTime and returns null if no task arrives
                // 2. Not timed: take blocks until a task is available
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null) {
                    return r;
                }
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    private class Worker implements Runnable {
        private final Thread thread;
        private Runnable firstTask;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = new Thread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

}

Code diagram (the part in the dotted box is executed asynchronously by the worker Thread):

This diagram is slightly misleading. The thread pool does not actually distinguish between coreThread and nonCoreThread. It only checks whether the current number of threads is greater than corePoolSize.

Test case

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolTest {

    public static void main(String[] args) {

        // Create a thread pool: 1 core thread, at most 2 threads, keepAliveTime 1s, task queue capacity 1
        // Submit 4 tasks: the first goes to the core thread, the second is enqueued,
        // the third goes to a non-core thread, and the fourth is rejected
        ThreadPool threadPoolExecutor = new ThreadPool(
                1,
                2,
                1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)
        );

        threadPoolExecutor.execute(() -> {
            log.info("{}:Execute the first task...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:Execute the second task...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:Execute the third task...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        try {
            threadPoolExecutor.execute(() -> {
                log.info("{}:Execute the fourth task...", Thread.currentThread().getName());
                sleep(10);
            });
        } catch (RuntimeException e) {
            // Our ThreadPool throws RuntimeException("deny policy");
            // the JDK's default AbortPolicy throws RejectedExecutionException, which is also a RuntimeException
            log.info("The fourth task was rejected: {}", e.getMessage());
        }

        sleep(1);

        log.info("main ends");
    }

    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

You can replace the thread pool in the test case with the JDK’s ThreadPoolExecutor, since it has the same five-argument constructor, and the output will be very similar: