Concurrent programming-thread pool ForkJoinPool (2)

Fork/Join framework introduction

What is Fork/Join

Fork/Join is a parallel computing framework, mainly used to support the divide and conquer task model.

Fork corresponds to task decomposition in the divide-and-conquer task model, and Join corresponds to result merging.

The core idea: Divide a large task into many small tasks, then execute these small tasks in parallel, and finally merge their results into one large result.

Application scenarios

1. Recursive decomposition tasks

This type of task can usually decompose a large task into several subtasks. Each subtask can be executed independently, and the subtasks can be merged to obtain ordered results.

Examples: sorting, merging, traversing

2. Array processing

When processing large arrays, you can decompose the large array into several sub-arrays, process the sub-arrays in parallel, and finally merge the results of the sub-arrays

Example: sorting, statistics, and search of arrays

3. Parallelization algorithm

Break the problem into several sub-problems, solve each sub-problem in parallel, and finally merge the sub-problems to get the final solution

Examples: Parallel image processing algorithms, parallel machine learning algorithms

4. Big data processing

Divide the data into several shards, process each shard in parallel, and finally merge the processed shards into a complete result

Examples: Large log file processing, large database query

Fork/Join usage

The main components of the Fork/Join framework are ForkJoinPool and ForkJoinTask. ForkJoinPool is a thread pool that manages the execution of ForkJoin tasks. ForkJoinTask is an abstract class used to represent tasks that can be divided into smaller parts.

ForkJoinPool

ForkJoinPool is the thread pool class in the Fork/Join framework, which is used to manage the threads of the Fork/Join task.

Methods: submit(), invoke(), shutdown(), awaitTermination(), etc.

(Submit tasks, execute tasks, close the thread pool, and wait for task execution results)

Parameters: Thread pool size, worker thread priority, task queue capacity, etc., set according to specific application scenarios

Constructor

There are four core parameters in ForkJoinPool, which are used to control the number of parallel thread pools, creation of worker threads, exception handling, and mode specification.

  • int parallelism: Specifies the parallelism level. ForkJoinPool will determine the number of worker threads based on this setting. If not set, Runtime.getRuntime().availableProcessors() will be used to set the parallelism level;

  • ForkJoinWorkerThreadFactory factory: When ForkJoinPool creates a thread, it will be created through the factory. Note that what needs to be implemented here is ForkJoinWorkerThreadFactory, not ThreadFactory. If factory is not specified, the default DefaultForkJoinWorkerThreadFactory will be responsible for thread creation;

  • UncaughtExceptionHandler handler: Specify the exception handler. When an error occurs during task operation, it will be handled by the set handler;

  • boolean asyncMode: Set the working mode of the queue. When asyncMode is true, the first-in-first-out queue will be used, and when it is false, the last-in-first-out mode will be used. The default is false, last in first out

//Get the number of processors, note that this is a logical core
int processors = Runtime.getRuntime().availableProcessors();
//Build forkjoin thread pool
ForkJoinPool forkJoinPool = new ForkJoinPool(processors);

public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

Task submission method (one of the core competencies)

Return value Method
Submit asynchronous execution void execute(ForkJoinTask task) execute(Runnable task)
Wait and get the results T invoke(ForkJoinTask task)
Submit execution to obtain Future results ForkJoinTask submit(ForkJoinTask task) submit(Callable task) submit(Runnable task) submit(Runnable task, T result)

Differences between ordinary thread pools

  • Work Stealing Algorithm

ForkJoinPool uses a work-stealing algorithm to improve thread utilization, while ordinary thread pools use task queues to manage tasks.

Work stealing: After a thread completes its own task, it can obtain a task from the queue of other threads to execute, improving thread utilization.

  • Decomposition and merging of tasks

ForkJoinPool can decompose a large task into multiple small tasks, execute these small tasks in parallel, and finally combine their results to get the final result. The ordinary thread pool can only execute tasks one by one in the order of submitted tasks.

  • Number of worker threads

ForkJoinPool will automatically set the number of worker threads based on the number of CPU cores in the current system to maximize the performance advantages of the CPU. An ordinary thread pool requires manual setting of the thread pool size. If the setting is unreasonable, it may result in too many or too few threads, thus affecting the performance of the program.

  • Task type

