Table of contents
- Introduction to Future
- FutureTask usage
- FutureTask Analysis
- CompletableFuture
- Application of CompletableFuture
- CompletableFuture example
- Summary
Introduction to Future
The common ways to create a thread in Java are to extend Thread or to implement Runnable, and neither can return a result. If the task needs to return a result, use Callable. A Callable is used together with a Future, through which the caller retrieves the result.
Future is an interface; its implementation class FutureTask is generally used to receive the result of a Callable task.
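As a minimal sketch of how Callable and Future cooperate, the example below submits a Callable to a thread pool and reads the result through the returned Future. The class name CallableWithExecutor and the helper compute() are hypothetical names chosen for illustration; the pool size is an arbitrary assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableWithExecutor {

    // Hypothetical helper: runs a Callable on a single-thread pool and returns its result.
    static int compute() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 123 + 764;
            // submit() hands back a Future that will hold the Callable's result
            Future<Integer> future = pool.submit(task);
            // get() blocks until call() has finished
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute()); // prints 887
    }
}
```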
FutureTask usage
The following example uses FutureTask to run an asynchronous task that returns a result. The Callable is the task to be executed, and the FutureTask is where the task's result is stored.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public static void main(String[] args) throws ExecutionException, InterruptedException {
    // The lambda is the Callable; FutureTask stores whatever it returns
    FutureTask<Integer> futureTask = new FutureTask<>(() -> {
        System.out.println("Task Execution");
        Thread.sleep(2000);
        return 123 + 764;
    });
    Thread t = new Thread(futureTask);
    t.start();
    System.out.println("main thread started t thread processing task");
    // Blocks until the task has finished, then returns 887
    Integer result = futureTask.get();
    System.out.println(result);
}
FutureTask Analysis
First, let's take a look at the core fields of FutureTask:
/**
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL         the task ran to completion and produced a normal result
 * NEW -> COMPLETING -> EXCEPTIONAL    the task ran, but call() threw an exception
 * NEW -> CANCELLED                    the task was cancelled directly
 * NEW -> INTERRUPTING -> INTERRUPTED  the task was cancelled by interrupting the running thread
 */
// State of the current task
private volatile int state;
private static final int NEW          = 0; // initial state of the task
private static final int COMPLETING   = 1; // the Callable's outcome (result or exception) is being stored in this FutureTask
private static final int NORMAL       = 2; // the task ended normally
private static final int EXCEPTIONAL  = 3; // an exception occurred while executing the task
private static final int CANCELLED    = 4; // the task was cancelled
private static final int INTERRUPTING = 5; // the running thread is being interrupted (it is still running at this point)
private static final int INTERRUPTED  = 6; // the running thread has been interrupted

// The task to be executed
private Callable<V> callable;
// Holds the outcome of the task; this is what futureTask.get() returns (or throws)
private Object outcome;
// The thread that executes the task
private volatile Thread runner;
// Singly linked list of the threads suspended in get(), waiting for the result
private volatile WaitNode waiters;
After t.start(), the new thread invokes the run method of FutureTask, which calls the call method of the Callable. The call is synchronous within that thread, and its return value is then assigned to outcome.
// run() eventually invokes the call method of the Callable
public void run() {
    // The task may only run while its state is NEW.
    // CAS the current thread into runner; if that fails, another thread is already running the task.
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        // The task to execute
        Callable<V> c = callable;
        // The task is not null and its state is still NEW
        if (c != null && state == NEW) {
            // Holds the returned result
            V result;
            // Whether call() completed normally
            boolean ran;
            try {
                // Run call() and keep its return value in result
                result = c.call();
                // Normal return
                ran = true;
            } catch (Throwable ex) {
                // No result
                result = null;
                // Exceptional return
                ran = false;
                // Store the exception as the outcome
                setException(ex);
            }
            if (ran)
                // Normal completion: store the result
                set(result);
        }
    } finally {
        // Clear the runner
        runner = null;
        // Re-read the state
        int s = state;
        // An interrupt from cancel(true) needs some follow-up handling
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

// Store the result
protected void set(V v) {
    // First move the state from NEW to COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Store the result in outcome
        outcome = v;
        // Move the state to NORMAL, which represents normal completion
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        finishCompletion();
    }
}
When the get method is called, it checks the state of the task. If the task has not reached a final state, that is, call has not returned and set has not run, the calling thread is suspended with LockSupport.park(this).
public V get() throws InterruptedException, ExecutionException {
    // Read the state
    int s = state;
    // NEW or COMPLETING means the result may not be available yet
    if (s <= COMPLETING)
        // Suspend the calling thread and wait for the result
        s = awaitDone(false, 0L);
    return report(s);
}

// Wait until the task has finished, i.e. until the state is greater than COMPLETING
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // Compute the deadline: 0 for get(), current time plus timeout for get(time, unit)
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // WaitNode for the current thread, created lazily
    WaitNode q = null;
    // Whether q has been linked into waiters
    boolean queued = false;
    // Loop until the task is done, the wait times out, or the thread is interrupted
    for (;;) {
        // Check whether the thread calling get() has been interrupted
        if (Thread.interrupted()) {
            // Remove the current node from waiters
            removeWaiter(q);
            // and throw InterruptedException
            throw new InterruptedException();
        }
        // Read the state of the task
        int s = state;
        // The task has finished
        if (s > COMPLETING) {
            // If a WaitNode was created, clear its thread reference
            if (q != null)
                q.thread = null;
            // Return the state of the task
            return s;
        }
        // COMPLETING lasts only a very short time, so yielding the CPU is enough
        else if (s == COMPLETING)
            Thread.yield();
        // The state is NEW (call() may not have finished), so prepare to suspend the thread
        else if (q == null)
            // Create a WaitNode holding the current thread
            q = new WaitNode();
        else if (!queued)
            // Not yet linked into waiters: push q onto the head of the list
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            // get(time, unit): compute the remaining wait time
            nanos = deadline - System.nanoTime();
            // Timed out
            if (nanos <= 0L) {
                // Remove the current node from waiters
                removeWaiter(q);
                // Return the task state
                return state;
            }
            // Suspend the thread for the remaining time
            LockSupport.parkNanos(this, nanos);
        }
        else {
            // get(): suspend the thread until it is unparked
            LockSupport.park(this);
        }
    }
}
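The timed branch of awaitDone is what backs get(time, unit). The following is a small sketch of how that looks from the caller's side; the class TimedGetDemo, the helper tryGet, and the chosen durations are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetDemo {

    // Hypothetical helper: starts a task that takes taskMillis and waits at most waitMillis for it.
    static String tryGet(long taskMillis, long waitMillis) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            Thread.sleep(taskMillis);
            return "done";
        });
        Thread t = new Thread(task);
        t.setDaemon(true);
        t.start();
        try {
            // Parks the caller for at most waitMillis (the parkNanos branch of awaitDone)
            return task.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The deadline passed while the task was still NEW; give up and cancel it
            task.cancel(true);
            return "timeout";
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(tryGet(10, 2000));  // task finishes well within the timeout
        System.out.println(tryGet(2000, 100)); // timeout elapses first
    }
}
```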
When the task has finished (the set method has run), finishCompletion wakes up the waiting threads with LockSupport.unpark(t).
// The task has reached a final state (for example NORMAL); do the follow-up processing
private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        // After reading the head node, use CAS to set waiters to null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // Get the thread stored in q
                Thread t = q.thread;
                // The thread is not null
                if (t != null) {
                    // Clear the WaitNode's thread reference
                    q.thread = null;
                    // Wake up this thread
                    LockSupport.unpark(t);
                }
                // Move on to the next node and wake it up as well
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                // Point q at the next WaitNode
                q = next;
            }
            break;
        }
    }
    // Extension hook: empty by default, subclasses may override it
    done();
    // The task is complete, so release the reference to the Callable
    callable = null;
}
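The done() hook called at the end of finishCompletion can be overridden in a subclass. A minimal sketch, assuming a hypothetical subclass NotifyingTask that only records that the hook fired:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class DoneHookDemo {

    // Hypothetical subclass: remembers whether done() has been called.
    static class NotifyingTask<V> extends FutureTask<V> {
        final AtomicBoolean doneCalled = new AtomicBoolean(false);

        NotifyingTask(Callable<V> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            // Invoked once the task reaches a final state (normal, exceptional, or cancelled)
            doneCalled.set(true);
        }
    }

    static boolean runAndCheck() throws Exception {
        NotifyingTask<Integer> task = new NotifyingTask<>(() -> 42);
        boolean before = task.doneCalled.get();
        // Run the task synchronously on the calling thread; no extra thread is needed here
        task.run();
        return !before && task.doneCalled.get() && task.get() == 42;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCheck());
    }
}
```

Because done() also runs after cancellation or failure, a real callback would typically check isCancelled() or call get() inside a try block before using the result.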
Processing the returned result
// The task has ended; turn its final state into a return value or an exception
private V report(int s) throws ExecutionException {
    // Read the outcome
    Object x = outcome;
    // The task ended normally
    if (s == NORMAL)
        // Return the result
        return (V)x;
    // The state is CANCELLED or later (CANCELLED, INTERRUPTING, INTERRUPTED)
    if (s >= CANCELLED)
        throw new CancellationException();
    // Otherwise the state is EXCEPTIONAL: wrap the stored exception
    throw new ExecutionException((Throwable)x);
}
// Where each case is handled:
//   normal return       -> report
//   exceptional return  -> report
//   cancelled task      -> report
//   interrupted waiter  -> awaitDone
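From the caller's side, the three branches of report show up as a return value, an ExecutionException, or a CancellationException. The sketch below exercises each branch; the class GetOutcomesDemo and its helper names are hypothetical, and the tasks are run synchronously with run() to keep the example deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class GetOutcomesDemo {

    // Maps each outcome of get() to a short description.
    static String describe(FutureTask<Integer> task) throws InterruptedException {
        try {
            return "value " + task.get();
        } catch (ExecutionException e) {
            // State EXCEPTIONAL: the original exception is available as the cause
            return "failed: " + e.getCause().getMessage();
        } catch (CancellationException e) {
            // State CANCELLED or later
            return "cancelled";
        }
    }

    static String normal() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 123 + 764);
        task.run();
        return describe(task);
    }

    static String failing() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // run() catches the exception and stores it via setException
        return describe(task);
    }

    static String cancelled() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 1);
        task.cancel(false); // NEW -> CANCELLED, the task never runs
        return describe(task);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(normal());    // value 887
        System.out.println(failing());   // failed: boom
        System.out.println(cancelled()); // cancelled
    }
}
```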
CompletableFuture
Problems with FutureTask:
Problem 1: To obtain the result of a FutureTask, the main thread has to block in the get method and wait for the worker thread to finish executing call before it can get the return value.
Problem 2: If you do not suspend the thread through get, you can use a while loop to keep checking whether the task has finished, and fetch the result once it has. If the task takes a long time, the CPU keeps being scheduled just to check the task state, which wastes CPU resources.
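The two problems can be made concrete with a short sketch. The class BlockOrPollDemo and both helper methods are hypothetical, and the 200 ms task duration is an arbitrary assumption.

```java
import java.util.concurrent.FutureTask;

public class BlockOrPollDemo {

    private static FutureTask<Integer> startTask() {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(200);
            return 123 + 764;
        });
        Thread t = new Thread(task);
        t.setDaemon(true);
        t.start();
        return task;
    }

    // Problem 1: the caller is parked in get() and can do nothing else in the meantime.
    static int blockingGet() throws Exception {
        return startTask().get();
    }

    // Problem 2: the caller spins on isDone(); returns how many times the loop ran.
    static long pollingGet() throws Exception {
        FutureTask<Integer> task = startTask();
        long polls = 0;
        while (!task.isDone()) {
            polls++; // every iteration uses CPU time without making progress
        }
        task.get(); // returns immediately, the task is already done
        return polls;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking result: " + blockingGet());
        System.out.println("polls while waiting: " + pollingGet());
    }
}
```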
With FutureTask, the caller therefore obtains the result synchronously: either by blocking in get or by polling without blocking. What is needed is an asynchronous, non-blocking way of processing the result, and CompletableFuture provides this to a certain extent.
CompletableFuture also implements the Future interface, so you can use it directly in place of FutureTask. It provides a rich set of methods for performing and combining asynchronous operations.
Application of CompletableFuture
The most important thing about CompletableFuture is that it solves the problem of asynchronous callbacks.
A CompletableFuture executes an asynchronous task, which may or may not return a result. It relies on the three core interfaces of functional programming in Java:
- Supplier: a producer; takes no input and returns a result
- Consumer: a consumer; takes an input and returns no result
- Function: a function; takes an input and returns a result
Two basic methods start a task:
- supplyAsync(Supplier<U> supplier): executes the task asynchronously and returns a result
- runAsync(Runnable runnable): executes the task asynchronously and returns no result
If no thread pool is specified, both methods hand the task over to the common ForkJoinPool (ForkJoinPool.commonPool()) for execution.
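A minimal sketch of the two entry points, with and without an explicit thread pool. The class AsyncBasicsDemo and its helper names are hypothetical; the pool size of 2 is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncBasicsDemo {

    // supplyAsync: the Supplier's return value becomes the result of the future.
    static String supplied() {
        return CompletableFuture.supplyAsync(() -> "result").join();
    }

    // runAsync: the Runnable returns nothing, so the future is a CompletableFuture<Void>.
    static boolean ran() {
        AtomicBoolean flag = new AtomicBoolean(false);
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> flag.set(true));
        future.join();
        return flag.get();
    }

    // With an explicit executor, the task runs on that pool instead of the default one.
    static String threadNameOnOwnPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(supplied());
        System.out.println(ran());
        System.out.println(threadNameOnOwnPool());
    }
}
```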
These two methods alone, however, do not give you asynchronous callbacks. If a follow-up task should continue with the result after the current task has finished, it has to be attached with the other methods:
- thenApply(Function<prevResult,currResult>): waits for the previous task to finish, takes its result, processes it, and returns the new result
- thenApplyAsync(Function<prevResult,currResult>[, thread pool]): same, but the step is submitted as a separate asynchronous task (to the given thread pool, if one is passed)
- thenAccept(Consumer<prevResult>): waits for the previous task to finish and processes its result; returns no result
- thenAcceptAsync(Consumer<prevResult>[, thread pool]): same, but the step is submitted as a separate asynchronous task
- thenRun(Runnable): waits for the previous task to finish before running; neither receives the previous result nor returns one
- thenRunAsync(Runnable[, thread pool]): same, but the step is submitted as a separate asynchronous task
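The three callback styles can be chained one after another. A small sketch follows; the class ChainDemo, the helper chain(), and the logged strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainDemo {

    static List<String> chain() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CompletableFuture<Void> pipeline = CompletableFuture
                .supplyAsync(() -> 21)                      // produces 21
                .thenApply(n -> n * 2)                      // Function: receives 21, returns 42
                .thenAccept(n -> log.add("accepted " + n))  // Consumer: receives 42, returns nothing
                .thenRun(() -> log.add("finished"));        // Runnable: receives nothing, returns nothing
        // join() is only used here so the example can inspect the log afterwards
        pipeline.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(chain()); // [accepted 42, finished]
    }
}
```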
Secondly, more complex processing is possible: a second task is executed while the previous task is running, and a final task runs only after both of them have completed.
- thenCombine(CompletionStage, BiFunction<prevResult,nextResult,afterResult>): the previous task and the given task run concurrently; once both have finished, their two results are passed to the final step, which returns a result of its own
- thenCombineAsync(CompletionStage, BiFunction<prevResult,nextResult,afterResult>[, thread pool]): same, but the final step is submitted as a separate asynchronous task
- thenAcceptBoth(CompletionStage, BiConsumer<prevResult,nextResult>): the previous task and the given task run concurrently; once both have finished, their two results are processed, but no result is returned
- thenAcceptBothAsync(CompletionStage, BiConsumer<prevResult,nextResult>[, thread pool]): same, but the final step is submitted as a separate asynchronous task
- runAfterBoth(CompletionStage, Runnable): the previous task and the given task run concurrently; the final step runs after both have finished, without receiving their results
- runAfterBothAsync(CompletionStage, Runnable[, thread pool]): same, but the final step is submitted as a separate asynchronous task
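A brief sketch of the "wait for both" family, under the assumption of two trivial tasks. The class BothDemo and the helper names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class BothDemo {

    // thenCombine: both results go into a BiFunction, which returns a new result.
    static int combine() {
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> 123);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 764);
        CompletableFuture<Integer> sum = left.thenCombine(right, (a, b) -> a + b);
        return sum.join();
    }

    // thenAcceptBoth: both results go into a BiConsumer, nothing is returned.
    static String acceptBoth() {
        AtomicReference<String> sink = new AtomicReference<>();
        CompletableFuture<String> food = CompletableFuture.supplyAsync(() -> "pork");
        CompletableFuture<String> rice = CompletableFuture.supplyAsync(() -> "rice");
        food.thenAcceptBoth(rice, (f, r) -> sink.set(f + " with " + r)).join();
        return sink.get();
    }

    // runAfterBoth: the Runnable sees neither result, it only runs after both are done.
    static boolean afterBoth() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "a");
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "b");
        first.runAfterBoth(second, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combine());    // 887
        System.out.println(acceptBoth()); // pork with rice
        System.out.println(afterBoth());  // true
    }
}
```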
It is also possible to execute two tasks at the same time and do the final processing as soon as either one of them has finished and returned its result.
- applyToEither(CompletionStage, Function<firstResult,afterResult>): the two tasks run at the same time; as soon as one of them finishes, its result is passed to the final step, which returns a result
- acceptEither(CompletionStage, Consumer<firstResult>): the two tasks run at the same time; as soon as one of them finishes, its result is passed to the final step, which returns no result
- runAfterEither(CompletionStage, Runnable): the two tasks run at the same time; as soon as one of them finishes, the final step runs
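A sketch of "first one wins" with applyToEither. The class EitherDemo, the pause helper, and the sleep durations (50 ms against 2 s) are assumptions chosen so that the faster task reliably finishes first.

```java
import java.util.concurrent.CompletableFuture;

public class EitherDemo {

    // Hypothetical helper: sleeps without forcing callers to handle InterruptedException.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String firstFinished() {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            pause(2000);
            return "slow";
        });
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
            pause(50);
            return "fast";
        });
        // The Function receives the result of whichever task completes first
        return slow.applyToEither(fast, winner -> winner + " finished first").join();
    }

    public static void main(String[] args) {
        System.out.println(firstFinished());
    }
}
```

The slower task is not cancelled; it simply keeps running and its result is ignored.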
It also supports waiting for the previous task to finish and then running a follow-up step whose own return value is a CompletionStage.
thenCompose(Function<prevResult,CompletionStage>)
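A sketch of thenCompose, assuming two made-up asynchronous lookups (findUserId and loadProfile are hypothetical, as is the class ComposeDemo). With thenApply the same lambda would produce a nested CompletableFuture<CompletableFuture<String>>; thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {

    // Hypothetical first step: derives an id from a name asynchronously.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical second step: needs the id and is itself asynchronous.
    static CompletableFuture<String> loadProfile(int id) {
        return CompletableFuture.supplyAsync(() -> "profile-" + id);
    }

    static String lookup(String name) {
        CompletableFuture<String> profile =
                findUserId(name).thenCompose(id -> loadProfile(id));
        return profile.join();
    }

    public static void main(String[] args) {
        System.out.println(lookup("alice")); // profile-5
    }
}
```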
Finally, there are several ways to handle exceptions:
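- exceptionally(Function<Throwable,currResult>): runs only if the previous task failed and supplies a replacement result
- whenComplete(BiConsumer<prevResult,Throwable>): runs in both cases, sees the result or the exception, and passes the original outcome on unchanged
- handle(BiFunction<prevResult,Throwable,currResult>): runs in both cases, sees the result or the exception, and returns a new result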
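A sketch of the three exception-handling methods. The class ErrorHandlingDemo and its helper names are hypothetical. Note that the Throwable passed to the callback may be a CompletionException wrapping the original exception, so the example does not rely on its message.

```java
import java.util.concurrent.CompletableFuture;

public class ErrorHandlingDemo {

    // exceptionally: only invoked on failure, supplies a replacement value.
    static String withExceptionally() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    // handle: invoked on success and on failure, returns a new value either way.
    static String withHandle() {
        return CompletableFuture
                .supplyAsync(() -> 10)
                .handle((value, ex) -> ex == null ? "ok " + value : "error")
                .join();
    }

    // whenComplete: observes the outcome but passes the original value through.
    static String withWhenComplete() {
        StringBuilder log = new StringBuilder();
        String value = CompletableFuture
                .supplyAsync(() -> "value")
                .whenComplete((v, ex) -> log.append(ex == null ? "completed" : "failed"))
                .join();
        return value + "/" + log;
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally()); // fallback
        System.out.println(withHandle());        // ok 10
        System.out.println(withWhenComplete());  // value/completed
    }
}
```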
CompletableFuture example
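import java.util.concurrent.CompletableFuture;

public static void main(String[] args) throws InterruptedException {
    System.out.println("I'll go home for dinner");
    // The cooking runs asynchronously on the default pool
    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        System.out.println("Auntie cooks!");
        return "Pork in pot!";
    });
    // The main thread keeps going while the task runs
    System.out.println("I watch TV!");
    // join() waits for the task and returns its result
    System.out.println("I eat:" + task.join());
}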
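import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public static void main(String[] args) throws InterruptedException {
    // Assumption: the original does not show how executor is created; a small fixed pool is used here
    ExecutorService executor = Executors.newFixedThreadPool(3);
    System.out.println("I'll go home for dinner");
    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        System.out.println("Auntie is cooking!");
        return "Pork in pot!";
    }, executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
        System.out.println("Xiao Wang's stewed rice");
        return "rice!";
    }, executor), (food, rice) -> {
        // Runs only after both the food and the rice are ready
        System.out.println("Da Wang serves " + food + "," + rice);
        return "The food is ready!";
    }, executor);
    System.out.println("I watch TV!");
    System.out.println("I eat:" + task.join());
    // Let the pool threads exit so the JVM can terminate
    executor.shutdown();
}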
Summary
Future is an interface and CompletableFuture is a class that implements it; both are used to handle asynchronous tasks. Their main differences lie in functional complexity and usage scenarios. Future (typically through FutureTask) is relatively simple and is mainly used for simple asynchronous task processing, where the caller blocks or polls for the result. CompletableFuture is more flexible and powerful, supports callbacks and composition, and is suitable for complex asynchronous task processing scenarios.