CompletableFuture: batch asynchronous task processing

When a project has to process many pieces of business logic at the same time, running them serially will hurt performance, so they need to run asynchronously. There are many ways to do that.

Option 1:

For example, you can use Spring's asynchronous annotation @Async, as in the code below, where each called method carries the annotation. This approach has several shortcomings. First, threads may not be reused: in plain Spring, when no executor is configured, @Async falls back to SimpleAsyncTaskExecutor, which starts a new thread for every call. Second, exceptions and results cannot be handled uniformly across the tasks, and there is no simple way to tell whether all of them have finished.

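public void test() {
    // test01(), test02() and test03() are each annotated with @Async.
    // Caveat: @Async works through a Spring proxy, so these methods must be
    // called on another bean; a plain call from within the same class runs synchronously.
    test01();
    test02();
    test03();
}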

Option 2: Use a thread pool

CountDownLatch usage

A thread pool lets threads be reused, which makes it the natural choice for asynchronous execution.
Using a CountDownLatch: decrement the latch as each task completes, and let the main thread wait for the latch to reach zero, at which point all tasks have completed. This takes some extra code but gives more flexible control.

int numberOfTasks = 10;
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch countDownLatch = new CountDownLatch(numberOfTasks);

for (int i = 0; i < numberOfTasks; i++) {
    executorService.submit(() -> {
        try {
            // Execute task
        } finally {
            countDownLatch.countDown(); // count down even if the task throws
        }
    });
}

try {
    countDownLatch.await();
    System.out.println("All tasks have been executed");
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
    System.err.println("Waiting was interrupted");
}
executorService.shutdown();
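One detail matters in practice: if a task throws before it reaches countDown(), the count never reaches zero and await() waits forever. Below is a minimal runnable sketch that counts down in a finally block and uses the timed await(long, TimeUnit) as a safety net. The class name LatchDemo and the failingIndex parameter (which makes one task fail on purpose) are hypothetical, for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Runs numberOfTasks tasks on a fixed pool of 5 threads; the task at failingIndex throws.
    // Returns how many tasks succeeded once the latch has reached zero.
    static int runAll(int numberOfTasks, int failingIndex) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CountDownLatch latch = new CountDownLatch(numberOfTasks);
        AtomicInteger succeeded = new AtomicInteger();
        try {
            for (int i = 0; i < numberOfTasks; i++) {
                final int taskIndex = i;
                pool.submit(() -> {
                    try {
                        if (taskIndex == failingIndex) {
                            throw new IllegalStateException("task " + taskIndex + " failed");
                        }
                        succeeded.incrementAndGet();
                    } finally {
                        latch.countDown(); // runs even when the task throws, so await() cannot hang
                    }
                });
            }
            // Bounded wait: await(...) returns false if the count did not reach zero in time
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish within 10 seconds");
            }
            return succeeded.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, runAll(10, 3) returns 9: the failing task still counts down, so the main thread is released once all ten tasks have ended.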

awaitTermination

Use the awaitTermination method: the ExecutorService interface provides awaitTermination, which waits up to a given timeout for the pool to terminate, that is, for all submitted tasks to finish after shutdown() has been called. For example:

ExecutorService executorService = Executors.newFixedThreadPool(5);
// Submit the tasks to the thread pool here

executorService.shutdown(); // Stop accepting new tasks; already-submitted tasks still run
try {
    if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        System.out.println("All tasks have been executed");
    } else {
        System.out.println("Waiting timed out, some tasks have not finished yet");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
    System.err.println("awaitTermination was interrupted");
}
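A minimal runnable sketch of the same pattern, with trivial counting tasks standing in for real work (the class name AwaitTerminationDemo is hypothetical). It also shows the trade-off against the latch: awaitTermination only returns true after shutdown(), so the pool cannot be reused for the next batch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {
    // Submits numberOfTasks short tasks, shuts the pool down and waits for it to drain.
    // Returns how many tasks ran, or -1 if the pool did not terminate within the timeout.
    static int runAndAwait(int numberOfTasks, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < numberOfTasks; i++) {
            pool.submit(() -> {
                completed.incrementAndGet(); // stand-in for real work
            });
        }
        pool.shutdown(); // reject new tasks; already-submitted tasks keep running
        try {
            if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return completed.get();
            }
            pool.shutdownNow(); // timed out: interrupt whatever is still running
            return -1;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```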

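invokeAll

Use invokeAll: it submits a collection of Callable tasks and blocks until every one of them has completed (normally or by throwing), then returns their Futures in the same order as the tasks.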

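ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<Void>> tasks = new ArrayList<>();
// Add tasks to tasks

// invokeAll blocks until every task has finished, so every returned Future
// is already done (isDone() is always true); call get() to surface failures.
try {
    List<Future<Void>> futures = executorService.invokeAll(tasks);
    for (Future<Void> future : futures) {
        try {
            future.get();
        } catch (ExecutionException e) {
            System.err.println("A task failed: " + e.getCause());
        }
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    executorService.shutdown();
}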
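With Callable tasks that return values, invokeAll also hands back the results in submission order. A minimal runnable sketch, where squaring numbers is a hypothetical stand-in for real work and InvokeAllDemo is an illustrative name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Squares each input on a pool via invokeAll and returns the sum of the results.
    static int sumOfSquares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int n : inputs) {
            tasks.add(() -> n * n);
        }
        try {
            int sum = 0;
            // invokeAll returns only after every task has completed, in submission order
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get(); // does not block; throws ExecutionException if the task threw
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For inputs 1, 2, 3, 4 this returns 30.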

Option 3

Since a thread pool is already in use, can this be optimized further? Yes: use CompletableFuture, introduced in Java 8 and now the popular tool for asynchronous programming.
For example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // runAsync without an Executor argument runs on ForkJoinPool.commonPool()
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            // Execute task 1
            // Here you can store the task results in a collection or an object
            System.out.println("Task 1 is running...");
        });

        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            // Execute task 2
            // Here you can store the task results in a collection or an object
            System.out.println("Task 2 is running...");
        });

        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            // Execute task 3
            // Here you can store the task results in a collection or an object
            System.out.println("Task 3 is running...");
        });

        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

        try {
            allOf.get(); // Wait for all tasks to complete
            System.out.println("All tasks are completed.");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

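This works, but it is not very elegant: runAsync returns no value, so every task has to store its own result somewhere by hand. With supplyAsync each task returns its result, and the results of a whole batch can be collected in one place: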

private List<Object> asyncHandleTask(Map<String, TaskHandlerService> handleServiceMap, Map<String, TaskResultParserDTO> taskResultParserMap) {
    // Submit one task per handler to the thread pool
    List<CompletableFuture<List<Object>>> completableFutureList = handleServiceMap.entrySet().stream()
            .map(entry -> CompletableFuture.supplyAsync(() -> entry.getValue().handle(taskResultParserMap.get(entry.getKey())), threadPoolTaskExecutor))
            .collect(Collectors.toList());

    // Wait for all tasks to complete: allOf returns a future that completes when every task has completed.
    // Equivalent ways of passing the futures:
    //   CompletableFuture.allOf(future1, future2, future3);  // a fixed number of futures
    //   CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
    CompletableFuture<Void> allTaskFeatureList = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));

    // Get the result data of all tasks (join() does not block here: every task is already done)
    CompletableFuture<List<List<Object>>> listCompletableFuture = allTaskFeatureList.thenApply(v -> completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    List<List<Object>> resultList;

    try {
        resultList = listCompletableFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("Failed to process task result data, {}", ExceptionUtils.getStackTrace(e));
        throw new CustomException(TaskProcessStatusEnum.WAIT_PARSER_RESULT_FAIL);
    }

    // Integrate data: merge the per-task lists into one list
    return resultList.stream().flatMap(Collection::stream).collect(Collectors.toList());
}
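The method above relies on project-specific types (TaskHandlerService, TaskResultParserDTO, CustomException, threadPoolTaskExecutor), so it cannot run on its own. Here is a minimal runnable sketch of the same submit / allOf / join / flatten pattern in plain Java 8+. A plain ExecutorService stands in for threadPoolTaskExecutor, and the names SupplyAsyncDemo, runAllAndMerge and demo are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class SupplyAsyncDemo {
    // Runs every supplier on the given executor, waits for all of them and flattens the results.
    static <T> List<T> runAllAndMerge(List<Supplier<List<T>>> suppliers, Executor executor) {
        List<CompletableFuture<List<T>>> futures = suppliers.stream()
                .map(s -> CompletableFuture.supplyAsync(s, executor))
                .collect(Collectors.toList());

        // Completes when every future has completed (normally or exceptionally)
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // When this function runs, every future is done, so join() returns immediately
        return all.thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList()))
                .join(); // throws CompletionException if any task failed
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Supplier<List<String>>> tasks = Arrays.asList(
                    () -> Arrays.asList("a", "b"),
                    () -> Arrays.asList("c"),
                    () -> Arrays.asList("d", "e"));
            return runAllAndMerge(tasks, pool);
        } finally {
            pool.shutdown();
        }
    }
}
```

demo() returns [a, b, c, d, e]: the merged list follows submission order, regardless of which task finished first.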

Summary

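The focus here is the CompletableFuture asynchronous orchestration approach, but the code snippet just above raises questions about this line:
CompletableFuture<List<List<Object>>> listCompletableFuture = allTaskFeatureList.thenApply(v -> completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
What happens if an exception occurs during join() here? And what does allOf() actually buy us: does it block the main thread, and why not just fetch the task results one by one?
On the second point: allOf() itself does not block. It returns immediately with a new CompletableFuture that completes once all the given futures have completed; only get() or join() blocks. Joining the futures one by one would also work and take about the same total time, since the tasks are already running in parallel; allOf() mainly gives you a single future to wait on or to chain further stages (thenApply, exceptionally) onto.
Compare the two ways of waiting for the result.
With get():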
try {
    resultList = listCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
    log.error("Failed to process task result data, {}", ExceptionUtils.getStackTrace(e));
    throw new CustomException(TaskProcessStatusEnum.WAIT_PARSER_RESULT_FAIL);
}
Waiting: get() is a blocking method. The current thread waits until listCompletableFuture completes, that is, until all asynchronous tasks have finished, and then receives its result.
Exception handling: if any asynchronous task throws, get() throws an ExecutionException whose cause is the task's exception. It is caught here (together with InterruptedException) and rethrown as a CustomException indicating that task processing failed.
Without get():
return listCompletableFuture.thenApply(resultList ->
        resultList.stream().flatMap(Collection::stream).collect(Collectors.toList())
).exceptionally(e -> {
    log.error("Failed to process task result data, {}", ExceptionUtils.getStackTrace(e.getCause()));
    throw new CustomException(TaskProcessStatusEnum.WAIT_PARSER_RESULT_FAIL);
}).join();
Waiting: get() is not called explicitly here; the final join() waits for listCompletableFuture instead, and it blocks exactly as get() does. The difference is that join() does not throw checked exceptions: a failure is rethrown wrapped in an unchecked CompletionException. This also applies to the CustomException thrown inside exceptionally, which reaches the caller wrapped in a CompletionException.
Exception handling: failures are handled by the exceptionally method, which logs the exception and throws a CustomException, just like the catch block above.
Conclusion:
Both versions block the calling thread: join() waits for the tasks just as get() does, so the version without get() is only more asynchronous in style. To actually avoid blocking, drop the final join() and return the CompletableFuture itself, so the caller can chain further stages or decide when to wait.
In terms of exception handling, the two are essentially the same: both log the exception and throw a custom exception. The get() version does it with try-catch, the other with exceptionally.
get() suits synchronous code paths that need the result immediately, while returning the CompletableFuture without waiting fits the asynchronous programming style better and makes better use of concurrency in the system.
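A minimal runnable sketch of the truly non-blocking variant, assuming a hypothetical unchecked TaskFailedException in place of the article's CustomException. mergeAsync returns at once; the caller chooses when to call join(). The second method shows what that caller actually catches: the exception thrown inside exceptionally, wrapped in a CompletionException.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

class NonBlockingDemo {
    // Hypothetical stand-in for the article's CustomException (unchecked)
    static class TaskFailedException extends RuntimeException {
        TaskFailedException(String message, Throwable cause) { super(message, cause); }
    }

    // Returns immediately; nothing here waits for the tasks to finish
    static CompletableFuture<List<String>> mergeAsync(List<CompletableFuture<List<String>>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList()))
                .exceptionally(e -> {
                    // e is a CompletionException here; its cause is the task's own exception
                    throw new TaskFailedException("Failed to process task result data", e.getCause());
                });
    }

    // The caller decides when to block; returns a description of what join() threw
    static String failureSeenByCaller() {
        CompletableFuture<List<String>> ok = CompletableFuture.supplyAsync(() -> Arrays.asList("a"));
        CompletableFuture<List<String>> bad = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            mergeAsync(Arrays.asList(ok, bad)).join();
            return "no failure";
        } catch (CompletionException e) {
            // The TaskFailedException thrown in exceptionally arrives wrapped in a CompletionException
            return e.getCause().getClass().getSimpleName() + " <- " + e.getCause().getCause().getMessage();
        }
    }
}
```

failureSeenByCaller() returns "TaskFailedException <- boom", so a caller that wants the custom exception itself has to unwrap getCause().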

If an exception occurs in a task, why is it caught around get() rather than at join()? Is an exception wrapper object involved?

In CompletableFuture, join() waits for the underlying task to complete and returns its result. If the task failed, join() rethrows the failure wrapped in a CompletionException. join() is not declared to throw checked exceptions, and CompletionException is an unchecked (runtime) exception, so callers may catch it with a try-catch block but are not forced to.
In the code above, when a task throws, its CompletableFuture completes exceptionally and stores the exception. The future returned by CompletableFuture.allOf(), allTaskFeatureList, completes only after every task has finished; if any of them failed, it also completes exceptionally, with a CompletionException whose cause is that task's exception.
Next comes allTaskFeatureList.thenApply(v -> completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList())). thenApply runs its function only if allTaskFeatureList completes normally. So when every task succeeded, the join() calls inside it return immediately without blocking or throwing; when a task failed, the function is skipped entirely and listCompletableFuture completes exceptionally with the same failure.
Finally, listCompletableFuture.get() unwraps that failure and throws an ExecutionException whose getCause() is the original task exception, which the try-catch block logs and converts into a CustomException.
In short, the exception travels along the chain of CompletableFutures and is rethrown to whichever caller waits at the end, here get().
Put plainly: whoever waits on the future receives the exception. join() rethrows it wrapped in an unchecked CompletionException; get() wraps it in a checked ExecutionException and also declares InterruptedException. That is why resultList = listCompletableFuture.get() has to sit inside try...catch.
If you use join() here instead of get(), no try is needed at this point: the CompletionException propagates up to the method that calls asyncHandleTask(), which can catch it with try...catch. If nobody catches it, it keeps propagating up the call stack until something handles it (for example a web framework's error handling) or it reaches the thread's uncaught-exception handler, which by default prints the stack trace; it is not silently swallowed. The Lombok annotation @SneakyThrows is sometimes added for the get() variant: it lets a method throw the checked ExecutionException without declaring or catching it.
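A minimal runnable sketch that makes the wrapping visible, using a hypothetical failing task: join() reports a CompletionException, get() an ExecutionException, both with the same original exception as the cause, and a thenApply stage after allOf never runs its function when one task fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class WrappingDemo {
    // Hypothetical task that always fails
    static CompletableFuture<String> failingTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException("bad input");
        });
    }

    // join(): unchecked CompletionException, original exception as cause
    static String viaJoin() {
        try {
            failingTask().join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getMessage();
        }
    }

    // get(): checked ExecutionException (plus InterruptedException), original exception as cause
    static String viaGet() {
        try {
            failingTask().get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // allOf + thenApply: the function is skipped when any task failed, so its join() never runs
    static boolean thenApplyFunctionRan() {
        boolean[] ran = {false};
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<Void> all = CompletableFuture.allOf(ok, failingTask());
        CompletableFuture<String> next = all.thenApply(v -> {
            ran[0] = true;
            return ok.join();
        });
        try {
            next.join();
        } catch (CompletionException expected) {
            // the task's failure, propagated without calling the function
        }
        return ran[0];
    }
}
```

viaJoin() returns "CompletionException caused by bad input", viaGet() returns "ExecutionException caused by bad input", and thenApplyFunctionRan() returns false.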

Extended scenario: suppose 8 tasks each run their own business logic asynchronously. A task that fails should have its error and result logged, while the results of the tasks that succeed go on to further business processing.

private List<Object> asyncHandleTask(Map<String, TaskHandlerService> handleServiceMap, Map<String, TaskResultParserDTO> taskResultParserMap) {
    // Keep the keys in a list so index i always maps back to the same task
    List<String> taskKeys = new ArrayList<>(handleServiceMap.keySet());

    // Submit tasks. Keep the raw futures: a future returned by exceptionally(...) completes
    // normally when the handler returns, so isCompletedExceptionally() would never be true on it
    List<CompletableFuture<List<Object>>> completableFutureList = taskKeys.stream()
            .map(key -> CompletableFuture.supplyAsync(() -> handleServiceMap.get(key).handle(taskResultParserMap.get(key)), threadPoolTaskExecutor))
            .collect(Collectors.toList());

    // Log each failure and replace it with a default value (an empty list here, depending on your needs)
    List<CompletableFuture<List<Object>>> recoveredList = completableFutureList.stream()
            .map(f -> f.exceptionally(ex -> {
                log.error("Task failed: {}", ExceptionUtils.getStackTrace(ex));
                return Collections.emptyList();
            }))
            .collect(Collectors.toList());

    // Wait for all tasks to complete
    CompletableFuture<Void> allTaskFeatureList = CompletableFuture.allOf(recoveredList.toArray(new CompletableFuture[0]));

    // Get the result data of all tasks
    CompletableFuture<List<List<Object>>> listCompletableFuture = allTaskFeatureList.thenApply(v -> recoveredList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    List<List<Object>> resultList;

    try {
        resultList = listCompletableFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("Failed to process task result data, {}", ExceptionUtils.getStackTrace(e));
        throw new CustomException(TaskProcessStatusEnum.WAIT_PARSER_RESULT_FAIL);
    }

    // Use isCompletedExceptionally on the raw futures to tell failed tasks from successful ones
    List<Object> finalResultList = new ArrayList<>();
    for (int i = 0; i < completableFutureList.size(); i++) {
        CompletableFuture<List<Object>> future = completableFutureList.get(i);
        if (future.isCompletedExceptionally()) {
            // Handle failed tasks, record logs, etc.
            log.error("Task failed for key: {}", taskKeys.get(i));
        } else {
            // Process successful tasks and continue to perform other business operations
            finalResultList.addAll(resultList.get(i));
        }
    }
    return finalResultList;
}
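The key point in this scenario is where isCompletedExceptionally() is checked: a future returned by exceptionally(...) completes normally when its handler returns a value, so the check has to run on the original futures. Below is a minimal runnable sketch of that pattern in plain Java with 8 tasks. The failure rule (tasks 2 and 5 throw), the result strings and the name PartialFailureDemo are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PartialFailureDemo {
    // Returns the indices of failed tasks; results of successful tasks are appended to successes.
    static List<Integer> runEight(List<String> successes) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Raw futures: tasks 2 and 5 fail (hypothetical failure rule for the demo)
            List<CompletableFuture<String>> raw = IntStream.range(0, 8)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                        if (i == 2 || i == 5) {
                            throw new IllegalStateException("task " + i + " failed");
                        }
                        return "result-" + i;
                    }, pool))
                    .collect(Collectors.toList());

            // Recovered futures never complete exceptionally, so this join() cannot throw
            List<CompletableFuture<String>> recovered = raw.stream()
                    .map(f -> f.exceptionally(ex -> null))
                    .collect(Collectors.toList());
            CompletableFuture.allOf(recovered.toArray(new CompletableFuture[0])).join();

            List<Integer> failed = new ArrayList<>();
            for (int i = 0; i < raw.size(); i++) {
                // Check the raw future: recovered.get(i).isCompletedExceptionally() is always false
                if (raw.get(i).isCompletedExceptionally()) {
                    failed.add(i);
                } else {
                    successes.add(raw.get(i).join());
                }
            }
            return failed;
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling runEight with an empty list returns [2, 5] and fills the list with result-0, result-1, result-3, result-4, result-6 and result-7, in that order.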