Java thread pool – Executor framework

Article directory

    • 1. Executor interface
    • 2. ExecutorService interface
    • 3. ThreadPoolExecutor class
      • 1. Status
      • 2. Worker
      • 3. Extension
    • 4. ForkJoinPool class
      • 1. Work stealing algorithm
      • 2. Fork/Join design
      • 3. Execution principle
    • 5. ScheduledThreadPool class
      • 1.ScheduledExecutorService
      • 2. Compare Timer
    • 6. Executors class

The Executor framework was introduced after Java 5. After Java 5, starting threads through Executor is better than using Thread’s start method. In addition to being easier to manage, it is more efficient (implemented with a thread pool, saving overhead).

The Executor framework not only includes thread pool management, but also provides thread factories, queues, and rejection strategies. The Executor framework makes concurrent programming easier.

The composition of the Executor framework:

  1. Task (Runnable/Callable): Tasks are defined through the Runnable interface or the Callable interface. Either Runnable interface or Callable interface implementation class can be executed by ThreadPoolExecutor
  2. Task execution (Executor): Executor, the core interface of the task execution mechanism, and the ExecutorService interface inherited from the Executor interface. ThreadPoolExecutor implements the ExecutorService interface
  3. Asynchronous calculation results (Future): Both the Future interface and the FutureTask class, the implementation class of the Future interface, can represent the results of asynchronous calculations. When we submit the implementation class of Runnable interface or Callable interface to ThreadPoolExecutor for execution, a Future object will be returned.

1. Executor interface