ForkJoinPool is suitable for performing large-scale task parallelization, while ordinary thread pools are suitable for performing some short tasks, such as processing requests.

ForkJoinTask

ForkJoinTask is an abstract class in the Fork/Join framework, which defines the basic interface for executing tasks. Users can implement their own task class by inheriting the ForkJoinTask class, and override the compute() method to define the task execution logic. Normally we do not need to directly inherit the ForkJoinTask class, but only need to inherit its subclasses. The Fork/Join framework provides the following three subclasses:

  • RecursiveAction: Used for tasks that are performed recursively but do not need to return results.

  • RecursiveTask: Used to recursively execute tasks that need to return results.

  • CountedCompleter: After the task completes execution, a custom hook function will be triggered to execute.

Call method

  • fork()–Submit task

The fork() method is used to submit tasks to the thread pool where the current task is running. If the current thread is of type ForkJoinWorkerThread, it will be put into the work queue of the thread, otherwise it will be put into the work queue of the common thread pool.

  • join()–Get task execution results

The join() method is used to obtain the execution results of the task. When join() is called, the current thread will be blocked until the corresponding subtask completes running and returns the result.

Calculating Fibonacci Sequences (handling recursive tasks)

The Fibonacci sequence refers to a sequence of numbers: 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89… This sequence starts from the 3rd item, and each item is equal to the previous one. The sum of two items.

public class FibonacciDemo extends RecursiveTask<Integer>
{
    final int n;
    
    FibonacciDemo(int n)
    {
        this.n = n;
    }
    
    /**
     * Override the compute() method of RecursiveTask
     */
    protectedInteger compute()
    {
        if (n <= 1)
            return n;
        FibonacciDemo f1 = new FibonacciDemo(n - 1);
        //Submit task
        f1.fork();
        FibonacciDemo f2 = new FibonacciDemo(n - 2);
        // merge results
        return f2.compute() + f1.join();
    }
    
    public static void main(String[] args)
    {
        //Build forkjoin thread pool
        ForkJoinPool pool = new ForkJoinPool();
        FibonacciDemo task = new FibonacciDemo(100000); // Large parameter, throws StackOverflowError
        //Submit the task and block until the task execution is completed and return the merged results.
        int result = pool.invoke(task);
        System.out.println(result);
    }
}
How to solve stack overflow
//Use iteration to prevent stack overflow
public class FibonacciDemo2
{
    public static void main(String[] args)
    {
        int n = 100000;
        long[] fib = new long[n + 1];
        fib[0] = 0;
        fib[1] = 1;
        for (int i = 2; i <= n; i + + )
        {
            fib[i] = fib[i - 1] + fib[i - 2];
        }
        System.out.println(fib[n]);
    }
}
Notes on handling recursive tasks

When using the Fork/Join framework to process recursive tasks, it is necessary to evaluate the recursion depth and task granularity based on the actual situation to avoid problems with task scheduling and memory consumption. If the recursion depth is large, you can try other methods to optimize the algorithm, such as using iteration instead of recursion, or limiting the recursion depth to reduce the number of tasks to avoid the shortcomings of the Fork/Join framework.

When the recursion depth is large, subtasks may be scheduled to different threads for execution, and thread creation, destruction, and task scheduling take up a lot of resources. In addition, when calling methods recursively, a large number of method stack frames are created, which may cause stack memory to overflow StackOverflowError.

Handling blocking tasks

1. Prevent thread starvation: When a thread is executing a blocking task, it will wait for the task to be completed. If there is no task stealing, it may remain blocked. To prevent this from happening, you should avoid submitting a large number of blocking tasks in ForkJoinPool.

2. Use a specific thread pool: In order to maximize the performance of ForkJoinPool, you can use a dedicated thread pool to handle blocking tasks. These threads will not be affected by the stealing mechanism of ForkJoinPool. For example, you can use ThreadPoolExecutor to create a thread pool, and then use this thread pool as the executor of ForkJoinPool, so that you can use ThreadPoolExecutor to handle blocking tasks and ForkJoinPool to handle non-blocking tasks.

