19. ForkJoin application examples

1. Preface

In the previous section, we learned the basic concepts and core API of ForkJoin. In this section we will implement a concrete application case, so you can experience how the ForkJoin framework is used in practice and the convenience it brings.

This section first describes the case to be implemented, then walks through the coding, and finally summarizes the points to pay attention to when using the framework.

2. Case description

In real projects we often need to process large data sets, such as large files, large tables, and large in-memory collections. Because of the volume of data, we usually consider multi-threading to improve processing efficiency.
When the processing logic is basically the same for every part of the data set and the data can easily be divided into smaller subsets, ForkJoin is a good fit.

We reuse the scenario from the Executor application example: all files (hundreds or thousands) in a certain directory must be encrypted, and each file must be renamed to its MD5 string.

Before starting the implementation, let's do a simple analysis. In this case, "encrypt the file, generate its MD5 string, and rename the file" is the work each task performs, and the list of all files is the data range to be processed. To verify that no files are missed during processing, we check the final result, so we encapsulate the task logic in the FileDealTask class (which extends RecursiveTask). For convenience of demonstration, some of the data in the code below is simulated.
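For reference, the real per-file work (which the demo below replaces with a simulated delay) could look roughly like the following sketch. The class and method names (Md5Rename, md5Hex, processFile) are hypothetical, and the encryption step is omitted; it only shows computing the MD5 hex string and renaming the file.

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;

// Hypothetical helper: compute a file's MD5 hex string and rename the file to it
public class Md5Rename {

    // Turn the file contents into a 32-character MD5 hex string
    static String md5Hex(byte[] data) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5").digest(data);
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Rename the file to its MD5 string, keeping it in the same directory
    static void processFile(Path file) throws Exception {
        String hash = md5Hex(Files.readAllBytes(file));
        Files.move(file, file.resolveSibling(hash), StandardCopyOption.REPLACE_EXISTING);
    }
}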

3. Coding implementation

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class ForkJoinTest {

    // Simulate the list of files to be processed
    private static int fileListSize = new Random().nextInt(15);
    private static String[] fileList = new String[fileListSize];
    static {
        for (int i = 0; i < fileListSize; i++) {
            fileList[i] = "fileName" + i;
        }
    }

    // main thread
    public static void main(String[] args) throws Exception {
        // Create the thread pool that processes the tasks
        // ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); // this form shares the global common pool, making full use of system resources
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Submit the overall task
        Future<Integer> result = forkJoinPool.submit(new FileDealTask(0, fileListSize, fileList));
        // Get the task execution result
        System.out.println("Number of files to be processed: " + fileListSize + ", total number of files processed: " + result.get());
        // Close the thread pool
        forkJoinPool.shutdown();
    }
}
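
As the comment in main() hints, there are several ways to create and drive the pool. The sketch below is a separate illustration, not part of the case code (the sample data is made up); it contrasts the shared common pool with a dedicated pool that has an explicit parallelism level, and uses invoke(), which blocks and returns the result directly instead of going through Future.get().

import java.util.concurrent.ForkJoinPool;

// A minimal sketch of alternative pool setups, reusing FileDealTask from this example
public class PoolVariants {

    public static void main(String[] args) {
        String[] files = {"a", "b", "c", "d"};   // made-up sample data

        // Shared common pool: sized from the available processors, reused JVM-wide,
        // and never shut down explicitly
        int r1 = ForkJoinPool.commonPool().invoke(new FileDealTask(0, files.length, files));

        // Dedicated pool with an explicit parallelism level of 4
        ForkJoinPool dedicated = new ForkJoinPool(4);
        // invoke() blocks until the task finishes and returns its result directly
        int r2 = dedicated.invoke(new FileDealTask(0, files.length, files));
        dedicated.shutdown();

        System.out.println(r1 + ", " + r2);
    }
}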

The comments in the ForkJoinTest class make the main flow clear. Next, the FileDealTask class shows how the task is divided and how the results of the subtasks are combined.

import lombok.SneakyThrows;
import java.util.Random;
import java.util.concurrent.RecursiveTask;

public class FileDealTask extends RecursiveTask<Integer> {

    private String[] fileList;
    // Stop splitting once a subtask covers at most this many files
    private final int threshold = 2;
    private int first;
    private int last;

    public FileDealTask(int first, int last, String[] fileList) {
        this.fileList = fileList;
        this.first = first;
        this.last = last;
    }

    @SneakyThrows
    @Override
    protected Integer compute() {
        // Number of files processed by this task
        int result = 0;

        // If the task is small enough, process it directly (encrypt the file, generate the MD5 string, rename the file)
        if (last - first <= threshold) {
            for (int i = first; i < last; i++) {
                result = result + 1;
                // Simulate the per-file work with a random delay
                Thread.sleep(new Random().nextInt(2000));
                System.out.println(Thread.currentThread().getName() + ": file " + fileList[i] + " has been processed");
            }
            System.out.println(Thread.currentThread().getName() + ": total number of files processed in (" + first + "," + last + "): " + result);
        } else {
            // Split into two smaller tasks
            int middle = first + (last - first) / 2;
            // Create two subtasks
            FileDealTask leftTask = new FileDealTask(first, middle, fileList);
            FileDealTask rightTask = new FileDealTask(middle, last, fileList);
            // Trigger both subtasks
            invokeAll(leftTask, rightTask);
            // Wait for the results of the two subtasks and combine them
            result = leftTask.join() + rightTask.join();
            System.out.println(Thread.currentThread().getName() + ": the current task was split into ("
                    + first + "," + middle + "), (" + middle + "," + last + ")");
        }
        return result;
    }
}

Let’s run the above example in the IDE to see the actual results.

The code above contains random elements, so the results differ from run to run. One run produced the following output:

ForkJoinPool-1-worker-2: file fileName3 has been processed
ForkJoinPool-1-worker-2: total number of files processed in (3,4): 1
ForkJoinPool-1-worker-1: file fileName0 has been processed
ForkJoinPool-1-worker-1: total number of files processed in (0,1): 1
ForkJoinPool-1-worker-0: file fileName4 has been processed
ForkJoinPool-1-worker-3: file fileName1 has been processed
ForkJoinPool-1-worker-3: file fileName2 has been processed
ForkJoinPool-1-worker-3: total number of files processed in (1,3): 2
ForkJoinPool-1-worker-1: the current task was split into (0,1), (1,3)
ForkJoinPool-1-worker-0: file fileName5 has been processed
ForkJoinPool-1-worker-0: total number of files processed in (4,6): 2
ForkJoinPool-1-worker-2: the current task was split into (3,4), (4,6)
ForkJoinPool-1-worker-1: the current task was split into (0,3), (3,6)
Number of files to be processed: 6, total number of files processed: 6

The range (0,6) was first split into (0,3) and (3,6); then (0,3) was split into (0,1) and (1,3), and (3,6) into (3,4) and (4,6). This matches our expectations.

4. Notes

ForkJoinPool is not intended to replace ExecutorService. It is designed for "divide and conquer" algorithms and is best suited to compute-intensive tasks.
Evaluate the trade-off before using it: when the amount of data to be processed is not particularly large, there is no need to use ForkJoin. The framework uses multiple threads under the hood, which brings thread context switching; with small data volumes, serial processing is often faster than multi-threading.
When executing subtasks, note that if you use invokeAll, you must not also call fork() on those subtasks separately.
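
To illustrate the last point: if you split subtasks manually rather than with invokeAll, the usual pattern is to fork one subtask, compute the other in the current worker thread, and then join. The fragment below is a hedged sketch of how the else branch of compute() above could be written in that style; do not combine it with invokeAll on the same subtasks.

// Alternative split inside compute(), using fork()/compute()/join() instead of invokeAll()
FileDealTask leftTask = new FileDealTask(first, middle, fileList);
FileDealTask rightTask = new FileDealTask(middle, last, fileList);
leftTask.fork();                        // schedule the left half asynchronously
int rightResult = rightTask.compute();  // process the right half in the current worker thread
int leftResult = leftTask.join();       // wait for the left half and collect its result
result = leftResult + rightResult;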