Concurrent programming-thread pool ForkJoinPool

ForkJoinPool

Algorithm question: How to make full use of the performance of multi-core CPUs to quickly sort an array of 20 million sizes?

Divide and conquer thinking: decompose, solve, merge

The idea of divide and conquer is to decompose a problem of size N into K sub-problems of smaller size, which are independent of each other and have the same properties as the original problem. By finding the solution to the subproblem, you can get the solution to the original problem.

The steps of divide and conquer thinking are as follows:

1. Decomposition: Divide the problem to be solved into several smaller similar problems; (sub-problems cannot be infinitely small, so thresholds are usually set)

2. Solution: When the sub-problems are divided into small enough parts, use simpler methods to solve them;

3. Merge: According to the requirements of the original problem, the solutions to the sub-problems are merged layer by layer to form the solution to the original problem.

Among the top ten classic computer algorithms, merge sort, quick sort, and binary search are all algorithms based on the divide and conquer idea.

Merge Sort Demonstration: Comparison Sorting Visualization

Extension: Arrays.sort() Very efficient algorithm, quick sorting, and optimized

/**
 * Sorts the specified array into ascending numerical order.
 *
 * <p>Implementation note: The sorting algorithm is a Dual-Pivot Quicksort
 * by Vladimir Yaroslavskiy, Jon Bentley, and Joshua Bloch. This algorithm
 * offers O(n log(n)) performance on many data sets that cause other
 * quicksorts to degrade to quadratic performance, and is typically
 * faster than traditional (one-pivot) Quicksort implementations.
 *
 * @param a the array to be sorted
 */
public static void sort(int[] a) {
DualPivotQuicksort.sort(a, 0, a.length - 1, null, 0, 0);
}

/**
 * Sorts the specified array into ascending numerical order.
 *
 * @implNote The sorting algorithm is a parallel sort-merge that breaks the
 * array into sub-arrays that are themselves sorted and then merged. When
 * the sub-array length reaches a minimum granularity, the sub-array is
 * sorted using the appropriate {@link Arrays#sort(byte[]) Arrays.sort}
 * method. If the length of the specified array is less than the minimum
 * granularity, then it is sorted using the appropriate {@link
 * Arrays#sort(byte[]) Arrays.sort} method. The algorithm requires a
 * working space no greater than the size of the original array. The
 * {@link ForkJoinPool#commonPool() ForkJoin common pool} is used to
 * execute any parallel tasks.
 *
 * @param a the array to be sorted
 *
 * @since 1.8
 */
public static void parallelSort(byte[] a) {
int n = a.length, p, g;
if (n <= MIN_ARRAY_SORT_GRAN ||
(p = ForkJoinPool.getCommonPoolParallelism()) == 1)
DualPivotQuicksort.sort(a, 0, n - 1);
else
new ArraysParallelSortHelpers.FJByte.Sorter
(null, a, new byte[n], 0, n, 0,
((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
MIN_ARRAY_SORT_GRAN : g).invoke();
}

Implemented based on merge sort algorithm

For quick sorting of an array of size 20 million, an efficient merge sort algorithm can be used.

What is merge sort

Merge Sort is a sorting algorithm based on the divide and conquer idea. The basic idea of merge sort is to divide a large array into two sub-arrays of equal size, sort each sub-array separately, and then merge the two sub-arrays into a large ordered array. Because recursive implementation is often used (determined by the nature of splitting first and then merging), we call it merge sort.

The steps of merge sort include:

  • Split an array into two subarrays

  • Sort each subarray

  • Merge two ordered subarrays (still ordered after merging, guaranteed by double pointer sorting)

The time complexity of merge sort is O(nlogn) and the space complexity is O(n), where n is the length of the array.

Use merge sort to implement the above algorithm problem

Single-thread implementation of merge sort

Implementation of single-threaded merge algorithm: divide the sequence into two parts, perform recursive sorting respectively, and then merge the sorted subsequences.

public class MergeSort
{
    /** Array to be sorted */
    private final int[] arrayToSort;
    /** The threshold for splitting, below which no splitting will be performed */
    private final int threshold;
    
    public MergeSort(final int[] arrayToSort, final int threshold)
    {
        this.arrayToSort = arrayToSort;
        this.threshold = threshold;
    }
    
    /**
     * Sort
     */
    public int[] mergeSort()
    {
        return mergeSort(arrayToSort, threshold);
    }
    
    public static int[] mergeSort(final int[] arrayToSort, int threshold)
    {
        // If the length of the split array is less than the threshold, sort directly
        if (arrayToSort. length <threshold)
        {
            //Call the sorting method provided by jdk
            Arrays.sort(arrayToSort);
            return arrayToSort;
        }
        
        int midpoint = arrayToSort.length / 2;
        // Split the array
        int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
        int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
        // recursive call
        leftArray = mergeSort(leftArray, threshold);
        rightArray = mergeSort(rightArray, threshold);
        //Merge sort results
        return merge(leftArray, rightArray);
    }
    
    public static int[] merge(final int[] leftArray, final int[] rightArray)
    {
        //Define the array used to merge the results
        int[] mergedArray = new int[leftArray.length + rightArray.length];
        int mergedArrayPos = 0;
        //Use double pointers to compare two numbers
        int leftArrayPos = 0;
        int rightArrayPos = 0;
        while (leftArrayPos < leftArray.length & amp; & amp; rightArrayPos < rightArray.length)
        {
            if (leftArray[leftArrayPos] <= rightArray[rightArrayPos])
            {
                mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
                leftArrayPos + + ;
            }
            else
            {
                mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
                rightArrayPos + + ;
            }
            mergedArrayPos + + ;
        }
        
        while (leftArrayPos < leftArray.length)
        {
            mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
            leftArrayPos + + ;
            mergedArrayPos + + ;
        }
        
        while (rightArrayPos < rightArray.length)
        {
            mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
            rightArrayPos + + ;
            mergedArrayPos + + ;
        }
        
        return mergedArray;
    }
    
    public static void main(String[] args)
    {
        // Generate test array for merge sorting
        int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);
        // Get the number of processors
        int processors = Runtime.getRuntime().availableProcessors();

        MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, processors);
        long startTime = System.nanoTime();
        // merge sort
        mergeSort.mergeSort();
        long duration = System.nanoTime() - startTime;
        System.out.println("Single-thread merge sort time: " + (duration / (1000f * 1000f)) + "milliseconds");
    }
}

