ForkJoin (branch merge)
What is ForkJoin
ForkJoin appeared in JDK1.7, executing tasks in parallel, which can improve efficiency under large data volume
An explanation provided by iFlytek Spark:
Forkjoin is a parallel computing algorithm that is used to decompose a large task into multiple small tasks, then assign these small tasks to different threads or processes for parallel execution, and finally merge the results.
In computer science, Forkjoin is often used to implement programs and data structures based on the divide-and-conquer strategy, such as sorting algorithms, graph traversal algorithms, hash tables, etc. It can effectively utilize the parallel computing capabilities of multi-core processors to improve program performance.
ForkJoin features
- job theft
ForkJoin decomposes a large task into several small tasks for parallel operations. Suppose there are two threads, thread A and thread B. After thread B finishes executing its small task, it is found that thread A has not finished executing thread A’s small task. task, thenB thread can steal A thread’s small task execution, which can speed up task execution efficiency.
At the same time, it should be noted that the task storage structure assigned to thread A and thread B is a double-ended queue, so thread B can steal the task of thread A from the other end without causing exceptions or errors caused by thread A and thread B executing the same task. .
ForkJoin operation
An example is to calculate the superposition of 1 to 1 billion. In this example, three different methods are used to calculate, and the results and calculation time are printed. Encapsulate ForkJoin into ForkJoinDemo class
ForkJoinDemo class:
package ForkJoin; import java.util.concurrent.RecursiveTask; /** * * The task of summation calculation! * How to use forkJoin * 1. Execute through forkJoinPool * 2. Calculation task forkJoinPool.execute(ForKJoinTask<?> task) * 3. The calculation class must inherit ForkJoinTask */ public class ForkJoinDemo extends RecursiveTask<Long> {<!-- --> private Long start; private Long end; // critical value private Long temp = 1000L; public ForkJoinDemo(Long start,Long end){<!-- --> this.start = start; this.end = end; } // Calculation method The only abstract method in the RecursiveTask interface @Override protected Long compute() {<!-- --> if ((end-start)>temp){<!-- --> Long sum = 0L; for (Long i = start; i <= end; i + + ) {<!-- --> sum + = i; } return sum; }else{<!-- --> // forkJoin is very similar to recursion in thinking // Median long middle = (start + end)/2; ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork();//Split the task and push the task into the thread queue ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, start); task2.fork();//Split the task and push the task into the thread queue return task1.join() + task2.join(); } } }
Test class: This class uses three methods to calculate the sum, ordinary for loop, ForkJoin and Stream
package ForkJoin; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; public class Test {<!-- --> public static void main(String[] args) throws ExecutionException, InterruptedException {<!-- --> test1(); test2(); test3(); } //Basic calculation method public static void test1(){<!-- --> Long sum = 0l; long start = System.currentTimeMillis(); for (Long i = 1L; i <= 10_0000_0000; i + + ) {<!-- --> sum + = i; } long end = System.currentTimeMillis(); System.out.println("sum=" + sum + "time" + (end-start)); } // forkJoin public static void test2() throws ExecutionException, InterruptedException {<!-- --> long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L); // forkJoinPool.execute(); has no return value, so it is not used ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum=" + sum + "time" + (end-start)); } // Stream parallel stream public static void test3(){<!-- --> long start = System.currentTimeMillis(); long sum = LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum); long end = System.currentTimeMillis(); System.out.println("sum=" + sum + "time" + (end-start)); } }
The final result is as follows:
It can be seen thatStream flow calculation is the fastest! You must have a good grasp of Stream flow, and although it seems that ForkJoin is not much different from for loop, because the operation of ForkJoin is very similar to recursion, there is more room for operation. If the baseline Choosing it well may produce unexpected results.
Asynchronous callback
package future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * * Asynchronous call: Ajax */ public class Demo01 {<!-- --> public static void main(String[] args) throws ExecutionException, InterruptedException {<!-- --> // // runAsync asynchronous callback without return value // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{<!-- --> // try {<!-- --> // TimeUnit.SECONDS.sleep(2); // } catch (InterruptedException e) {<!-- --> // throw new RuntimeException(e); // } // System.out.println(Thread.currentThread().getName() + "runAsync=>void"); // }); // // System.out.println("1111111"); // completableFuture.get(); // Get execution results // Asynchronous callback with return value CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {<!-- --> System.out.println(Thread.currentThread().getName() + "completableFuture=>void"); // int i = 10/0; // Intentional execution error return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> {<!-- --> System.out.println("t---->" + t);//Normal return result System.out.println("u---->" + u); // Error message }).exceptionally((e) -> {<!-- --> System.out.println(e.getMessage()); return 404; }).get()); } }