JUC concurrent programming – ForkJoin and asynchronous callbacks

ForkJoin (split and merge)

What is ForkJoin

ForkJoin was introduced in JDK 1.7. It executes tasks in parallel, which can improve efficiency when processing large volumes of data.

An explanation provided by iFlytek Spark:

ForkJoin is a parallel computing approach that decomposes a large task into multiple small tasks, assigns these small tasks to different threads or processes for parallel execution, and finally merges the results.

In computer science, ForkJoin is often used to implement programs and data structures based on the divide-and-conquer strategy, such as sorting algorithms, graph traversal algorithms, hash tables, etc. It can make effective use of the parallel computing capabilities of multi-core processors to improve program performance.

ForkJoin features

  • Work stealing

ForkJoin decomposes a large task into several small tasks that run in parallel. Suppose there are two threads, A and B. If thread B finishes its own small tasks while thread A still has tasks waiting, thread B can steal some of thread A's pending tasks and execute them, which speeds up the overall job.

Note that the tasks assigned to each thread are stored in a double-ended queue (deque). Thread B steals from the opposite end of thread A's deque, so the two threads do not end up executing the same task, and no exceptions or errors result.
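The deque idea can be sketched in a few lines. This is only an illustration under stated assumptions, not how ForkJoinPool is implemented internally: the class name WorkStealingSketch and the run method are hypothetical, and a ConcurrentLinkedDeque stands in for a worker's task queue. Thread A (the owner) takes tasks from the tail, thread B (the thief) takes them from the head, and every task is still executed exactly once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical demo class: illustrates the two-ended access pattern only.
class WorkStealingSketch {

    // Fills one deque with taskCount task ids, lets two threads drain it
    // from opposite ends, and returns every id that was taken.
    static List<Integer> run(int taskCount) throws InterruptedException {
        ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < taskCount; i++) {
            deque.addLast(i);
        }

        // Each list is written by exactly one thread and read only after join().
        List<Integer> takenByA = new ArrayList<>();
        List<Integer> takenByB = new ArrayList<>();

        // Thread A owns the deque and works from the tail.
        Thread a = new Thread(() -> {
            Integer task;
            while ((task = deque.pollLast()) != null) {
                takenByA.add(task);
            }
        }, "A");

        // Thread B "steals" from the head, the opposite end.
        Thread b = new Thread(() -> {
            Integer task;
            while ((task = deque.pollFirst()) != null) {
                takenByB.add(task);
            }
        }, "B");

        a.start();
        b.start();
        a.join();
        b.join();

        List<Integer> all = new ArrayList<>(takenByA);
        all.addAll(takenByB);
        return all;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> all = run(1000);
        // Both numbers are 1000: no task was lost and none ran twice.
        System.out.println("executed=" + all.size()
                + " distinct=" + new HashSet<>(all).size());
    }
}
```

How many tasks each thread ends up with varies from run to run; only the total is fixed.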


ForkJoin operation

The example computes the sum of the integers from 1 to 1 billion. It does so in three different ways and prints the result and the elapsed time for each. The ForkJoin logic is encapsulated in the ForkJoinDemo class.

ForkJoinDemo class:

package ForkJoin;

import java.util.concurrent.RecursiveTask;

/**
 * Summation task.
 * How to use ForkJoin:
 * 1. Execute the task through a ForkJoinPool
 * 2. Hand the task to the pool with forkJoinPool.execute(ForkJoinTask<?> task)
 * 3. The task class must extend ForkJoinTask (here, its subclass RecursiveTask)
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start;
    private Long end;

    // Threshold that decides whether a range is split further
    private Long temp = 1000L;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    // Compute method: the only abstract method of the RecursiveTask abstract class
    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            // The range is below the threshold: sum it directly
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // ForkJoin is very similar to recursion in spirit
            // Midpoint of the range
            long middle = (start + end) / 2;
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // Split off the left half and push it onto the thread's queue
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork(); // Split off the right half and push it onto the thread's queue
            return task1.join() + task2.join();
        }
    }
}
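The class comment mentions forkJoinPool.execute(...). ForkJoinPool offers three entry points for a task: invoke, submit, and execute. The following sketch runs the same small task through each of them. The class names SubmitStyles and RangeSum are hypothetical and exist only for this illustration; the task deliberately does not split, so that the focus stays on the pool methods.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class comparing the three ways to hand a task to a ForkJoinPool.
class SubmitStyles {

    // Hypothetical tiny task: sums the integers in [from, to] without splitting.
    static class RangeSum extends RecursiveTask<Long> {
        private final long from;
        private final long to;

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    static long[] runAll() throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke: blocks and returns the result directly.
            long viaInvoke = pool.invoke(new RangeSum(1, 100));

            // submit: returns the task as a Future; get() waits for the result.
            long viaSubmit = pool.submit(new RangeSum(1, 100)).get();

            // execute: returns void, but the task object still holds the result.
            RangeSum task = new RangeSum(1, 100);
            pool.execute(task);
            long viaExecute = task.join();

            return new long[] {viaInvoke, viaSubmit, viaExecute};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (long result : runAll()) {
            System.out.println(result); // prints 5050 three times
        }
    }
}
```

So execute is not unusable for tasks with a result; it simply leaves it to the caller to keep a reference to the task and call join() or get() on it.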

Test class: this class computes the sum in three ways: an ordinary for loop, ForkJoin, and a parallel Stream.

package ForkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
        test2();
        test3();
    }

    // Plain for loop
    public static void test1() {
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum=" + sum + " time=" + (end - start));
    }

    // ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
        // forkJoinPool.execute(task) returns void, so submit(task) is used here to get the result
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("sum=" + sum + " time=" + (end - start));
    }

    // Parallel Stream
    public static void test3() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum=" + sum + " time=" + (end - start));
    }
}

The final result is as follows:

[Screenshot of the console output; image not available]

In this run the parallel Stream was the fastest, so it is worth getting comfortable with Streams. ForkJoin did not look much faster than the plain for loop here, but because a ForkJoin task is structured like recursion, there is more room to tune it: if the threshold is chosen well, the results can be surprisingly good.
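To experiment with the threshold, it helps to make it a parameter. The following is a minimal sketch under a few assumptions, not the original ForkJoinDemo: the class name ThresholdSum and the static helper sum are hypothetical, the fields are primitive long values to avoid boxing, and only the left half is forked while the current thread computes the right half itself, a common pattern that saves one queue operation per split.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of the summation task with a configurable threshold.
class ThresholdSum extends RecursiveTask<Long> {
    private final long start;
    private final long end;
    private final long threshold;

    ThresholdSum(long start, long end, long threshold) {
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Long compute() {
        // Small enough: sum the range directly.
        if (end - start <= threshold) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // Otherwise split into [start, middle] and [middle + 1, end].
        long middle = start + (end - start) / 2;
        ThresholdSum left = new ThresholdSum(start, middle, threshold);
        ThresholdSum right = new ThresholdSum(middle + 1, end, threshold);
        left.fork();                         // queue the left half for another worker
        long rightResult = right.compute();  // compute the right half on this thread
        return left.join() + rightResult;
    }

    // Hypothetical helper: runs the task in a fresh pool and returns the sum.
    static long sum(long start, long end, long threshold) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ThresholdSum(start, end, threshold));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Try several thresholds and compare the timings on your own machine.
        for (long threshold : new long[] {1_000L, 100_000L, 10_000_000L}) {
            long t0 = System.nanoTime();
            long result = sum(1L, 1_000_000_000L, threshold);
            long millis = (System.nanoTime() - t0) / 1_000_000;
            System.out.println("threshold=" + threshold
                    + " sum=" + result + " time=" + millis + "ms");
        }
    }
}
```

The timings depend on the machine and the JVM, so no expected numbers are given here. A very small threshold creates many tiny tasks, and a very large one leaves cores idle.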

Asynchronous callback

package future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

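/**
 * Asynchronous call with CompletableFuture (comparable to an Ajax request in the browser):
 * the task runs asynchronously, and callbacks handle success and failure.
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {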
        // runAsync: asynchronous call without a return value
        // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        //     try {
        //         TimeUnit.SECONDS.sleep(2);
        //     } catch (InterruptedException e) {
        //         throw new RuntimeException(e);
        //     }
        //     System.out.println(Thread.currentThread().getName() + " runAsync=>Void");
        // });
        //
        // System.out.println("1111111");
        // completableFuture.get(); // Block until the asynchronous task finishes

        // supplyAsync: asynchronous call with a return value
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " supplyAsync=>Integer");
            // int i = 10 / 0; // Uncomment to trigger an exception deliberately
            return 1024;
        });
        System.out.println(completableFuture.whenComplete((t, u) -> {
            System.out.println("t---->" + t); // Result on normal completion (null on failure)
            System.out.println("u---->" + u); // Exception on failure (null on normal completion)
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 404; // Fallback value returned when the task fails
        }).get());
    }
}
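The listing only exercises the success path unless the division by zero is uncommented. The following sketch runs both paths side by side. The class name FailurePathSketch and the run method are hypothetical, and an IllegalStateException stands in for the deliberate error. Note that the exception reaching exceptionally is typically wrapped in a CompletionException, so the sketch unwraps it before printing the message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class: shows the success and failure callbacks side by side.
class FailurePathSketch {

    // Runs a supplyAsync task that either returns 1024 or throws,
    // then returns the final value after the callbacks have run.
    static int run(boolean fail) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return 1024;
        });

        return future
                .whenComplete((result, error) -> {
                    // On success: result is set and error is null.
                    // On failure: result is null and error is set.
                    System.out.println("result=" + result + ", failed=" + (error != null));
                })
                .exceptionally(error -> {
                    // Unwrap the CompletionException to reach the original exception.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    System.out.println("recovered from: " + cause.getMessage());
                    return 404; // fallback value
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints 1024
        System.out.println(run(true));  // prints 404
    }
}
```

join() is used instead of get() so that the method needs no checked exceptions. whenComplete only observes the outcome, while exceptionally replaces a failure with a normal value.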