CompletableFuture’s cancel() and a small pitfall with handleAsync()

Environment

  • Ubuntu 22.04
  • Java 17.0.3.1
  • IntelliJ IDEA 2022.1.3

Background

I recently ran into a small pitfall with CompletableFuture. I am recording it here and taking the opportunity to review how CompletableFuture behaves.

In short: after calling cancel() on a CompletableFuture, the callback registered with handleAsync() did not seem to be triggered. After some testing and research, the problem turned out to be in my own code.

Test 1

First, I wrote the simplest possible test program: it runs a task, runTask(), through CompletableFuture’s supplyAsync() method, and uses CompletableFuture’s handleAsync() method to add a callback, handleResult(), that processes the task’s result. The code is as follows:

import java.util.concurrent.CompletableFuture;

public class Test0524 {
    // Simulated task: sleeps for 3 seconds, then returns a result.
    private String runTask() {
        System.out.println(Thread.currentThread().getName() + ": runTask start " + System.currentTimeMillis());

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.out.println("runTask interrupted! " + System.currentTimeMillis());
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + ": runTask end " + System.currentTimeMillis());

        return "task result: success";
    }

    public void test() {
        System.out.println(Thread.currentThread().getName() + ": test start " + System.currentTimeMillis());

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runTask());

        // Register the callback; the future it returns is discarded here.
        future.handleAsync((result, throwable) -> handleResult(result, throwable));

        // Keep the main thread alive until the task and the callback have finished.
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + ": test end " + System.currentTimeMillis());
    }

    // Callback: receives either the task result or the Throwable.
    private String handleResult(String result, Throwable throwable) {
        System.out.println(Thread.currentThread().getName() + ": handleResult start " + System.currentTimeMillis());

        System.out.println(Thread.currentThread().getName() + ": result = " + result + ", throwable = " + throwable);

        System.out.println(Thread.currentThread().getName() + ": handleResult end " + System.currentTimeMillis());

        return "handle result: success";
    }

    public static void main(String[] args) {
        Test0524 obj = new Test0524();

        obj.test();
    }
}

Note: the task sleeps for 3 seconds to simulate real work, and the main thread sleeps for 4 seconds so that it finishes after the task. If the main thread finished first, the whole program would exit with it, because the worker threads of ForkJoinPool.commonPool() are daemon threads.

Running the program gives the following output:

main: test start 1684930792470
ForkJoinPool.commonPool-worker-1: runTask start 1684930792495
ForkJoinPool.commonPool-worker-1: runTask end 1684930795496
ForkJoinPool.commonPool-worker-2: handleResult start 1684930795498
ForkJoinPool.commonPool-worker-2: result = task result: success, throwable = null
ForkJoinPool.commonPool-worker-2: handleResult end 1684930795510
main: test end 1684930796496

In this run, the main thread, the task, and the result handler ran on three different threads. That is because supplyAsync() and handleAsync() were used: as the “Async” suffix suggests, both hand their work to an asynchronous executor (here ForkJoinPool.commonPool()) instead of running it on the calling thread.

There were no problems with this test and the program worked fine.
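As an aside to the note about sleeping in the main thread: a minimal sketch of waiting on the future returned by handleAsync() with join() instead of a fixed sleep. The class name WaitWithJoin and the string built by the callback are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class WaitWithJoin {
    // Runs a task plus callback and blocks until the callback has produced its value.
    static String run() {
        CompletableFuture<String> handled = CompletableFuture
                .supplyAsync(() -> "task result: success")
                .handleAsync((result, throwable) -> "handled [" + result + "]");
        // join() waits for the callback, so no Thread.sleep() is needed in the caller.
        return handled.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The tests in this article keep the sleep calls because they make the timing of cancel() easy to control.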

Test 2

Cancel the task while it is running.

Modify the test() method as follows:

    public void test() {
        System.out.println(Thread.currentThread().getName() + ": test start " + System.currentTimeMillis());

        CompletableFuture<String> future;
        future = CompletableFuture.supplyAsync(() -> runTask());

        future.handleAsync((result, throwable) -> handleResult(result, throwable));

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + ": cancel start " + System.currentTimeMillis());
        boolean cancel = future.cancel(true);
        System.out.println(Thread.currentThread().getName() + ": cancel end " + System.currentTimeMillis() + ", result = " + cancel);

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + ": test end " + System.currentTimeMillis());
    }

Note: the task runs for 3 seconds, so the main thread sleeps for 2 seconds to make sure the task is still running, then cancels it, and finally sleeps for another 2 seconds to give the task enough time to run to the end.

Running the program gives the following output:

main: test start 1684931916879
ForkJoinPool.commonPool-worker-1: runTask start 1684931916899
main: cancel start 1684931918901
ForkJoinPool.commonPool-worker-2: handleResult start 1684931918902
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CancellationException
ForkJoinPool.commonPool-worker-2: handleResult end 1684931918914
main: cancel end 1684931918902, result = true
ForkJoinPool.commonPool-worker-1: runTask end 1684931919900
main: test end 1684931920917

The output shows that the cancel() call immediately triggers the handleAsync() callback: the two are only 1 millisecond apart.

However, the task itself is not affected and keeps running. Although true is passed as the argument of cancel() (the mayInterruptIfRunning parameter), no interrupt is sent to the task. If one had been sent, the task’s Thread.sleep() call would have thrown an InterruptedException and the catch block would have printed a message.

The JDK documentation states this:

mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.

So the actual behavior is: when cancel() is called on a running task, the task itself is not affected, but the callback registered with handle() / handleAsync() is triggered immediately. When the task later finishes, the callback is not triggered a second time.
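The behavior described above can be checked with a small sketch. The class name CancelDoesNotInterrupt, the latches, and the 500 ms sleep are assumptions made for this illustration, not part of the original test program.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDoesNotInterrupt {
    // Returns true only if cancel(true) delivered an interrupt to the task body.
    static boolean taskWasInterrupted() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(500);              // simulated work
            } catch (InterruptedException e) {
                interrupted.set(true);          // reached only if an interrupt arrives
            }
            finished.countDown();
            return "done";
        });

        try {
            started.await();                    // make sure the task is running
            future.cancel(true);                // completes the future with CancellationException
            finished.await(5, TimeUnit.SECONDS); // the task body still runs to its end
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("task interrupted: " + taskWasInterrupted());
    }
}
```

If a task really has to stop early, it needs some cooperative mechanism of its own, for example polling a flag; cancel() only changes the state of the future.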

The callback passed to handle() / handleAsync() takes two parameters: the result of the task and the error (a Throwable). When the task completes normally, the Throwable is null; when it completes exceptionally, the result is null.

The output shows that after the task is cancelled, the Throwable is a CancellationException and the result is null.
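A minimal sketch of a callback that inspects the Throwable before touching the result. The class HandleOutcome and its describe() method are hypothetical names used only for this example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class HandleOutcome {
    // Hypothetical callback: check the Throwable first, then use the result.
    static String describe(String result, Throwable throwable) {
        if (throwable instanceof CancellationException) {
            return "cancelled";
        }
        if (throwable != null) {
            return "failed: " + throwable;
        }
        return "succeeded: " + result;
    }

    static String outcomeOfCancelled() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.cancel(true);                    // result is null, Throwable is CancellationException
        return future.handle(HandleOutcome::describe).join();
    }

    static String outcomeOfSuccess() {
        return CompletableFuture.completedFuture("task result: success")
                .handle(HandleOutcome::describe)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(outcomeOfCancelled());
        System.out.println(outcomeOfSuccess());
    }
}
```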

Test 3

The task throws an exception while it is running.

Modify the runTask() method as follows:

    private String runTask() {
        System.out.println(Thread.currentThread().getName() + ": runTask start " + System.currentTimeMillis());

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("runTask interrupted! " + System.currentTimeMillis());
            e.printStackTrace();
        }

        throw new RuntimeException("runTask exception!");
    }

Note: the task’s sleep time is reduced to 1 second, so the exception is thrown before cancel() is called.

Running the program gives the following output:

main: test start 1684933211432
ForkJoinPool.commonPool-worker-1: runTask start 1684933211450
ForkJoinPool.commonPool-worker-2: handleResult start 1684933212454
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CompletionException: java.lang.RuntimeException: runTask exception!
ForkJoinPool.commonPool-worker-2: handleResult end 1684933212487
main: cancel start 1684933213452
main: cancel end 1684933213453, result = false
main: test end 1684933215484

As can be seen:

  1. As soon as the task throws the exception, the handleAsync() callback is triggered. The task has ended abnormally, so the result is null and the Throwable is a CompletionException whose cause is the original RuntimeException.
  2. The task has already ended by the time cancel() is called, so the cancel has no effect and cancel() returns false.
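Since the callback receives a CompletionException wrapper here but a bare CancellationException in Test 2, a small unwrapping helper can make callbacks simpler. The following is only a sketch; the class UnwrapCause and its unwrap() helper are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnwrapCause {
    // Hypothetical helper: strip the CompletionException wrapper if there is one.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // The task fails; the callback reports the message of the original exception.
    static String rootMessage() {
        return CompletableFuture
                .<String>supplyAsync(() -> { throw new RuntimeException("runTask exception!"); })
                .handle((result, throwable) -> unwrap(throwable).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(rootMessage());
    }
}
```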

Test 4

Same as Test 3, except that the task is cancelled first and the exception is thrown by the task afterwards.

Change the sleep time in the runTask() method back to 3 seconds.

Running the program gives the following output:

main: test start 1684933790852
ForkJoinPool.commonPool-worker-1: runTask start 1684933790878
main: cancel start 1684933792880
ForkJoinPool.commonPool-worker-2: handleResult start 1684933792884
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CancellationException
main: cancel end 1684933792883, result = true
ForkJoinPool.commonPool-worker-2: handleResult end 1684933792904
main: test end 1684933794904

As the output shows, the cancel operation immediately triggers the handleAsync() callback. The task itself keeps running and later throws its exception, but by then the future has already completed with the CancellationException, so that exception is silently discarded and the callback is not triggered again.

Test 5

I ran the previous tests many times and the results were always as expected.

So why did it look as if, in my own code, the handleAsync() callback was not triggered after the task was cancelled? The system is complex and hard to debug, so I was judging by the logs. The callback contains some logging, but when I checked the logs I saw only the cancel entry and nothing from the callback, so I assumed the callback was never triggered.

After several days of testing and analysis I still had not found the cause, and I even began to suspect a JDK bug. Today I finally found out that the problem was my own. Oops...

The conclusion first: the callback passed to handleAsync() runs on an asynchronous thread, and an exception thrown inside it is not reported anywhere. CompletableFuture captures it and stores it in the future returned by handleAsync(), which my code discarded.

In other words, the callback was triggered, but it threw an exception before reaching its logging code and ended right there, silently.

So what went wrong, and why was there no exception in the callback when the task ended normally? I am ashamed to say: when the task is cancelled its result is null, and my callback called something like result.getXxx(), which throws a NullPointerException. When the task ends normally the result is not null, so nothing goes wrong.

Add a line of code at the top of the handleResult() method:

 System.out.println(result.length());

Running the program gives the following output:

main: test start 1684934679760
ForkJoinPool.commonPool-worker-1: runTask start 1684934679788
main: cancel start 1684934681789
main: cancel end 1684934681792, result = true
main: test end 1684934683830

It looks exactly as if the cancel operation never triggered the handleAsync() callback, doesn’t it?

Of course, this example is a heavily simplified model in which the cause is obvious at a glance. In the real project the situation was a hundred times more complicated, and it took me several days to track down the cause.
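One way to confirm where the exception went is to keep the future that handleAsync() returns and inspect it. The sketch below does that; the class name CallbackFailureIsCaptured and the variable names are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CallbackFailureIsCaptured {
    // Returns the exception thrown inside the callback, or null if it did not fail.
    static Throwable failureOfCallback() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Keep the future returned by handleAsync() instead of discarding it.
        CompletableFuture<Integer> handled =
                future.handleAsync((result, throwable) -> result.length()); // NPE when result is null

        future.cancel(true);                    // triggers the callback with result == null

        try {
            handled.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();                // the exception thrown by the callback
        }
    }

    public static void main(String[] args) {
        System.out.println("callback failed with: " + failureOfCallback());
    }
}
```

Nothing is printed by the callback thread itself; the NullPointerException only becomes visible when someone asks the returned future for its result.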

Test 6

Catch the exception in the callback method.

Once the cause is known, the fix is easy: catch the exception in the callback method. Modify the handleResult() method as follows:

    private String handleResult(String result, Throwable throwable) {
        try {
            // Deliberately left in: throws NullPointerException when result is null.
            System.out.println(result.length());

            System.out.println(Thread.currentThread().getName() + ": handleResult start " + System.currentTimeMillis());

            System.out.println(Thread.currentThread().getName() + ": result = " + result + ", throwable = " + throwable);

            // throwable is null when the task completed normally.
            if (throwable != null) {
                throwable.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + ": handleResult end " + System.currentTimeMillis());

            return "handle result: success";
        } catch (Exception e) {
            // Make the failure visible before rethrowing it.
            System.out.println("handleResult exception!");
            e.printStackTrace();
            throw e;
        }
    }

Running the program gives the following output:

main: test start 1684935436681
ForkJoinPool.commonPool-worker-1: runTask start 1684935436699
main: cancel start 1684935438700
handleResult exception!
java.lang.NullPointerException: Cannot invoke "String.length()" because "result" is null
at com.example.test0524.Test0524.handleResult(Test0524.java:51)
at com.example.test0524.Test0524.lambda$test$1(Test0524.java:28)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
main: cancel end 1684935438703, result = true
main: test end 1684935440732

OK, now the exception shows up in the output and the cause is clear at a glance.
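As a possible alternative to a try/catch inside the callback, the failure can also be surfaced by chaining exceptionally() onto the future returned by handleAsync(). This is only a sketch under that assumption; the class name LogCallbackFailure and the message format are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class LogCallbackFailure {
    static String runWithFallback() {
        CompletableFuture<String> future = new CompletableFuture<>();

        CompletableFuture<String> handled = future
                .handleAsync((result, throwable) -> "length = " + result.length())
                .exceptionally(ex -> {
                    // ex is normally a CompletionException wrapping the callback's exception.
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    System.err.println("callback failed: " + cause);
                    return "callback failed: " + cause.getClass().getSimpleName();
                });

        future.cancel(true);                    // callback runs with result == null and throws
        return handled.join();
    }

    public static void main(String[] args) {
        System.out.println(runWithFallback());
    }
}
```

The difference from the try/catch version is that the logging lives outside the callback, so it also covers callbacks that someone forgot to guard.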

Test 7

A further question: what if we replace handleAsync() with handle(), that is, run the callback synchronously?

Modify the test() method and replace handleAsync with handle.

Modify the handleResult() method: remove the try…catch block, and add: System.out.println(Thread.currentThread().getName() + "=====================");

Running the program gives the following output:

main: test start 1684936772747
ForkJoinPool.commonPool-worker-1: runTask start 1684936772769
main: cancel start 1684936774770
main=====================
main: cancel end 1684936774776, result = true
main: test end 1684936776796

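Conclusion: with handle(), the callback runs on the main thread here, because the main thread is the one that completes the future by calling cancel(). Even so, an exception thrown in the callback does not propagate to the main thread: it is captured in the future returned by handle(), nothing is printed, and the main thread carries on. So you still have to catch exceptions in the callback yourself.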
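A minimal sketch of the threading behavior seen in this test, using a manually created future so that the outcome does not depend on timing. The class name HandleRunsOnCompletingThread is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class HandleRunsOnCompletingThread {
    // Returns the name of the thread on which the handle() callback ran.
    static String callbackThread() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> name = new AtomicReference<>();

        // The future is not complete yet, so the callback is only registered here.
        future.handle((result, throwable) -> {
            name.set(Thread.currentThread().getName());
            return null;
        });

        // cancel() completes the future and runs the callback on the calling thread.
        future.cancel(true);
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThread());
    }
}
```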