/**
 * Randomly generate array tool class
 */
public class Utils
{
    public static int[] buildRandomIntArray(final int size)
    {
        int[] arrayToCalculateSumOf = new int[size];
        Random generator = new Random();
        for (int i = 0; i < arrayToCalculateSumOf.length; i + + )
        {
            arrayToCalculateSumOf[i] = generator.nextInt(100000000);
        }
        return arrayToCalculateSumOf;
    }
}
Fork/Join parallel merge sort

Parallel merge sort is a merge sort algorithm implemented using multi-threads. The basic idea: divide the data into several parts, then merge and sort these parts on different threads, and finally merge the sorted parts into an ordered array. On multi-core CPUs, this algorithm can also effectively improve the sorting speed.

You can use Java’s Fork/Join framework to implement parallelization of merge sort:

/**
 * Use fork-join to implement array sorting
 */
public class MergeSortTask extends RecursiveAction
{
    /** The threshold for splitting, below which no splitting will be performed */
    private final int threshold;
    /** Array to be sorted */
    private int[] arrayToSort;
    
    public MergeSortTask(final int[] arrayToSort, final int threshold)
    {
        this.arrayToSort = arrayToSort;
        this.threshold = threshold;
    }
    
    @Override
    protected void compute()
    {
        // If the length of the split array is less than the threshold, sort directly
        if (arrayToSort.length <= threshold)
        {
            //Call the sorting method provided by jdk
            Arrays.sort(arrayToSort);
            return;
        }
        
        // Split the array
        int midpoint = arrayToSort.length / 2;
        int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
        int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
        
        MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);
        MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);

        // Option 1: Call the task and block the current thread until all subtasks are completed.
        invokeAll(leftTask, rightTask);

        // Option 2: Including submitting task fork and merging results join
        // leftTask.fork();
        // rightTask.fork();
        // leftTask.join();
        // rightTask.join();
        // Note: Option 1 and 2 can be replaced. The advantage of Option 1 is that it can make full use of the multi-core capabilities of the CPU.
        
        //Merge sort results
        arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());
    }
    
    public int[] getSortedArray()
    {
        return arrayToSort;
    }

    public static void main(String[] args)
    {
        // Generate test array for merge sorting
        int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);
        // Generate test array for forkjoin sorting
        int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);
        // Get the number of processors
        int processors = Runtime.getRuntime().availableProcessors();

        // Sort using forkjoin
        MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, processors);
        //Build forkjoin thread pool
        ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
        long startTime = System.nanoTime();
        //Perform sorting tasks
        forkJoinPool.invoke(mergeSortTask);
        long duration = System.nanoTime() - startTime;
        System.out.println("forkjoin sorting time: " + (duration / (1000f * 1000f)) + "milliseconds");
    }
}

Compare the output results:

Single-thread merge sort time: 3411.657 milliseconds
forkjoin sorting time: 1744.5936 milliseconds

Summary: The larger the array, the more efficient the parallel merge sort implemented by the Fork/Join framework is than the single-thread merge sort.

Optimization and considerations for parallel implementation of merge sort
  • Task size: The choice of task size affects the efficiency and load balancing of parallel algorithms. If the task is too small, the overhead of task division and merging will be too high; if the task is too large, the task will not be able to fully utilize the parallel processing capabilities of the multi-core CPU. Therefore, in practical applications, it is necessary to select an appropriate task size based on factors such as data volume and number of CPU cores.

  • Load balancing: Parallel algorithms need to ensure load balancing. The size and time of tasks performed by each thread should be as equal as possible, otherwise it will lead to a situation where some threads are overloaded and other threads are underloaded. In merge sort, load balancing can be achieved through recursive calls, but it should be noted that the number of recursive levels cannot be too deep, otherwise the overhead of task division and merging will be too high.

  • Data distribution: The uniformity of data distribution will also affect the efficiency and load balancing of parallel algorithms. In merge sort, if the data distribution is uneven, some threads will process too much data, while other threads will process too little data. Therefore, in practical applications, it is necessary to consider the distribution of data and divide the data into subarrays of equal size as much as possible.

  • Memory usage: Parallel algorithms need to consider memory usage. Especially when processing large-scale data, memory usage will have an important impact on the execution efficiency of the algorithm. In merge sort, memory can be saved by merging data in place, but attention needs to be paid to how the merge is implemented to avoid problems such as data overwriting and unstable sorting.

  • Thread switching: Thread switching is an important overhead of parallel algorithms. It is necessary to minimize the number of thread switches to improve the execution efficiency of the algorithm. In merge sort, the number of threads and switching overhead can be controlled by setting the size of the thread pool and adjusting the task size to achieve optimal performance of the algorithm.

Fork/Join framework introduction