The CompletionException in CompletableFuture is really bugging me!

Development environment

  • JDK 17
  • IntelliJ IDEA 2022

If you have used JDK 8 or later, you have probably come across the java.util.concurrent.CompletableFuture class. Business services sometimes need to run operations in parallel, and for that it is indispensable.

I have many such scenarios in my work. For example, I need to pull orders and order details from the channel side into our system. Two interfaces are involved: Order List Query and Order Details Query. The list query is simple: it is paginated and returns only order numbers. Here is an example.

Here is the data structure (mock data) returned by the order list query interface, just a list of order numbers:

{
    "data":[
        "125345345345",
        "235894563423",
        "345345345343"
    ]
}

Next, I need to query the order details for each order number. Don't reflexively write a for loop that queries them one by one; who knows when that would finish! This is where CompletableFuture comes in handy.

CompletableFuture application

For convenience, I wrapped a general implementation directly in our business Base package. It is tailored to our business, of course, but feel free to take it if you need it.

Note: The following code does not yet identify the real exception. If you want to use it directly, read on!

/**
 * To keep things short, the thread pool is created like this for now. This is not how it is done in real business code!
 */
private static final ExecutorService executorService = Executors.newFixedThreadPool(4);


/**
 * Create parallel tasks and execute them
 *
 * @param list data source
 * @param api API call logic
 * @param exceptionHandle exception handling logic
 * @param <S> data source type
 * @param <T> program return type
 * @return processing result list
 */
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
    // Create one task per element
    List<CompletableFuture<T>> collectFuture = list.stream()
            .map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(e, s)))
            .toList();
    // Wait for all tasks to complete, then collect their results in one go
    return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[0]))
            .thenApply(f -> collectFuture.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
            .join();
}

/**
 * Create a single CompletableFuture task
 *
 * @param logic task logic
 * @param exceptionHandle exception handling
 * @param <T> type
 * @return task
 */
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
    return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}
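
As the comment on executorService admits, Executors.newFixedThreadPool(4) is a shortcut: it queues tasks in an unbounded queue. A bounded pool might look something like the sketch below. The class name OrderPullExecutors, the thread-name prefix, the sizes, and the choice of CallerRunsPolicy are all assumptions made for illustration, not our real configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class OrderPullExecutors {

    /** Builds a fixed-size pool with a bounded queue; all numbers are illustrative assumptions. */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        // Named daemon threads make thread dumps easier to read
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "order-pull-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(
                threads, threads,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),     // bounded queue
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());  // when saturated, the submitting thread runs the task
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedPool(4, 100);
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With CallerRunsPolicy, a full queue slows the submitter down instead of rejecting tasks; whether that is the right trade-off depends on the service.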

This wrapper covers most of the parallel scenarios in my business code at the company, and it has noticeably improved the CPU utilization of my Pod.

But then a problem comes up. Calls to the external API fail occasionally, and a failed call has to be retried. To do that, I need to identify the exception correctly before retrying.

First, define a business exception class:

public static class BizApiException extends RuntimeException {
    public BizApiException() {
    }

    public BizApiException(String message) {
        super(message);
    }
}

Example code

The code below simply simulates my business scenario, so take it as a demonstration.

public static void main(String[] args) {
    CompletableFutureDemo f = new CompletableFutureDemo();
    List<Integer> numList = f.parallelFutureJoin(Arrays.asList(1, 2, 3), num -> {
        // Simulate an API call
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see it
            Thread.currentThread().interrupt();
        }
        if (num > 2) {
            throw new BizApiException("Don't be too big");
        }
        return num;
    }, (e, num) -> {
        // Try to recognize the business exception
        if (e instanceof BizApiException) {
            System.out.println("Business exception, I am dealing with numbers: " + num + ", exception reason: " + e);
            return -1;
        }
        System.out.println("I am abnormal, old six, I was dealing with numbers just now: " + num + ", the reason for the exception: " + e);
        return -1;
    });
    System.out.println(numList);
}

Note: I originally wanted to check whether the exception passed to the handler was a BizApiException business exception. Unfortunately, the exception type here is never BizApiException. The console output below makes that clear.

Console output after running the code

Here you can see the exception type the handler actually receives: a sneaky wrapper named java.util.concurrent.CompletionException has shown up. Let's see what it is!

I am abnormal, old six, I was dealing with numbers just now: 3, the reason for the exception: java.util.concurrent.CompletionException: com.java.basic.CompletableFutureDemo$BizApiException: Don't be too big
[1, 2, -1]
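
The behavior can be reproduced in isolation. The sketch below is not part of my business code (the class name WrappingDemo and both method names are made up for illustration). It shows that a handler attached to a supplyAsync task receives the wrapper, while a future completed by hand with completeExceptionally hands the handler the original exception.

```java
import java.util.concurrent.CompletableFuture;

class WrappingDemo {

    /** Class name of the Throwable that exceptionally() receives when the async task throws. */
    static String typeSeenByHandler() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(e -> e.getClass().getName())
                .join();
    }

    /** Same question, but the future is failed explicitly instead of by a throwing task. */
    static String typeSeenAfterManualCompletion() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));
        return future.exceptionally(e -> e.getClass().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println(typeSeenByHandler());             // java.util.concurrent.CompletionException
        System.out.println(typeSeenAfterManualCompletion()); // java.lang.IllegalStateException
    }
}
```

Because the handler may see either form, checking the type before calling getCause() is safer than unwrapping unconditionally.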

CompletionException

The Javadoc comment on this class explains its purpose. Put simply, it is thrown when an error or other exception is encountered while completing a result or task. Now I get it! It shows up after a task fails and wraps our own exception as its cause.

JDK source code

/**
 * Exception thrown when an error or other exception is encountered
 * in the course of completing a result or task.
 *
 * @since 1.8
 * @author Doug Lea
 */
public class CompletionException extends RuntimeException {}

In fact, there is another exception class that deserves attention. Its documentation tells us where the real exception is!

ExecutionException

The comment on this class is very clear: the real exception is available through getCause(), so it is easy to handle.

/**
 * Exception thrown when attempting to retrieve the result of a task
 * that aborted by throwing an exception. This exception can be
 * inspected using the {@link #getCause()} method.
 *
 * @see Future
 * @since 1.5
 * @author Doug Lea
 */
public class ExecutionException extends Exception {}
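
Which of the two wrappers you meet depends on how the result is retrieved: join() throws the unchecked CompletionException, while get() throws the checked ExecutionException. In both cases getCause() returns the exception thrown by the task. A minimal sketch follows; the class name GetVersusJoin and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoin {

    /** A task that always fails with an IllegalArgumentException. */
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.<Integer>supplyAsync(() -> {
            throw new IllegalArgumentException("bad input");
        });
    }

    /** join() reports the failure as an unchecked CompletionException. */
    static Throwable causeViaJoin() {
        try {
            failing().join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    /** get() reports the failure as a checked ExecutionException. */
    static Throwable causeViaGet() {
        try {
            failing().get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("join cause: " + causeViaJoin());
        System.out.println("get cause:  " + causeViaGet());
    }
}
```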

Reworking my concurrency utility class (full version)

The extractRealException method returns the real exception I am after. Calling it from the parallelFutureJoin method is enough for this utility class to meet the requirement.

/**
 * Create parallel tasks and execute them
 *
 * @param list data source
 * @param api API call logic
 * @param exceptionHandle exception handling logic
 * @param <S> data source type
 * @param <T> program return type
 * @return processing result list
 */
public <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> api, BiFunction<Throwable, S, T> exceptionHandle) {
    // Create one task per element; the handler now receives the unwrapped exception
    List<CompletableFuture<T>> collectFuture = list.stream()
            .map(s -> this.createFuture(() -> api.apply(s), e -> exceptionHandle.apply(this.extractRealException(e), s)))
            .toList();
    // Wait for all tasks to complete, then collect their results in one go
    return CompletableFuture.allOf(collectFuture.toArray(new CompletableFuture<?>[0]))
            .thenApply(f -> collectFuture.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
            .join();
}

/**
 * Create CompletableFuture task
 *
 * @param logic task logic
 * @param exceptionHandle exception handling
 * @param <T> type
 * @return task
 */
public <T> CompletableFuture<T> createFuture(Supplier<T> logic, Function<Throwable, T> exceptionHandle) {
    return CompletableFuture.supplyAsync(logic, executorService).exceptionally(exceptionHandle);
}

/**
 * Extract the real exception
 * <p>
 * The exception reported by CompletableFuture is often a wrapper rather than the real exception
 *
 * @param throwable exception
 * @return the real exception
 */
public Throwable extractRealException(Throwable throwable) {
    if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
        if (throwable.getCause() != null) {
            return throwable.getCause();
        }
    }
    return throwable;
}
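
To see the effect of the unwrapping, here is a small self-contained check. The class name UnwrapDemo and the method handlerSeesBizException are made up for this illustration; extractRealException follows the same logic as the method above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class UnwrapDemo {

    static class BizApiException extends RuntimeException {
        BizApiException(String message) {
            super(message);
        }
    }

    // Same logic as extractRealException in the utility class
    static Throwable extractRealException(Throwable throwable) {
        if ((throwable instanceof CompletionException || throwable instanceof ExecutionException)
                && throwable.getCause() != null) {
            return throwable.getCause();
        }
        return throwable;
    }

    /** Returns true when the handler can recognize the business exception. */
    static boolean handlerSeesBizException(boolean unwrap) {
        return CompletableFuture
                .<Boolean>supplyAsync(() -> {
                    throw new BizApiException("Don't be too big");
                })
                .exceptionally(e -> (unwrap ? extractRealException(e) : e) instanceof BizApiException)
                .join();
    }

    public static void main(String[] args) {
        System.out.println("without unwrapping: " + handlerSeesBizException(false)); // false
        System.out.println("with unwrapping:    " + handlerSeesBizException(true));  // true
    }
}
```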

Of course, this is only a simple version of a parallel task utility class. There are many more possibilities for you to explore!
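
One such possibility is the retry mentioned earlier. The sketch below is only one way it might be done, not what runs in our system: supplyWithRetry and its maxAttempts parameter are hypothetical, and it assumes that only BizApiException is worth retrying. It relies on the same unwrapping logic to decide.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryDemo {

    static class BizApiException extends RuntimeException {
        BizApiException(String message) {
            super(message);
        }
    }

    // Same unwrapping logic as extractRealException
    static Throwable extractRealException(Throwable throwable) {
        boolean wrapper = throwable instanceof CompletionException || throwable instanceof ExecutionException;
        return wrapper && throwable.getCause() != null ? throwable.getCause() : throwable;
    }

    /**
     * Hypothetical helper: runs logic asynchronously and retries only on
     * BizApiException, for at most maxAttempts attempts in total.
     */
    static <T> CompletableFuture<T> supplyWithRetry(Supplier<T> logic, int maxAttempts, Executor executor) {
        return CompletableFuture.supplyAsync(logic, executor)
                .<CompletableFuture<T>>handle((value, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    Throwable real = extractRealException(error);
                    if (real instanceof BizApiException && maxAttempts > 1) {
                        // Business failure: schedule another attempt
                        return supplyWithRetry(logic, maxAttempts - 1, executor);
                    }
                    // Any other failure, or attempts exhausted: fail with the real exception
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(real);
                    return failed;
                })
                .thenCompose(future -> future);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Integer result = RetryDemo.<Integer>supplyWithRetry(() -> {
            // Fails twice, then succeeds
            if (calls.incrementAndGet() < 3) {
                throw new BizApiException("flaky");
            }
            return 42;
        }, 3, ForkJoinPool.commonPool()).join();
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

A real implementation would probably also want a delay between attempts, which this sketch leaves out.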
