11 Fork/Join

Table of Contents

1 The idea of divide and conquer

2 Fork/Join

2.1 Introduction

2.2 Application scenarios

1 Recursive decomposition tasks

2 Array processing

3 Parallelization algorithms

4 Big data processing

2.3 Usage

2.3.1 ForkJoinPool

Constructor

Task submission methods

Comparison with ordinary thread pools

2.3.2 ForkJoinTask

Calling methods

2.3.3 Handling recursive tasks

2.3.4 Handling blocking tasks

2.4 Principles

2.4.1 Worker thread

2.4.2 Work stealing


1 The idea of divide and conquer

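Divide-and-conquer idea: a problem of size N is decomposed into K smaller sub-problems. The sub-problems are independent of each other and have the same nature as the original problem, so once the sub-problems are solved, their solutions can be combined into the solution of the original problem.

The divide-and-conquer approach has three steps:

Decompose: split the original problem into smaller sub-problems.

Solve: solve each sub-problem, recursively if it is still too large to solve directly.

Merge: combine the solutions of the sub-problems into the solution of the original problem.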
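The three steps can be seen in a plain sequential recursion before any Fork/Join API is involved. The sketch below sums an int array by halving it, so K = 2 here (an illustrative choice); the class and method names are hypothetical.

```java
// Sequential divide and conquer: the sum of a[lo..hi) (hypothetical example).
class DivideAndConquerSum {

    static long sum(int[] a, int lo, int hi) {
        // Base case: a range of at most 2 elements is solved directly.
        if (hi - lo <= 2) {
            long s = 0;
            for (int i = lo; i < hi; i++) {
                s += a[i];
            }
            return s;
        }
        // Decompose: split [lo, hi) into two halves.
        int mid = (lo + hi) >>> 1;
        // Solve: each half recursively.
        long left = sum(a, lo, mid);
        long right = sum(a, mid, hi);
        // Merge: combine the two partial results.
        return left + right;
    }

    public static void main(String[] args) {
        int[] data = {3, 1, 4, 1, 5, 9, 2, 6};
        System.out.println(sum(data, 0, data.length)); // prints 31
    }
}
```

Fork/Join keeps exactly this shape; it only changes the "Solve" step so that the sub-problems run in parallel.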

2 Fork/Join

2.1 Introduction

Fork/Join is a parallel computing framework in Java (java.util.concurrent, since Java 7) that supports the divide-and-conquer task model. Fork corresponds to task decomposition in that model, and Join corresponds to merging the results.

2.2 Application scenarios

1 Recursive decomposition tasks

Sorting, merging, traversal, and similar problems, where a large task can usually be decomposed into several sub-tasks.
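Sorting is the classic case. As a sketch (the class name, the threshold of 16 and the shared scratch buffer are illustrative assumptions), merge sort can be written as a RecursiveAction that sorts both halves in parallel with invokeAll() and then merges them:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Parallel merge sort: sort the two halves in parallel, then merge them.
class ParallelMergeSort extends RecursiveAction {

    private static final int THRESHOLD = 16; // ranges this small are sorted sequentially
    private final int[] a;
    private final int[] tmp;  // scratch buffer, same length as a; each task only touches its own range
    private final int lo, hi; // half-open range [lo, hi)

    ParallelMergeSort(int[] a, int[] tmp, int lo, int hi) {
        this.a = a;
        this.tmp = tmp;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            Arrays.sort(a, lo, hi); // small sub-problem: solve directly
            return;
        }
        int mid = (lo + hi) >>> 1;
        // Decompose and solve: run both halves and wait until both are sorted.
        invokeAll(new ParallelMergeSort(a, tmp, lo, mid),
                  new ParallelMergeSort(a, tmp, mid, hi));
        merge(mid);
    }

    // Merge step: combines the sorted runs [lo, mid) and [mid, hi) back into a.
    private void merge(int mid) {
        System.arraycopy(a, lo, tmp, lo, hi - lo);
        int i = lo, j = mid, k = lo;
        while (i < mid && j < hi) {
            a[k++] = (tmp[i] <= tmp[j]) ? tmp[i++] : tmp[j++];
        }
        while (i < mid) {
            a[k++] = tmp[i++];
        }
        while (j < hi) {
            a[k++] = tmp[j++];
        }
    }

    static void sort(int[] a) {
        ForkJoinPool.commonPool().invoke(new ParallelMergeSort(a, new int[a.length], 0, a.length));
    }

    public static void main(String[] args) {
        int[] data = new java.util.Random(1).ints(100, 0, 1000).toArray();
        sort(data);
        System.out.println(Arrays.toString(data));
    }
}
```

Sharing one scratch buffer is safe here because sibling tasks work on disjoint ranges and a parent only merges after both children have finished.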

2 Array processing

Sorting, searching, or computing statistics over a large array: split it into several sub-arrays, process each sub-array in parallel, and finally merge the partial results (for sorting, into one large ordered array).
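For statistics over a large array, each task handles one sub-array and the partial results are merged. The sketch below (hypothetical names; the threshold of 1,000 elements is an arbitrary assumption) finds the maximum with a RecursiveTask, forking one half and computing the other half in the current thread:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Finds the maximum of an array by splitting it into sub-arrays.
class ParallelMax extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 1_000; // sub-arrays this small are scanned directly
    private final int[] a;
    private final int lo, hi; // non-empty half-open range [lo, hi)

    ParallelMax(int[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {
            int max = a[lo];
            for (int i = lo + 1; i < hi; i++) {
                max = Math.max(max, a[i]);
            }
            return max;
        }
        int mid = (lo + hi) >>> 1;
        ParallelMax left = new ParallelMax(a, lo, mid);
        left.fork();                                           // left half runs asynchronously
        int rightMax = new ParallelMax(a, mid, hi).compute(); // right half runs in this thread
        return Math.max(left.join(), rightMax);                // merge the two partial results
    }

    static int max(int[] a) {
        if (a.length == 0) {
            throw new IllegalArgumentException("empty array");
        }
        return ForkJoinPool.commonPool().invoke(new ParallelMax(a, 0, a.length));
    }

    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i % 777;
        }
        System.out.println(max(data)); // prints 776
    }
}
```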

3 Parallelization algorithms

Parallel image-processing algorithms, parallel machine-learning algorithms, and similar workloads that can be split into several sub-problems.
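As an illustration of splitting an image-processing job, assume a grayscale image stored row by row in an int[] with values 0..255 (a hypothetical representation, not a real imaging API). Each task inverts a band of rows, and bands are halved until they are small:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Inverts a grayscale image (pixel values 0..255) stored row by row in an int[].
class InvertImage extends RecursiveAction {

    private static final int ROW_THRESHOLD = 8; // bands of at most 8 rows are processed directly
    private final int[] pixels;
    private final int width;
    private final int rowLo, rowHi; // rows [rowLo, rowHi)

    InvertImage(int[] pixels, int width, int rowLo, int rowHi) {
        this.pixels = pixels;
        this.width = width;
        this.rowLo = rowLo;
        this.rowHi = rowHi;
    }

    @Override
    protected void compute() {
        if (rowHi - rowLo <= ROW_THRESHOLD) {
            for (int i = rowLo * width; i < rowHi * width; i++) {
                pixels[i] = 255 - pixels[i];
            }
            return;
        }
        int mid = (rowLo + rowHi) >>> 1;
        // The two bands are independent, so no merge step is needed.
        invokeAll(new InvertImage(pixels, width, rowLo, mid),
                  new InvertImage(pixels, width, mid, rowHi));
    }

    // Assumes width > 0 and pixels.length == width * height.
    static void invert(int[] pixels, int width) {
        int height = pixels.length / width;
        ForkJoinPool.commonPool().invoke(new InvertImage(pixels, width, 0, height));
    }
}
```

Because every task writes only to its own rows, this is a RecursiveAction: there is nothing to return or merge.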

4 Big data processing

Processing large log files, large database queries, and similar jobs: divide the data into several shards and process each shard in parallel.
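For log processing, the data can also be cut into fixed-size shards up front instead of being halved recursively. The sketch below assumes the log has already been read into a List<String> and that an error line contains the substring "ERROR" (both assumptions, with hypothetical class names); ForkJoinTask.invokeAll(Collection) runs all shards and waits for them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Counts log lines containing "ERROR" by processing fixed-size shards in parallel.
class ErrorCount extends RecursiveTask<Long> {

    // One shard: a sequential scan of lines [from, to).
    static final class Shard extends RecursiveTask<Long> {
        private final List<String> lines;
        private final int from, to;

        Shard(List<String> lines, int from, int to) {
            this.lines = lines;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long n = 0;
            for (int i = from; i < to; i++) {
                if (lines.get(i).contains("ERROR")) {
                    n++;
                }
            }
            return n;
        }
    }

    private final List<String> lines; // ideally a random-access list such as ArrayList
    private final int shardSize;      // must be > 0

    ErrorCount(List<String> lines, int shardSize) {
        this.lines = lines;
        this.shardSize = shardSize;
    }

    @Override
    protected Long compute() {
        List<Shard> shards = new ArrayList<>();
        for (int from = 0; from < lines.size(); from += shardSize) {
            shards.add(new Shard(lines, from, Math.min(from + shardSize, lines.size())));
        }
        long total = 0;
        for (Shard s : invokeAll(shards)) { // run every shard in parallel and wait for all of them
            total += s.join();               // merge: add up the per-shard counts
        }
        return total;
    }

    static long count(List<String> lines, int shardSize) {
        return ForkJoinPool.commonPool().invoke(new ErrorCount(lines, shardSize));
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lines.add(i % 10 == 0 ? "ERROR disk full" : "INFO ok");
        }
        System.out.println(count(lines, 100)); // prints 100
    }
}
```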

2.3 Usage

Main components: ForkJoinPool and ForkJoinTask

ForkJoinPool: manages the execution of tasks

ForkJoinTask: a task that can be split into smaller subtasks

Steps for usage:

1 Build a task: extend RecursiveAction (no return value) or RecursiveTask (returns a value) and override compute() to implement the task's execution logic. Inside compute(), split the work into subtasks and call invokeAll() to run them.

2 Create a ForkJoinPool and call pool.invoke(task) to submit the task.
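The two steps can be sketched as follows (RangeSum and its threshold are hypothetical): step 1 defines a RecursiveTask whose compute() splits the range and calls invokeAll(), and step 2 creates a pool and calls pool.invoke().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Step 1: the task. Sums the integers in the closed range [from, to].
class RangeSum extends RecursiveTask<Long> {

    private static final long THRESHOLD = 10_000; // ranges this small are summed directly
    private final long from, to;

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid + 1, to);
        invokeAll(left, right);            // run both subtasks and wait for them
        return left.join() + right.join(); // merge the results
    }

    public static void main(String[] args) {
        // Step 2: create a pool and submit the root task.
        ForkJoinPool pool = new ForkJoinPool();
        try {
            long result = pool.invoke(new RangeSum(1, 1_000_000));
            System.out.println(result); // prints 500000500000
        } finally {
            pool.shutdown();
        }
    }
}
```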

2.3.1 ForkJoinPool

A thread pool that manages the execution of Fork/Join tasks.

Constructor

int parallelism: the parallelism level, which determines the number of worker threads. If not set, it defaults to Runtime.getRuntime().availableProcessors().

ForkJoinWorkerThreadFactory: the factory used to create worker threads. If not set, the default factory (DefaultForkJoinWorkerThreadFactory) is used.

UncaughtExceptionHandler: the handler for worker threads that terminate because of unrecoverable errors while executing tasks. (An exception thrown by a task's own code is normally captured and rethrown to whoever joins that task.)

asyncMode: the working mode of each worker's local queue: true = first-in-first-out (FIFO), false (the default) = last-in-first-out (LIFO).
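A sketch that sets all four constructor parameters explicitly. The thread-naming factory, the logging handler and the parallelism of 4 are illustrative choices, not recommendations; the factory delegates to ForkJoinPool.defaultForkJoinWorkerThreadFactory and only renames the thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {

    static ForkJoinPool createPool() {
        // Thread factory: create the thread with the default factory, then give it our own name.
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = p -> {
            ForkJoinWorkerThread t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
            t.setName("demo-worker-" + counter.incrementAndGet());
            return t;
        };
        // Handler for worker threads that die from an unrecoverable error.
        Thread.UncaughtExceptionHandler handler =
            (t, e) -> System.err.println(t.getName() + " failed: " + e);

        return new ForkJoinPool(
            4,       // parallelism: target number of worker threads
            factory, // ForkJoinWorkerThreadFactory
            handler, // UncaughtExceptionHandler (may be null)
            false);  // asyncMode: false = LIFO local queues (the default)
    }

    public static void main(String[] args) {
        ForkJoinPool pool = createPool();
        try {
            String name = pool.submit(() -> Thread.currentThread().getName()).join();
            System.out.println("parallelism = " + pool.getParallelism() + ", task ran on " + name);
        } finally {
            pool.shutdown();
        }
    }
}
```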

Task submission methods
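ForkJoinPool offers three main ways to submit a ForkJoinTask: invoke() submits the task and waits for its result, submit() returns the task itself as a Future, and execute() arranges asynchronous execution without returning anything. A minimal sketch (the Square task is hypothetical):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SubmitDemo {

    // Trivial task used only to show the three submission styles.
    static final class Square extends RecursiveTask<Integer> {
        private final int x;

        Square(int x) {
            this.x = x;
        }

        @Override
        protected Integer compute() {
            return x * x;
        }
    }

    static int[] runAll() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke(): submit the task and wait for its result.
            int a = pool.invoke(new Square(3));

            // submit(): returns the task itself, which is also a Future.
            ForkJoinTask<Integer> future = pool.submit(new Square(4));
            int b = future.join(); // future.get() also works, but throws checked exceptions

            // execute(): asynchronous execution with no return value.
            Square task = new Square(5);
            pool.execute(task);
            int c = task.join(); // possible only because we kept our own reference

            return new int[] {a, b, c};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = runAll();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 9 16 25
    }
}
```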

Comparison with ordinary thread pools

Work stealing: in an ordinary thread pool, all threads take tasks from one shared task queue; in ForkJoinPool, each worker thread has its own queue, and a thread that has finished its own tasks can take tasks from other threads' queues and execute them.

Decomposition and merging of tasks: ForkJoinPool can decompose a large task into many small tasks, execute them in parallel, and finally merge the results; an ordinary thread pool has no such support and simply executes tasks one by one in the order they were submitted.

Number of worker threads: by default, ForkJoinPool sets the number of worker threads from the number of CPU cores in the system, to make full use of the CPU; an ordinary thread pool needs its size set manually, and choosing a reasonable size takes some thought.

Task type: ForkJoinPool suits large computations that can be parallelized; an ordinary thread pool suits short, independent tasks such as handling requests.
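The difference in sizing can be checked directly. In this sketch, the no-argument ForkJoinPool constructor picks its parallelism from Runtime.getRuntime().availableProcessors(), while the fixed pool's size (twice the core count here) is an arbitrary choice the caller has to make:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class PoolSizeDemo {

    // Parallelism chosen automatically by the no-argument constructor.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores = " + cores
            + ", ForkJoinPool parallelism = " + defaultParallelism());

        // An ordinary pool has to be sized by hand; 2 * cores is only an example choice.
        ExecutorService fixed = Executors.newFixedThreadPool(2 * cores);
        fixed.shutdown();
    }
}
```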

2.3.2 ForkJoinTask

The abstract base class for tasks that run in a ForkJoinPool.

You rarely extend ForkJoinTask directly. Instead, extend one of its subclasses and override compute() to define the task's execution logic:

RecursiveAction: executes recursively and returns no result

RecursiveTask: executes recursively and returns a result

CountedCompleter: tracks how many of its subtasks are still pending; when that count reaches zero, it runs a custom hook method, onCompletion()
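CountedCompleter is the least familiar of the three. The sketch below follows the splitting pattern shown in the CountedCompleter Javadoc (class and field names are hypothetical): each forked child adds one to its parent's pending count, tryComplete() counts completions back down, and onCompletion() is the hook that runs when a task's pending count reaches zero. Adding into an AtomicLong element by element is for illustration only, not an efficient design.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

// Sums an array with CountedCompleter; onCompletion() acts as the completion hook.
class SumCompleter extends CountedCompleter<Void> {

    private final int[] a;
    private final int lo, hi;       // half-open range [lo, hi)
    private final AtomicLong total; // shared accumulator

    SumCompleter(CountedCompleter<?> parent, int[] a, int lo, int hi, AtomicLong total) {
        super(parent); // parent == null marks the root task
        this.a = a;
        this.lo = lo;
        this.hi = hi;
        this.total = total;
    }

    @Override
    public void compute() {
        int l = lo, h = hi;
        // Repeatedly fork the right half as a child; each child adds 1 to this task's pending count.
        while (h - l >= 2) {
            int mid = (l + h) >>> 1;
            addToPendingCount(1);
            new SumCompleter(this, a, mid, h, total).fork();
            h = mid;
        }
        if (h > l) {
            total.addAndGet(a[l]); // leaf work: at most one element is left
        }
        // Decrement the pending count, or, if it is already zero, run onCompletion()
        // and propagate completion to the parent.
        tryComplete();
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (getCompleter() == null) { // only the root has no completer
            System.out.println("all subtasks finished, total = " + total.get());
        }
    }

    static long sum(int[] a) {
        AtomicLong total = new AtomicLong();
        ForkJoinPool.commonPool().invoke(new SumCompleter(null, a, 0, a.length, total));
        return total.get();
    }

    public static void main(String[] args) {
        int[] data = new int[1000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 500500
    }
}
```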

Calling methods

fork(): submit a task

Arranges for the task to run asynchronously. If the current thread is a ForkJoinWorkerThread, the task is pushed onto that thread's own work queue in the pool it belongs to; otherwise it is submitted to the common pool (ForkJoinPool.commonPool()).

join(): get the task's result

Returns the result of the task once it is done. The calling thread waits until the subtask has finished; a worker thread may execute other pending tasks while it waits instead of sitting idle.
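A small sketch of fork() and join() called from an ordinary thread (Work is a hypothetical task). Because main is not a ForkJoinWorkerThread, fork() hands the task to ForkJoinPool.commonPool():

```java
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

class ForkJoinCallDemo {

    static final class Work extends RecursiveTask<Integer> {
        private final int value;

        Work(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            boolean onWorker = Thread.currentThread() instanceof ForkJoinWorkerThread;
            System.out.println("computing on a ForkJoinWorkerThread: " + onWorker);
            return value * 10;
        }
    }

    static int run() {
        Work w = new Work(7);
        // Not inside a pool, so fork() submits the task to ForkJoinPool.commonPool().
        w.fork();
        // join() waits for the result. If no worker has started the task yet, some JDK
        // versions let the joining thread run it itself, so onWorker may print false.
        return w.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 70 (after the line printed by compute())
    }
}
```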

2.3.3 Handling recursive tasks

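import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Naive parallel Fibonacci: one new task per recursive call (see the problems listed below).
public class Fibonacci extends RecursiveTask<Integer> {

    final int n;

    public Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 2);
        f1.fork();                        // run fib(n - 2) asynchronously
        Fibonacci f2 = new Fibonacci(n - 1);
        return f2.compute() + f1.join();  // compute fib(n - 1) here, then wait for fib(n - 2)
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        // Warning: with n = 100 this creates an exponential number of tasks and effectively
        // never finishes; the result would also overflow Integer for any n >= 47.
        Fibonacci fibonacci = new Fibonacci(100);
        Integer result = pool.invoke(fibonacci);
        System.out.println(result);
    }
}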

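Problems with the above code: every call creates a new task, so the number of tasks grows exponentially with n and the program runs for a very long time; each task does almost no work, so scheduling overhead dominates; very deep recursion can also overflow the stack; and for n = 100 the result would overflow Integer anyway.

When using ForkJoinPool for recursive tasks, pay particular attention to recursion depth and task granularity, to avoid the time and memory spent creating and scheduling too many tiny tasks.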
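One common remedy for the granularity problem is a sequential threshold: below a cutoff, compute directly instead of forking. A sketch under these assumptions: the cutoff of 20 is an arbitrary illustrative value, the result type is switched to Long, and the exponential recursion is kept only to stay close to the example above (an iterative loop would compute Fibonacci numbers far faster).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Fibonacci with a sequential cutoff: only the top of the recursion tree becomes tasks.
class ThresholdFibonacci extends RecursiveTask<Long> {

    private static final int THRESHOLD = 20; // below this n, no more tasks are created
    private final int n;                     // long results stay exact up to n = 92

    ThresholdFibonacci(int n) {
        this.n = n;
    }

    // Plain sequential recursion for small sub-problems.
    static long fibSequential(int n) {
        return n <= 1 ? n : fibSequential(n - 1) + fibSequential(n - 2);
    }

    @Override
    protected Long compute() {
        if (n <= THRESHOLD) {
            return fibSequential(n); // coarse-grained leaf: no fork, no scheduling overhead
        }
        ThresholdFibonacci f1 = new ThresholdFibonacci(n - 2);
        f1.fork();
        ThresholdFibonacci f2 = new ThresholdFibonacci(n - 1);
        return f2.compute() + f1.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            System.out.println(pool.invoke(new ThresholdFibonacci(40))); // prints 102334155
        } finally {
            pool.shutdown();
        }
    }
}
```

With the cutoff, the number of tasks depends only on how far n is above the threshold, and each leaf does enough work to outweigh the cost of scheduling it.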

2.3.4 Handling blocking tasks

1 Prevent thread starvation: a worker thread that runs a blocking task waits until the blocking call returns and cannot execute or steal other tasks in the meantime. Because ForkJoinPool has a limited number of worker threads, many blocking tasks can leave the pool with no thread free to make progress. Avoid submitting a large number of blocking tasks to ForkJoinPool: the point of this pool is divide-and-conquer computation, and blocking tasks reduce both its usefulness and its performance.

2 Use a dedicated thread pool: run blocking tasks on a separate executor such as a ThreadPoolExecutor, and let the ForkJoinPool run only non-blocking tasks.

3 Do not block worker threads: if a blocking task must be submitted, make sure it does not block a ForkJoinPool worker thread, otherwise the performance of the whole pool degrades. To avoid this, submit the blocking task to a separate thread pool, or use an asynchronous tool such as CompletableFuture with a dedicated executor to handle it.

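import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class BlockingTaskDemo {

    public static void main(String[] args) {
        // Dedicated pool for blocking work, so ForkJoinPool workers are never blocked.
        ExecutorService blockingPool = Executors.newFixedThreadPool(4);
        // ForkJoinPool for the non-blocking (CPU-bound) stage.
        ForkJoinPool pool = new ForkJoinPool();

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5); // simulated blocking call (e.g. I/O)
                return "xxx";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return null;
            }
        }, blockingPool)
        // Non-blocking post-processing runs on the ForkJoinPool.
        .thenApplyAsync(s -> s == null ? null : s.toUpperCase(), pool);

        try {
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down both pools whether or not the task succeeded.
            blockingPool.shutdown();
            pool.shutdown();
        }
    }
}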
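When a blocking call cannot be moved out of the pool, ForkJoinPool.managedBlock() is a further option: the blocking operation is wrapped in a ForkJoinPool.ManagedBlocker so the pool can activate a spare thread while the worker waits. The QueueTaker class mirrors the example in the ManagedBlocker Javadoc; the surrounding class, method names and pool size are an illustrative sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

class ManagedBlockerDemo {

    // Wraps BlockingQueue.take() so the pool knows this worker is about to block.
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item = null;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null) {
                item = queue.take(); // the actual blocking call
            }
            return true; // no further blocking is needed
        }

        @Override
        public boolean isReleasable() {
            // True when blocking is unnecessary, e.g. an item is already available.
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() {
            return item;
        }
    }

    // Takes one element from the queue inside a ForkJoinPool task.
    static String takeInPool(BlockingQueue<String> queue) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.submit(() -> {
                QueueTaker<String> taker = new QueueTaker<>(queue);
                ForkJoinPool.managedBlock(taker); // the pool may start a compensating thread
                return taker.getItem();
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // A producer thread supplies the value after a short delay.
        new Thread(() -> {
            try {
                Thread.sleep(50);
                queue.put("ready");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        System.out.println(takeInPool(queue)); // prints ready
    }
}
```

This does not remove the cost of blocking; it only keeps the pool's effective parallelism up while a worker is blocked, so the dedicated-executor approach remains preferable for large amounts of blocking work.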

2.4 Principles

ForkJoinPool maintains multiple task queues internally (each worker thread owns one work queue). When a task is submitted with invoke() or submit(), it is placed in a task queue according to certain routing rules. Subtasks created while a task runs are pushed onto the task queues as well.

Even when a thread's own queue is empty, it can still take tasks from other threads' queues and execute them through work stealing.

2.4.1 Worker thread

ForkJoinWorkerThread is the thread type that executes tasks in a ForkJoinPool. When such a thread is created, it registers its own WorkQueue with the pool to hold its tasks; these queues are stored at the odd indices of the WorkQueue[] array.

The WorkQueue[] array holds the work queues of all threads. The even indices hold queues for tasks submitted by external (non-worker) threads. This is why CompletableFuture can run its asynchronous tasks in a ForkJoinPool: the even-indexed queues give the pool the behavior of a general-purpose thread pool that accepts external submissions.

In ForkJoinPool, only the odd-indexed queues in WorkQueue[] belong to ForkJoinWorkerThreads. Idle workers steal tasks from these queues, and they also take externally submitted tasks from the even-indexed queues.
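The odd/even layout follows from how slot indices are computed. The sketch below is modeled on the JDK 8 ForkJoinPool source (registerWorker() and externalPush()); these are internal details that are not part of the public API and differ between JDK versions, the method names here are hypothetical, and the real code also probes for a free slot on collisions.

```java
// Simplified model of how JDK 8's ForkJoinPool picks a WorkQueue[] slot.
// mask = workQueues.length - 1, where the array length is a power of two.
class SlotIndexSketch {

    // Mask used for external submission queues: bit 0 is cleared, so the index is even.
    static final int SQMASK = 0x007e;

    // Worker queues: forcing bit 0 to 1 makes the index always odd.
    static int workerSlot(int seed, int mask) {
        return ((seed << 1) | 1) & mask;
    }

    // External submission queues: always an even index.
    static int externalSlot(int probe, int mask) {
        return probe & mask & SQMASK;
    }

    public static void main(String[] args) {
        int mask = 16 - 1; // a WorkQueue[] of length 16
        for (int s = 0; s < 4; s++) {
            System.out.println("worker seed " + s + " -> slot " + workerSlot(s, mask)
                + ", external probe " + s + " -> slot " + externalSlot(s, mask));
        }
    }
}
```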

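2.4.2 Work stealing

Work stealing allows an idle worker thread to take tasks from the queues of other threads in WorkQueue[].

push and pop are used by the owning thread on its own queue, at the top (LIFO); poll is used by other threads to steal from the opposite end (the base) of that queue.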
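The division of labor between push/pop and poll can be modeled with an ordinary deque. This is a deliberately simplified, lock-based sketch with a hypothetical class name; the real ForkJoinPool.WorkQueue is a lock-free array-based deque, and this model only shows which end each operation uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, lock-based model of a work-stealing queue (ArrayDeque does not accept nulls).
class SimpleWorkQueue<T> {

    private final Deque<T> deque = new ArrayDeque<>();

    // Owner thread: push a newly forked task onto the top.
    synchronized void push(T task) {
        deque.addLast(task);
    }

    // Owner thread: pop the most recently pushed task (LIFO); null if empty.
    synchronized T pop() {
        return deque.pollLast();
    }

    // Thief thread: steal the oldest task from the opposite end (FIFO); null if empty.
    synchronized T poll() {
        return deque.pollFirst();
    }

    public static void main(String[] args) {
        SimpleWorkQueue<String> q = new SimpleWorkQueue<>();
        q.push("task1");
        q.push("task2");
        q.push("task3");
        System.out.println("owner pops: " + q.pop());    // task3
        System.out.println("thief steals: " + q.poll()); // task1
    }
}
```

Taking from opposite ends keeps the owner and thieves from contending for the same element most of the time, and in divide-and-conquer workloads the oldest task at the base is usually the largest remaining piece of work, which makes it a good candidate to steal.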