Thread pools simplify thread management, and JUC provides a flexible thread pool implementation as part of the Executor framework. In the Java class library, the main abstraction for task execution is not Thread but Executor. Executor only defines one method, execute, which is the top-level interface.

 /**
     * Executes the given command at some time in the future. The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);

The execute method accepts a Runnable parameter. This method is defined to execute the passed in method at some time in the future. The method can be run in a new thread, in the thread pool or in the calling thread. This method cannot receive to the execution result of the thread (of course the Runnable interface itself has no return value).

Although Executor is a simple interface, it provides the foundation for a flexible and powerful asynchronous task execution framework that can support many different types of task execution strategies. It provides a standard method to decouple the task submission process from the execution process, and uses Runnable to represent tasks.

The implementation of Executor also provides support for life cycle, as well as statistics collection, application management mechanisms and performance monitoring mechanisms.

**Executor is based on the producer-consumer model. The operation of submitting a task is equivalent to the producer, and the thread executing the task is equivalent to the consumer. **If you want to implement a producer-consumer design in your program, the easiest way is to use Executor.

2. ExecutorService interface

The Executor framework uses Runnable as its basic task representation. Runnable is a very limited abstraction. Its run method cannot return a value or throw a checked exception after executing the task. Many tasks are actually computations with delays-performing a database query, fetching resources from the network, or computing a complex function. Callable is a better abstraction for these tasks, assuming that the main entry point (call) will return a value and possibly throw an exception.

Therefore, ExecutorService was introduced, which inherits from Executor and adds many functions on its basis. It can generate Future methods for tracking the progress of one or more asynchronous tasks to solve the limitations of Executor.

Future represents the life cycle of a task and provides response methods to determine whether it has been completed or canceled, as well as to obtain the results of the task and cancel the task, etc.

The implicit meaning contained in the Future specification is that the life cycle of the task can only move forward, not backward, just like the life cycle of ExecutorService. When a task is completed, it stays in the completed state forever.

The behavior of the get method depends on the status of the task (not yet started, running, completed). If the task is completed, get will return immediately or throw an Exception. If the task is not completed, get will block until the task is completed. If the task throws an exception, get encapsulates the exception as an ExecutionException and rethrows it. If the task is canceled, get will throw CancellationException. If get throws an ExecutionException, you can get the encapsulated initial exception through getCause.

The following methods are defined:

  1. void shutdown(): Initiates an orderly shutdown in which previously submitted tasks are executed but new tasks are not accepted. If the call is already closed, there is no additional effect. Will not block waiting for previously submitted tasks to complete.

  2. List shutdownNow(): Try to stop all executing tasks, stop processing of waiting tasks, and return the list of tasks waiting to be executed. Will not block waiting for the execution of the task to terminate

    • There are no guarantees other than a best-effort effort to stop processing the task in progress. For example, a typical implementation would cancel via Thread.interrupt, so any task that fails to respond to an interrupt may never terminate
  3. boolean isShutdown(): Returns true if this executor has been shut down (the shutdown method was called)

  4. boolean isTerminated(): Returns true if all tasks have been completed after shutdown. Note that isTerminated will never be true unless shutdown or shutdownNow is called first

  5. boolean awaitTermination(long timeout, TimeUnit unit): Blocks until all tasks complete execution after the shutdown request, or a timeout occurs, or the current thread is interrupted, whichever occurs first

  6. Future submit(Callablet task): Submits a task with a return value for execution, and returns a Future representing the pending results of the task. Future’s get method will return the result of the task when the task is successfully completed.

  7. Future submit(Runnable task, T result): Submits a runnable task for execution and returns a Future representing the task. Future’s get method will return the given result upon successful completion.

  8. Future submit(Runnable task): Submits a runnable task for execution and returns a Future representing the task. Future’s get method will return null on successful completion

  9. Listfuture invokeAll(Collection? extends Callable tasks): Execute the given tasks and return a future list that saves their status and results when all tasks are completed. The isDone method of every Future object returns true. Note that a completed task can terminate normally or throw an exception. The result of this method is undefined if the given collection is modified while performing this operation

  10. T invokeAny(Collection? extends Callable tasks): Execute the given task. If there is a successfully completed task, return the result of the successfully completed task (that is, do not throw an exception ). On normal or exception return, unfinished tasks will be canceled. If the given collection is modified while performing this operation, the result of this method is undefined.

3. ThreadPoolExecutor class

ThreadPoolExecutor implements ExecutorService (actually inherits from AbstractExecutorService), and in order to function in a wide range of contexts, this class provides a number of tunable parameters and extensibility hooks:

  1. corePoolSize: Specifies the number of threads in the thread pool. Its number determines whether the added task will open a new thread for execution or be placed in the workQueue task queue.
  2. maximumPoolSize: Specifies the maximum number of threads in the thread pool. This parameter will determine the maximum number of threads that the thread pool will open based on the type of workQueue task queue you use.
  3. keepAliveTime: When the number of idle threads in the thread pool exceeds corePoolSize, how long will it take for the excess threads (emergency threads) to be destroyed?
  4. unit: unit of keepAliveTime
  5. workQueue: Task queue, a task that has been added to the thread pool but has not yet been executed; it is generally divided into direct submission queue, bounded task queue, unbounded task queue, and priority task queue.
  6. threadFactory: Thread factory, used to create threads, generally use the default
  7. handler: Rejection strategy; how to reject tasks when there are too many tasks to handle

1. Status

  1. RUNNING(-1<<29): Accept new tasks and process queued tasks
  2. SHUTDOWN(0<<29): Do not accept new tasks but process queued tasks
  3. STOP(1<<29): Do not accept new tasks or process queued tasks, and interrupt processing tasks
  4. TYDING(2<<29): All tasks are terminated, this state is entered after the number of worker threads reaches 0, and the terminated() hook function will be executed.
  5. TERMINATED(3<<29): terminated()After the hook function is executed

States increase monotonically over time, but each state need not be reached.

  1. RUNNING->SHUTDOWN: call shutdown()
  2. RUNNING/SHUTDOWN->STOP: call shutdownNow()
  3. STOP->TYDING: When both queue and pool are empty
  4. TIDYING -> TERMINATED: After the terminated() hook function is executed

The status record in ThreadPoolExecutor is stored in a variable of type AtomicInteger. The high three bits are used to record the status of the thread pool, and the low 29 bits are used to record the number of threads.

2. Worker

ThreadPoolExecutor defines a private static class Worker, which inherits from the AbstractQueuedSynchronizer class and implements the Runnable interface. It maintains thread instance (Thread), task instance (Runnable), and thread task counter (long) variables.

This class extends AbstractQueuedSynchronizer appropriately to simplify acquiring and releasing locks around each task execution. This prevents interrupts that are designed to wake up worker threads waiting for tasks rather than interrupt running tasks. We implemented a simple non-reentrant mutex instead of using a ReentrantLock because we did not want worker tasks to be able to reacquire the lock when calling pool control methods such as setCorePoolSize. Additionally, to suppress interrupts before the thread actually starts running the task, we initialize the lock state to a negative value and clear it on startup (in the runWorker).

3. Extension

This class also defines three protected type hook functions:

  1. beforeExecute: Execute before the thread pool task runs
  2. afterExecute: Executed after the thread pool task runs
  3. terminated: Executed after the thread pool exits

These methods are implemented empty in ThreadPoolExecutor.

4. ForkJoinPool class

The Fork/Join framework is a framework provided by Java7 for executing tasks in parallel. It is a framework that divides large tasks into several small tasks, and finally summarizes the results of each small task to obtain the results of the large task .

Fork is to divide a large task into several subtasks for parallel execution. Join is to merge the execution results of these subtasks and finally get the result of this large task. For example, calculating 1 + 2 + … + 10000 can be divided into 10 subtasks. Each subtask sums 1000 numbers respectively, and finally summarizes the results of these 10 subtasks.

1. Work stealing algorithm

ForkJoinPool is the ExecutorService that runs ForkJoinTasks. **The main difference between ForkJoinPool and other types of ExecutorService is the use of work stealing. The work-stealing algorithm refers to a thread stealing tasks from other queues for execution. **So why use work-stealing algorithms? **If we need to do a relatively large task, we can divide the task into several sub-tasks that do not interfere with each other. In order to reduce competition between threads, put these sub-tasks into different queues, and create a separate queue for each The queue creates a separate thread to execute the tasks in the queue, and the thread and the queue correspond one to one. **For example, thread A is responsible for processing tasks in queue A. However, some threads will finish the tasks in their own queue first, while there are still tasks waiting to be processed in the queues corresponding to other threads. Instead of waiting, the thread that has finished its work might as well help other threads to work, so it steals a task from the queue of other threads for execution. At this time, they will access the same queue, so in order to reduce competition between stealing task threads, a double-ended queue is usually used. The stolen task thread always takes the task from the head of the double-ended queue, while the stealing task thread always takes the task from the head of the double-ended queue. Threads always take tasks from the tail of the double-ended queue for execution.

Advantages of the work-stealing algorithm: Make full use of threads for parallel computing and reduce competition between threads

Disadvantages of the work-stealing algorithm: There is still competition in some cases, such as when there is only one task in the double-ended queue. And this algorithm consumes more system resources, such as creating multiple threads and multiple double-ended queues.

2. Fork/Join design

To design a Fork/Join framework, you need to complete two steps:

  1. Split tasks: You need a fork class to divide a large task into subtasks. The subtasks may still be very large, so you need to keep dividing until the divided subtasks are small enough.
  2. Execute tasks and merge results: The divided subtasks are placed in double-ended queues, and then several startup threads obtain task execution from the double-ended queues. The results of subtask execution are all placed in a queue, a thread is started to get data from the queue, and then the data is merged.

Fork/Join uses two classes to accomplish the above two things:

  1. ForkJoinTask (abstract class): If we want to use the ForkJoin framework, we must first create a ForkJoin task. It provides mechanisms to perform fork() and join() operations within tasks. Normally we do not need to directly inherit the ForkJoinTask class, we only need to inherit its subclasses. The Fork/Join framework provides the following two subclasses:

    1. RecursiveAction (abstract class): used for tasks that do not return results
    2. RecursiveTask (abstract class): used for tasks that return results
  2. ForkJoinPool: ForkJoinTask needs to be executed through ForkJoinPool

The subtasks split from the task will be added to the double-ended queue maintained by the current worker thread and entered at the head of the queue. When there are no tasks in the queue of a worker thread, it will randomly obtain a thread from the tail of the queue of other worker threads.

public class CountTask extends RecursiveTask<Integer> {<!-- -->
private int start;
private int end;

public CountTask(int start, int end) {<!-- -->
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {<!-- -->
int sum = 0;
// Calculate the task if the task is small enough
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {<!-- -->
for (int i = start; i <= end; i + + ) {<!-- -->
sum + = i;
}
} else {<!-- -->
// If the task is larger than the threshold, it is split into two subtasks for calculation.
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//Execute subtask
leftTask.fork();
rightTask.fork();
// Wait for the subtask to finish executing and get its results
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// merge subtasks
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {<!-- -->
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(1, 4);
//Execute a task
Future<Integer> result = forkJoinPool.submit(task);
try {<!-- -->
System.out.println(result.get());
} catch (InterruptedException e) {<!-- -->
} catch (ExecutionException e) {<!-- -->
}
}
}

RecursiveTask needs to implement the compute method. In this method, it is first necessary to determine whether the task is small enough. If it is small enough, execute the task directly. If it is not small enough, it must be divided into two subtasks, and each subtask will enter the compute method when the fork method is called. Finally, use the join method to wait for the subtask execution to complete and get its results.

ForkJoinTask may throw an exception during execution, but we cannot catch the exception directly in the main thread, so ForkJoinTask provides the isCompletedAbnormally() method to check whether the task has thrown an exception or been canceled, and can pass ForkJoinTask’s getException method to get the exception.

The getException method returns a Throwable object. If the task is canceled, it returns CancellationException. If the task is not completed or no exception is thrown, it returns null.

3. Execution principle

ForkJoinPool consists of the ForkJoinTask array and the ForkJoinWorkerThread array. The ForkJoinTask array is responsible for storing the tasks submitted to ForkJoinPool by the program, and the ForkJoinWorkerThread array is responsible for executing these tasks.

When we call the fork method of ForkJoinTask, the program will call the pushTask method of ForkJoinWorkerThread to perform the task asynchronously, and then return the result immediately.

The pushTask method stores the current task in the ForkJoinTask array queue, and then calls ForkJoinPool’s signalWork to repeatedly wake up or create a worker thread to execute the task.

5. ScheduledThreadPool class

ScheduledThreadPoolExecutor is mainly used to run tasks after a given delay, or to execute tasks regularly. ScheduledThreadPoolExecutor uses the task queue DelayQueue to encapsulate a PriorityQueue. PriorityQueue will sort the tasks in the queue, and the tasks with the shortest execution time will be executed first (the time variable of the ScheduledFutureTask is executed first), If the execution time is the same, the task submitted first will be executed first (the smaller sequenceNumber variable of ScheduledFutureTask is executed first).

1. ScheduledExecutorService

The ScheduledThreadPool class inherits from ThreadPoolExecutor and implements the ScheduledExecutorService interface.

The ScheduledExecutorService interface defines several methods:

  1. ScheduleFuture schedule(Runnable command, long delay, TimeUnit unit): Submit a one-time task that is enabled after a given delay
  2. ScheduleFuture schedule(Callable command, long delay, TimeUnit unit): Submit a one-time task with a return value that is enabled after the given delay
  3. ScheduleFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): Submit a periodic task, and time the task when it starts (if the task execution time is too long or even exceeds the period time, the task will be executed continuously)
  4. ScheduleFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): Submit a periodic task, and the timing will be started after the task execution is completed.

ScheduledFuture inherits from the Delayed interface and the Future interface, and does not define any new methods.

The Delayed interface is a mixed-style interface used to mark objects that should be executed after a given delay. This interface defines a getDelay method to return the remaining delay time. In addition, this interface inherits the Comparable interface, which means that the implementation of this interface must define a compareTo method, which provides a sorting consistent with its getDelay method.

2. Compare Timer

  • Timer is sensitive to changes in the system clock, ScheduledThreadPoolExecutor is not
  • Timer has only one thread of execution, so long-running tasks can delay other tasks. ScheduledThreadPoolExecutor can be configured with any number of threads. Additionally, you can have full control over the created threads if you want (by providing a ThreadFactory)
  • A runtime exception thrown in a TimerTask will kill a thread, causing the Timer to freeze, that is, the scheduled task will no longer run. ScheduledThreadExecutor not only catches runtime exceptions, but also allows you to handle them when needed (by overriding the afterExecute method of ThreadPoolExecutor). The task that throws the exception will be canceled, but other tasks will continue to run

6. Executors class

Executors is a factory class used to create thread pools in Java. It provides a series of static factory methods for creating different types of thread pools. These factory methods hide the complexity of the thread pool and make the creation of the thread pool very simple. The thread pools provided by the Executors factory class have the following types:

  • newCachedThreadPool(): The corePoolSize of CachedThreadPool is set to 0, and the maximumPoolSize is set to Integer.MAX.VALUE, that is, it is unbounded, which means that if the speed of the main thread submitting tasks is higher than the speed of thread processing tasks in the maximumPool , CachedThreadPool will continuously create new threads. In extreme cases, this can lead to exhaustion of cpu and memory resources
  • newFixedThreadPool(int nThreads): Creates a fixed-size thread pool containing the specified number of threads. The number of threads is fixed and will not be automatically expanded, that is, there is no emergency thread.
  • newSingleThreadExecutor(): Create a single-threaded thread pool. This thread pool contains only one thread for serial execution of tasks. Suitable for scenarios where tasks need to be performed in sequence
  • newScheduledThreadPool(int corePoolSize): Create a fixed-size thread pool for scheduled task execution. The number of threads is fixed and will not expand automatically. Suitable for scenarios where scheduled tasks are executed
  • newSingleThreadScheduledExecutor(): Create a single-threaded scheduled execution thread pool. Contains only one thread for serial and scheduled execution of tasks
  • newWorkStealingPool(int parallelism): This thread pool maintains enough threads to support a given level of parallelism and can use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively participating or available to participate in task processing. The actual number of threads can be increased and decreased dynamically. The work-stealing pool does not guarantee the execution order of submitted tasks

In addition, methods such as creating ThreadFactory instances and converting Runnable instances into Callable instances are also provided.