3. Don’t block worker threads: If you use blocking tasks in ForkJoinPool, you need to ensure that these tasks do not block worker threads, otherwise the performance of the entire thread pool will decrease. To avoid this situation, you can submit blocking tasks to a dedicated thread pool, or use asynchronous programming tools such as CompletableFuture to handle blocking tasks.

// Example of using CompletableFuture
public class BlockingTaskDemo
{
    public static void main(String[] args)
    {
        // Build a forkjoin thread pool
        ForkJoinPool pool = new ForkJoinPool();
        
        //Create an asynchronous task and submit it to ForkJoinPool for execution
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try
            {
                // Simulate a time-consuming task
                TimeUnit.SECONDS.sleep(5);
                return "Hello, world!";
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
                return null;
            }
        }, pool);
        
        try
        {
            // Wait for the task to complete and get the results
            String result = future.get();
            
            System.out.println(result);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
        finally
        {
            //Close ForkJoinPool and release resources
            pool.shutdown();
        }
    }
}
How ForkJoinPool works

There are multiple task queues inside ForkJoinPool. When we submit a task through the invoke() or submit() method of ForkJoinPool, ForkJoinPool submits the task to a task queue according to certain routing rules. If the task is executed, a child will be created. task, then the subtask will be submitted to the task queue corresponding to the worker thread.

In addition, ForkJoinPool supports a mechanism called “task stealing”. If a worker thread is idle, it can “steal” tasks in other worker task queues. Take full advantage of your CPU’s performance.

The work task queue is a double-ended linked list. Stealing is from the base side, and the top side is used to fetch tasks normally.

Worker thread ForkJoinWorkerThread

ForkJoinWorkerThread is a thread in ForkJoinPool specifically used to perform tasks.

When a ForkJoinWorkerThread is created, it automatically registers a WorkQueue to the ForkJoinPool. This WorkQueue is a queue specially used by this thread to store its own tasks, and can only appear in the odd bits of WorkQueues[]. In ForkJoinPool, WorkQueues[] is an array used to store the WorkQueue of all threads.

WorkQueue

WorkQueue is a double-ended queue used to store the tasks of the worker thread itself. Each worker thread maintains a local WorkQueue and executes tasks in the local queue first. When the tasks in the local queue are executed, the worker thread will try to steal tasks from the WorkQueue of other threads.

Note: In ForkJoinPool, only the WorkQueue with odd digits in WorkQueues[] belongs to the ForkJoinWorkerThread thread, so only these WorkQueue can be used and steal tasks by the thread itself. The even-numbered WorkQueue is used for external threads to submit tasks and is shared by multiple threads, so they cannot be stolen by threads.

Job theft

Work stealing allows idle threads to steal tasks from the double-ended queue of busy threads. By default, a worker thread gets tasks from the head of its own deque. However, when its own task is empty, the thread gets the task from the tail of the deque of other busy threads. This approach minimizes the possibility of threads competing for tasks.

Most operations of ForkJoinPool occur in work-stealing queues, which are implemented by the internal class WorkQueue. It is a special form of Deques, but only supports three operations: push, pop and poll (also called stealing). In ForkJoinPool, queue reading has strict constraints. push and pop can only be called from the thread to which they belong, while poll can be called from other threads.

Through work stealing, the Fork/Join framework can realize automatic load balancing of tasks to fully utilize the computing power of multi-core CPUs, while also avoiding thread starvation and delay problems.

If you are interested in the detailed implementation details of ForkJoinPool, you can also refer to Doug Lea’s paper

Summary

Fork/Join is a model based on the divide and conquer idea, which has significant advantages in concurrent processing of computing tasks.

  • Task segmentation: Divide large tasks into smaller tasks with smaller granularity, allowing more threads to participate in execution;

  • Task stealing: Make full use of idle threads and reduce competition through task stealing.

When using ForkJoinPool, you need to pay special attention to whether the task type is a pure function calculation type, that is, these tasks should not care about changes in status or external world. This is the safest approach. If it is a blocking type of task, then you need to carefully evaluate the technical solutions. Although ForkJoinPool can also handle blocking type tasks, it may incur complex management costs.

When dealing with blocking tasks, focus on learning CompletableFuture, which includes various methods for arranging tasks.