Learn CompletableFuture: Keep your code from blocking!

Source: juejin.cn/post/6844904024332828685

  • Preface

  • Scene description

  • CompletableFuture usage

  • Synchronous vs. asynchronous methods: querying the price of a product in all stores

  • Why CompletableFuture is still needed

  • Other API introduction

  • Application scenarios of CompletableFuture

  • Room for optimization

Preface

By reading this article you will learn:

  • Use of CompletableFuture

  • Performance comparison of asynchronous CompletableFuture code and synchronous code

  • Why CompletableFuture was introduced in JDK 1.8 when Future already existed

  • Application scenarios of CompletableFuture

  • Optimizing the use of CompletableFuture

Scene description

Query the price of a given product in all stores and return the results. The API a store offers for querying the price is synchronous: the Shop class provides a synchronous method named getPrice.

  • Shop class: Shop.java

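import java.util.Random;

public class Shop {
    private final Random random = new Random();
    //Store name, used when formatting the results
    private final String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /**
     * Find price based on product name
     * */
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    /**
     * Calculate the price
     *
     * @param product product name; must contain at least two characters
     * @return a pseudo-random price derived from the product name
     * */
    private double calculatePrice(String product) {
        delay();
        //random.nextDouble() acts as a random discount factor
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    /**
     * Simulate other time-consuming operations through sleep
     * */
    private void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
        }
    }
}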

Querying the price is a synchronous method, and the time-consuming work is simulated with sleep. The scenario models calling a third-party API that is only available in synchronous form and cannot be modified. How can the calling code be designed to improve the performance and throughput of the application? This is where the CompletableFuture class can be used.

CompletableFuture usage

CompletableFuture is an implementation class of the Future interface, introduced in JDK 1.8.

  • Creation of CompletableFuture:

    • Using new:

    CompletableFuture<Double> futurePrice = new CompletableFuture<>();

    • Using the CompletableFuture#completedFuture static method:

    public static <U> CompletableFuture<U> completedFuture(U value) {
        return new CompletableFuture<U>((value == null) ? NIL : value);
    }

    The value parameter is the result of the task, so the returned future is already completed. This method is rarely used in practice.

    • Using the CompletableFuture#supplyAsync static method, which has two overloads:

    //Method 1
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    //Method Two
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

    • Using the CompletableFuture#runAsync static method, which has two overloads:

    //Method 1
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    //Method Two
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

    Notes:

    • The difference between the two overloads => the second accepts a custom Executor, while the first uses the default pool, ForkJoinPool.commonPool()

    • The difference between supplyAsync and runAsync => the former returns a value, the latter does not

    • Supplier is a functional interface, so supplyAsync needs an implementation of that interface. If you trace the source code, you will find that the interface's get method is called inside the run method. Creating a CompletableFuture this way therefore only requires implementing Supplier's get method and defining the task there. Because a functional interface can be written as a Lambda expression, the code is more concise than creating a CompletableFuture object with new.
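The four creation styles described above can be put side by side in a minimal sketch. The class name CreationSketch, the helper names and the sample values are illustrative assumptions, not part of the original article; only the CompletableFuture calls themselves come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CreationSketch {
    // 1. new + complete: the caller is responsible for supplying the result
    static CompletableFuture<Double> viaNew() {
        CompletableFuture<Double> future = new CompletableFuture<>();
        new Thread(() -> future.complete(9.99)).start();
        return future;
    }

    // 2. completedFuture: the future already holds its value
    static CompletableFuture<String> viaCompleted() {
        return CompletableFuture.completedFuture("cached");
    }

    public static void main(String[] args) {
        // Illustrative custom pool for the Executor overloads
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // 3. supplyAsync: a task that returns a value
            CompletableFuture<Integer> supplied =
                    CompletableFuture.supplyAsync(() -> 6 * 7, pool);
            // 4. runAsync: a task without a return value
            CompletableFuture<Void> ran =
                    CompletableFuture.runAsync(() -> System.out.println("side effect only"), pool);

            System.out.println(viaNew().join());
            System.out.println(viaCompleted().join());
            System.out.println(supplied.join());
            ran.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Dropping the pool argument from supplyAsync and runAsync selects the first overload and runs the task on the default pool instead.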

  • Getting results: The CompletableFuture class provides four ways to get results.

    //Method 1
    public T get()
    //Method 2
    public T get(long timeout, TimeUnit unit)
    //Method 3
    public T getNow(T valueIfAbsent)
    //Method 4
    public T join()

    Notes:

    • get() and get(long timeout, TimeUnit unit) => already provided by Future. The latter adds a timeout: if the result is not available within the specified time, a TimeoutException is thrown.

    • getNow => returns immediately without blocking. If the computation has finished, the result is returned (or the exception raised during the computation is thrown); otherwise the supplied valueIfAbsent is returned.

    • join => waits for the result like get(), but declares no checked exceptions; a failed computation is reported as an unchecked CompletionException.

    Example:

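import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class AcquireResultTest {
  public static void main(String[] args) throws InterruptedException {
      //getNow method test: the task sleeps for one minute, so the result is not ready yet
      CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
          try {
              Thread.sleep(60 * 1000);
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
          }

          return "hello world";
      });

      System.out.println(cp1.getNow("hello h2t"));

      //join method test: the failure surfaces as an unchecked CompletionException
      CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync(() -> 1 / 0);
      try {
          System.out.println(cp2.join());
      } catch (CompletionException e) {
          System.out.println("join threw " + e);
      }

      //get method test: the failure surfaces as a checked ExecutionException
      CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync(() -> 1 / 0);
      try {
          System.out.println(cp3.get());
      } catch (ExecutionException e) {
          System.out.println("get threw " + e);
      }
  }
}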

Notes:

  • The first call prints hello h2t, because the task is still sleeping and its result is not available yet.

  • The join method declares no checked exception in its signature, but because the task failed, the call throws an unchecked CompletionException.

  • The get method declares checked exceptions in its signature. Because the task failed, the call throws an ExecutionException.
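The example above does not exercise get(long timeout, TimeUnit unit). A minimal sketch of the timeout variant follows; the helper fetchOrFallback, the class name and the fallback strings are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetWithTimeoutSketch {
    // Waits at most timeoutMillis for the result, otherwise returns the fallback
    static String fetchOrFallback(CompletableFuture<String> future,
                                  long timeoutMillis, String fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The result was not ready within the time limit
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            // The task itself failed
            return fallback;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "slow result";
        });

        System.out.println(fetchOrFallback(slow, 100, "timed out"));
        System.out.println(fetchOrFallback(
                CompletableFuture.completedFuture("ready"), 100, "timed out"));
    }
}
```

Note that a timeout only stops the waiting; the task behind the future keeps running unless it is cancelled separately.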

  • Exception handling: CompletableFuture objects created with the static methods do not need explicit exception handling, because an exception thrown by the task is captured in the future automatically. Objects created with new must call completeExceptionally to record a caught exception. For example:

CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
   try {
       //doSomething, then call complete to record the result in the completableFuture object
       completableFuture.complete(null);
   } catch (Exception e) {
       //Exception handling: pass the failure on to whoever waits on the future
       completableFuture.completeExceptionally(e);
   }
}).start();
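To see what the waiting side observes, here is a small sketch built on the same pattern. The method lengthAsync and its "empty input" failure are hypothetical; the point is only that a future completed through completeExceptionally reports the failure from join as a CompletionException whose cause is the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ManualCompletionSketch {
    // Hypothetical task: computes the length of a text, failing on empty input
    static CompletableFuture<Integer> lengthAsync(String text) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                if (text == null || text.isEmpty()) {
                    throw new IllegalArgumentException("empty input");
                }
                future.complete(text.length());
            } catch (Exception e) {
                // Without this call the waiting side would block forever
                future.completeExceptionally(e);
            }
        }).start();
        return future;
    }

    // Turns the outcome into a printable description
    static String describe(CompletableFuture<Integer> future) {
        try {
            return "ok: " + future.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(lengthAsync("abc")));
        System.out.println(describe(lengthAsync("")));
    }
}
```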

Synchronous vs. asynchronous methods: querying the price of a product in all stores

The stores are held in a list:

private static List<Shop> shopList = Arrays.asList(
        new Shop("BestPrice"),
        new Shop("LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll")
);

Sync method:

private static List<String> findPriceSync(String product) {
    return shopList.stream()
            .map(shop -> String.format("%s price is %.2f",
                    shop.getName(), shop.getPrice(product))) //Format conversion
            .collect(Collectors.toList());
}

Asynchronous methods:

private static List<String> findPriceAsync(String product) {
    List<CompletableFuture<String>> completableFutureList = shopList.stream()
            //Convert to asynchronous execution
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> String.format("%s price is %.2f",
                            shop.getName(), shop.getPrice(product)))) //Format conversion
            .collect(Collectors.toList());

    return completableFutureList.stream()
            .map(CompletableFuture::join) //join declares no checked exception, so it fits in a lambda
            .collect(Collectors.toList());
}

Performance test results:

Find Price Sync Done in 4141
Find Price Async Done in 1033

The asynchronous version is roughly four times faster, because the four one-second lookups run in parallel instead of one after another.
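The article does not show the code that produced these timings. The following self-contained sketch shows one way such a measurement could be made. It is not the author's harness: slowQuote is a stand-in for Shop#getPrice with a 200 ms delay (shortened from one second to keep the run brief), and the class and method names are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class PriceTimingSketch {
    static final List<String> SHOPS =
            Arrays.asList("BestPrice", "LetsSaveBig", "MyFavoriteShop", "BuyItAll");

    // Stand-in for a slow synchronous lookup (assumed 200 ms per call)
    static String slowQuote(String shop, String product) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shop + " price is " + (product.length() * 1.5);
    }

    // One lookup after another
    static List<String> findPriceSync(String product) {
        return SHOPS.stream()
                .map(shop -> slowQuote(shop, product))
                .collect(Collectors.toList());
    }

    // Start all lookups first, then wait for each result
    static List<String> findPriceAsync(String product) {
        List<CompletableFuture<String>> futures = SHOPS.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> slowQuote(shop, product)))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Runs the task once and returns the elapsed wall-clock time in milliseconds
    static long timeMillis(Supplier<List<String>> task) {
        long start = System.nanoTime();
        task.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("Find Price Sync Done in " + timeMillis(() -> findPriceSync("iPhone")));
        System.out.println("Find Price Async Done in " + timeMillis(() -> findPriceAsync("iPhone")));
    }
}
```

The speed-up depends on how many threads the default pool provides; on a machine with very few cores the asynchronous run may not be much faster.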

Why is CompletableFuture still needed

Before JDK1.8, tasks could be run asynchronously by calling the submit method of the thread pool. This method would return a Future object, and the result of asynchronous execution could be obtained by calling the get method:

private static List<String> findPriceFutureAsync(String product) {
    ExecutorService es = Executors.newCachedThreadPool();
    try {
        //Submit one task per shop
        List<Future<String>> futureList = shopList.stream()
                .map(shop -> es.submit(() -> String.format("%s price is %.2f",
                        shop.getName(), shop.getPrice(product))))
                .collect(Collectors.toList());

        return futureList.stream()
                .map(f -> {
                    try {
                        return f.get(); //Blocks until this task finishes
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return null;
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                        return null;
                    }
                }).collect(Collectors.toList());
    } finally {
        es.shutdown(); //Let the pool's threads exit once the tasks are done
    }
}

Why introduce CompletableFuture when Future already exists? For simple business scenarios Future is perfectly adequate. But when you need to combine the results of several asynchronous tasks, or when a later asynchronous task needs the value produced by an earlier one, the API provided by Future falls short and cannot express this elegantly. CompletableFuture handles these requirements declaratively. Moreover, with a plain Future, the only way to use a value for follow-up work without blocking is to poll until the task is done, which wastes CPU and makes the code clumsy. In pseudo code:

while (!future.isDone()) {
    //Busy-wait: keeps a CPU core occupied until the task finishes
}
result = future.get();
doSomethingWithResult(result);

CompletableFuture, by contrast, provides an API that covers exactly these needs.
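As a minimal sketch of the alternative to polling, the follow-up work can be registered as a callback that runs once the value is available. The method priceReport and its made-up price calculation are illustrative assumptions; thenApply and thenAccept are the JDK methods introduced in the sections that follow.

```java
import java.util.concurrent.CompletableFuture;

class CallbackSketch {
    // Hypothetical lookup followed by a transformation, with no polling loop
    static CompletableFuture<String> priceReport(String product) {
        return CompletableFuture
                .supplyAsync(() -> product.length() * 10)        // stand-in for a slow lookup
                .thenApply(price -> product + " costs " + price); // runs when the price is ready
    }

    public static void main(String[] args) {
        // thenAccept consumes the final value; join only keeps the demo alive until it is printed
        priceReport("book")
                .thenAccept(System.out::println)
                .join();
    }
}
```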

Other API introduction

whenComplete result processing:

whenComplete processes the result (or exception) of the previous calculation but cannot return a new value. Three methods are provided:

//Method 1
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
//Method Two
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
//Method 3
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

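Notes:

  • BiConsumer action parameter => defines how the result and the exception (one of them is null) are processed

  • Executor executor parameter => custom thread pool

  • Methods ending with Async run the action as a new task on the default asynchronous pool (or on the given Executor) instead of on the thread that completed the previous stage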

Example:

public class WhenCompleteTest {
    public static void main(String[] args) {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> cf2 = cf1.whenComplete((v, e) ->
                System.out.println(String.format("value:%s, exception:%s", v, e)));
        System.out.println(cf2.join());
    }
}
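The example above covers only the success path. The sketch below, with the hypothetical helper observe, illustrates the failure path under the same API: the action sees the exception, but since whenComplete cannot return a new value, the returned future still completes exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteFailureSketch {
    // Records what the whenComplete action saw and whether the failure survived it
    static String observe(CompletableFuture<Integer> source) {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Integer> observed = source.whenComplete((value, error) ->
                seen.set(error == null ? "value=" + value : "failed"));
        try {
            observed.join();
        } catch (CompletionException e) {
            // whenComplete does not recover from the failure; it only observes it
        }
        return seen.get() + ", stillFailed=" + observed.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(observe(CompletableFuture.completedFuture(5)));
        System.out.println(observe(CompletableFuture.supplyAsync(() -> 1 / 0)));
    }
}
```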

thenApply conversion:

thenApply takes the result of the previous CompletableFuture as its argument and returns a CompletableFuture holding the converted result. In other words, thenApply turns a CompletableFuture<T> into a CompletableFuture<U>. Three methods are provided:

//Method 1
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

//Method Two
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

//Method 3
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

Notes:

  • Function fn parameter => the conversion applied to the result of the previous CompletableFuture

  • Executor executor parameter => custom thread pool

  • Methods ending with Async run the function as a new task on the default asynchronous pool (or on the given Executor) instead of on the thread that completed the previous stage

Example:

public class ThenApplyTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenApplyTest::randomInteger).thenApply((i) -> i * 8);
        System.out.println(result.get());
    }

    public static Integer randomInteger() {
        return 10;
    }
}

Here, the result calculated by the previous CompletableFuture is multiplied by eight.

thenAccept result processing:

thenAccept can also be classified as result processing. Unlike thenApply, it returns no value. Three methods are provided:

//Method 1
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

//Method Two
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

//Method 3
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

Notes:

  • Consumer action parameter => the operation performed on the result of the previous CompletableFuture

  • Executor executor parameter => custom thread pool

  • As before, methods ending with Async run the action as a new task on the default asynchronous pool (or on the given Executor)

Example:

public class ThenAcceptTest {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(ThenAcceptTest::getList)
                .thenAccept(strList -> strList.forEach(System.out::println))
                .join(); //Wait, so the JVM does not exit before the list is printed
    }

    public static List<String> getList() {
        return Arrays.asList("a", "b", "c");
    }
}

Print the result calculated by the previous CompletableFuture

thenCompose asynchronous result pipeline:

The thenCompose method pipelines two asynchronous operations, feeding the result of the first into the second. Three methods are provided:

//Method 1
public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

//Method Two
public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);
}

//Method 3
public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn,
    Executor executor) {
    return uniComposeStage(screenExecutor(executor), fn);
}

Notes:

  • Function<? super T, ? extends CompletionStage<U>> fn parameter => a function that receives the result of the current CompletableFuture and returns the next asynchronous stage

  • Executor executor parameter => custom thread pool

  • As before, methods ending with Async run the function as a new task on the default asynchronous pool (or on the given Executor)

Example:

public class ThenComposeTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenComposeTest::getInteger)
                .thenCompose(i -> CompletableFuture.supplyAsync(() -> expandValue(i)));
        System.out.println(result.get());
    }

    private static int getInteger() {
        return 666;
    }

    private static int expandValue(int num) {
        return num * 10;
    }
}
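A common question is how thenCompose differs from thenApply when the function itself returns a CompletableFuture. The sketch below contrasts the two; fetchPrice and applyDiscount are hypothetical asynchronous steps invented for illustration, with prices in cents.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplySketch {
    // Hypothetical first step: look up a price in cents
    static CompletableFuture<Integer> fetchPrice(String product) {
        return CompletableFuture.supplyAsync(() -> product.length() * 100);
    }

    // Hypothetical second step, itself asynchronous: apply a 10% discount
    static CompletableFuture<Integer> applyDiscount(int cents) {
        return CompletableFuture.supplyAsync(() -> cents * 90 / 100);
    }

    // thenApply wraps the second future inside the first: a nested type
    static CompletableFuture<CompletableFuture<Integer>> nested(String product) {
        return fetchPrice(product).thenApply(ComposeVsApplySketch::applyDiscount);
    }

    // thenCompose flattens the two stages into a single future
    static CompletableFuture<Integer> flat(String product) {
        return fetchPrice(product).thenCompose(ComposeVsApplySketch::applyDiscount);
    }

    public static void main(String[] args) {
        System.out.println(nested("book").join().join()); // two joins needed
        System.out.println(flat("book").join());          // one join is enough
    }
}
```

Both variants compute the same value; thenCompose simply spares the caller the nested future.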

[Figure: execution flow chart for ThenComposeTest]

thenCombine combination result:

The thenCombine method combines two independent CompletableFutures: the second CompletableFuture does not depend on the result of the first. Three methods are provided:

//Method 1
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

//Method Two
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(asyncPool, other, fn);
}

//Method 3
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}

Notes:

  • CompletionStage other parameter => the other CompletableFuture whose result is to be combined

  • BiFunction fn parameter => defines how the two results are merged once both CompletableFutures have completed. It is a functional interface, so a Lambda expression can be used

  • Executor executor parameter => custom thread pool

  • As before, methods ending with Async run the merge as a new task on the default asynchronous pool (or on the given Executor)

Example:

public class ThenCombineTest {
    private static Random random = new Random();
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenCombineTest::randomInteger).thenCombine(
                CompletableFuture.supplyAsync(ThenCombineTest::randomInteger), (i, j) -> i * j
        );

        System.out.println(result.get());
    }

    public static Integer randomInteger() {
        return random.nextInt(100);
    }
}

This multiplies the values computed by the two tasks and returns the product.

[Figure: execution flow chart for ThenCombineTest]

allOf & anyOf combining multiple CompletableFutures:

Method introduction:

//allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}
//anyOf
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

Notes:

  • allOf => returns a CompletableFuture<Void> that completes once all of the given CompletableFutures have completed.

  • anyOf => returns a CompletableFuture<Object> that completes as soon as any one of the given CompletableFutures completes, with that future's result.

Example:

  • allOf method test

public class AllOfTest {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
          System.out.println("hello");
          return null;
      });
      CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
          System.out.println("world"); return null;
      });
      CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2);
      System.out.println(result.get());
  }
}

The future returned by allOf carries no result (its type is Void). It suits scenarios where no combined value is needed but all previous tasks must have finished before the next step may run.
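When the individual results are needed after all, a frequently used pattern is to wait on allOf and then read each original future. The helper allAsList below is an illustrative assumption, not a JDK method, and the quote strings are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfCollectSketch {
    // Hypothetical helper: completes with all results once every future is done
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block here: all are complete
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> quotes = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "BestPrice 12.50"),
                CompletableFuture.supplyAsync(() -> "LetsSaveBig 11.90"));

        System.out.println(allAsList(quotes).join());
    }
}
```

The resulting list keeps the order of the input futures, regardless of which task finished first.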

  • anyOf method test

public class AnyOfTest {
  private static Random random = new Random();
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
          randomSleep();
          System.out.println("hello");
          return "hello";
      });
      CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
          randomSleep();
          System.out.println("world");
          return "world";
      });
      CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
      System.out.println(result.get());
  }
  
  private static void randomSleep() {
      try {
          Thread.sleep(random.nextInt(10));
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}

Both tasks are started, but get returns only the result of the task that finishes first. The output of the slower task may not appear if the JVM exits before it completes. This method suits scenarios where processing can continue as soon as any one result is available.

Notes

Many methods have an asynchronous variant (with the Async suffix), but these variants should be used with caution: running a stage asynchronously involves handing the work to another thread and the context switching that comes with it, so it is not necessarily faster than the synchronous variant. If you need an asynchronous method, measure first and let the test data decide.
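To make the difference concrete, the sketch below reports which thread runs a stage. The class name and the thread name "custom-worker" are assumptions for illustration. The Async variant with an explicit Executor runs on that executor's thread; for the plain variant the JDK only says the function may run on the completing thread or on a caller of a completion method, so the second result is what is typically observed rather than a guarantee.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncSuffixSketch {
    // thenApplyAsync with an Executor: the function runs on the executor's thread
    static String threadUsedByAsyncStage() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-worker"));
        try {
            return CompletableFuture.completedFuture("ignored")
                    .thenApplyAsync(v -> Thread.currentThread().getName(), worker)
                    .join();
        } finally {
            worker.shutdown();
        }
    }

    // thenApply on an already completed future: typically runs on the calling thread
    static String threadUsedBySyncStage() {
        return CompletableFuture.completedFuture("ignored")
                .thenApply(v -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("async stage ran on: " + threadUsedByAsyncStage());
        System.out.println("plain stage ran on: " + threadUsedBySyncStage());
    }
}
```

The hand-off to "custom-worker" is the extra cost the note above refers to; whether it pays off depends on how heavy the stage is.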

Application scenarios of CompletableFuture

For IO-intensive tasks CompletableFuture is a good fit: the IO part is handed to another thread. Asynchronous logging in Logback and Log4j2 works on the same principle, using a separate thread to perform the IO operations. Such work can be started with CompletableFuture.runAsync(() -> { ioOperation(); }). For CPU-intensive tasks CompletableFuture is not recommended; parallel streams are the better choice.

Room for optimization

The underlying implementation of supplyAsync execution tasks:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

Under the hood the task is handed to a thread pool. The default pool in CompletableFuture is ForkJoinPool.commonPool(), unless the common pool's parallelism is below 2, in which case a new thread is created for each task:

private static final Executor asyncPool = useCommonPool?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

The size of the common ForkJoinPool depends on the number of CPU cores (by default, the number of available processors minus one). For CPU-intensive tasks a pool sized to the number of cores is appropriate, but for IO-intensive tasks the pool size is better estimated as:

    number of threads = number of CPUs * target CPU utilization * (1 + thread waiting time / thread CPU time)

Since CompletableFuture is mainly applied to IO-intensive tasks, the default ForkJoinPool generally cannot achieve the best performance, and we should create a thread pool sized for the workload ourselves.
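The sizing formula and the Executor overload of supplyAsync can be combined as in the following sketch. The class name, the helper names and the sample figures (full utilization, tasks that wait nine times as long as they compute) are assumptions for illustration; real values should come from measuring the actual workload.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class CustomPoolSketch {
    // threads = cores * target utilization * (1 + wait time / compute time)
    static int poolSize(int cores, double targetUtilization, double waitTime, double computeTime) {
        return Math.max(1, (int) (cores * targetUtilization * (1 + waitTime / computeTime)));
    }

    // Runs one hypothetical lookup per shop on the supplied executor
    static List<String> findPrices(List<String> shops, String product, ExecutorService executor) {
        List<CompletableFuture<String>> futures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop + " quoted " + product, executor))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed workload: 100% target utilization, 900 ms waiting per 100 ms of CPU work
        int size = poolSize(cores, 1.0, 900, 100);

        ExecutorService executor = Executors.newFixedThreadPool(size);
        try {
            System.out.println("pool size: " + size);
            System.out.println(findPrices(
                    Arrays.asList("BestPrice", "LetsSaveBig"), "iPhone", executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

In practice the result is often capped, for example at the number of tasks that can ever run at once, so that the pool does not hold more threads than there is work.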

syntaxbug.com © 2021 All Rights Reserved.