Concurrent programming – Future & CompletableFuture

Article directory

  • Introduction to Future
  • FutureTask usage
  • FutureTask Analysis
  • CompletableFuture
    • Application of CompletableFuture
    • CompletableFuture example
  • Summary

Introduction to Future

In Java, threads are usually created with Thread or Runnable, but neither can return a value. When a task needs to return a result, use Callable, whose call method returns a value. The result of a Callable is retrieved through a Future.

Future is an interface. Its implementation class FutureTask is commonly used to receive the result returned by a Callable task.
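
As a rough sketch of how Callable and Future cooperate, the snippet below submits a Callable to an ExecutorService and reads the value back through the Future it returns. The class name CallableWithFuture and the method sumOnWorker are made up for illustration; they do not come from the article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableWithFuture {
    // Hypothetical helper: runs a + b on a worker thread and returns the sum.
    static int sumOnWorker(int a, int b) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;        // the work, with a return value
            Future<Integer> future = pool.submit(task);  // handle to the pending result
            return future.get();                         // blocks until the task has finished
        } finally {
            pool.shutdown();                             // let the worker thread exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnWorker(123, 764));
    }
}
```

Internally, submit wraps the Callable in a FutureTask, which is why the rest of this article looks at FutureTask directly.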

FutureTask usage

The following example uses FutureTask to run an asynchronous task that returns a result. The Callable is the task to execute, and the FutureTask holds the result that the task returns.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<>(() -> {
        System.out.println("Task Execution");
        Thread.sleep(2000);
        return 123 + 764;
    });

    Thread t = new Thread(futureTask);
    t.start();

    System.out.println("main thread has started thread t to process the task");
    Integer result = futureTask.get();
    System.out.println(result);
}

FutureTask Analysis

First, let's look at the core fields of FutureTask:

/**
 * NEW -> COMPLETING -> NORMAL        The task ran to completion and produced a normal result.
 * NEW -> COMPLETING -> EXCEPTIONAL   The task threw an exception, which is stored as the outcome.
 * NEW -> CANCELLED                   The task was cancelled without interrupting the runner (cancel(false)).
 * NEW -> INTERRUPTING -> INTERRUPTED The task was cancelled by interrupting the runner (cancel(true)).
 */
// The state of the current task
private volatile int state;
private static final int NEW = 0; // Initial state of the task
private static final int COMPLETING = 1; // The Callable's result (value or exception) is being stored into this FutureTask
private static final int NORMAL = 2; // The task completed normally
private static final int EXCEPTIONAL = 3; // The task threw an exception
private static final int CANCELLED = 4; // The task was cancelled
private static final int INTERRUPTING = 5; // cancel(true) is about to interrupt the runner thread (which may still be running)
private static final int INTERRUPTED = 6; // The runner thread has been interrupted

// The task to execute
private Callable<V> callable;
// Holds the task's result (or the exception it threw); this is what futureTask.get() returns or rethrows
private Object outcome;
// The thread executing the task
private volatile Thread runner;
// Singly linked list of the threads parked in get(), waiting for the result
private volatile WaitNode waiters;

After t.start(), the new thread invokes FutureTask.run, which calls the Callable's call method synchronously and then stores the returned value in outcome.

// run() is what ultimately invokes the Callable's call method
public void run() {
    // The task may only run while its state is NEW,
    // and only the thread that wins the CAS on runner (null -> current thread) proceeds.
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    // Prepare to execute the task
    try {
        // The task to execute
        Callable<V> c = callable;
        // The task is not null and its state is still NEW
        if (c != null && state == NEW) {
            // Holds the returned result
            V result;
            // Whether call() completed normally
            boolean ran;
            try {
                // Invoke call() and keep its return value in result
                result = c.call();
                // Normal return: set ran to true
                ran = true;
            } catch (Throwable ex) {
                // No result is available
                result = null;
                // call() threw: set ran to false
                ran = false;
                // Store the exception as the outcome
                setException(ex);
            }
            if (ran)
                // Normal completion: store the result
                set(result);
        }
    } finally {
        // Clear the runner
        runner = null;
        // Re-read the state after clearing runner
        int s = state;
        // If cancel(true) is in progress, wait for its interrupt to be delivered
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


// Store the result
protected void set(V v) {
    // First move the state from NEW to COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // Store the result in outcome
        outcome = v;
        // Move the state to NORMAL, which marks normal completion
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        // Wake up the threads waiting in get()
        finishCompletion();
    }
}

When get is called, it checks the state of the task. If the task has not finished, that is, call has not returned and set has not run, the calling thread is parked with LockSupport.park(this).

public V get() throws InterruptedException, ExecutionException {
    // Read the state
    int s = state;
    // NEW or COMPLETING means the result may not be available yet
    if (s <= COMPLETING)
        // Park the calling thread and wait for the result
        s = awaitDone(false, 0L);
    return report(s);
}

// Wait until the task finishes, that is, until its state becomes greater than COMPLETING.
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // Compute the deadline: 0 for get(); for get(time, unit) it is the current System.nanoTime() plus the timeout.
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The WaitNode for the calling thread, created lazily
    WaitNode q = null;
    // Whether q has been linked into waiters
    boolean queued = false;
    // Loop until the task finishes, the wait times out, or the thread is interrupted
    for (;;) {
        // Check whether the thread calling get has been interrupted
        if (Thread.interrupted()) {
            // Remove this thread's node from waiters
            removeWaiter(q);
            // and throw InterruptedException
            throw new InterruptedException();
        }

        // Read the current state of the task
        int s = state;
        // Has the task finished?
        if (s > COMPLETING) {
            // If a WaitNode was created, clear its thread reference
            if (q != null)
                q.thread = null;
            // Return the final state
            return s;
        }
        // The state is COMPLETING
        else if (s == COMPLETING)
            // COMPLETING lasts only briefly, so yielding the CPU is enough
            Thread.yield();

        // The task is still NEW (call() may not have finished), so prepare to park this thread
        else if (q == null)
            // Create a WaitNode holding the current thread
            q = new WaitNode();
        else if (!queued)
            // Not linked into waiters yet: CAS q in as the new head (head insertion)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            // Parking for get(time, unit)
            // Compute the remaining wait time
            nanos = deadline - System.nanoTime();
            // Has the timeout already elapsed?
            if (nanos <= 0L) {
                // Remove this thread's node from waiters
                removeWaiter(q);
                // Return the current state
                return state;
            }
            // Park for the remaining time
            LockSupport.parkNanos(this, nanos);
        }
        else {
            // Parking for get(): no timeout
            LockSupport.park(this);
        }
    }
}
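
To see the timed branch of awaitDone from the caller's side, here is a minimal sketch that calls get(time, unit) with a timeout shorter than the task. The class TimedGetDemo and its method timesOut are hypothetical names used only for this illustration.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetDemo {
    // Hypothetical helper: true when get(timeout) gives up before the task finishes.
    static boolean timesOut(long taskMillis, long waitMillis) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            Thread.sleep(taskMillis);   // simulate slow work
            return "done";
        });
        Thread worker = new Thread(task);
        worker.setDaemon(true);         // do not keep the JVM alive for the demo
        worker.start();
        try {
            task.get(waitMillis, TimeUnit.MILLISECONDS); // timed path through awaitDone
            return false;               // the result arrived in time
        } catch (TimeoutException e) {
            return true;                // only the wait was abandoned; the task keeps running
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("short wait timed out: " + timesOut(2000, 100));
        System.out.println("long wait timed out:  " + timesOut(10, 2000));
    }
}
```

Note that a timeout does not cancel the task. If the work should stop as well, the caller has to invoke cancel itself.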

Once the task has finished (set or setException has run), finishCompletion wakes the waiting threads with LockSupport.unpark(t).

// The task has reached a final state (normal, exceptional, or cancelled); wake the waiters and clean up
private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        // Take the head of the list and CAS waiters to null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // Get the thread stored in q
                Thread t = q.thread;
                // The thread is not null
                if (t != null) {
                    // Clear the WaitNode's thread reference
                    q.thread = null;
                    // Wake up this thread
                    LockSupport.unpark(t);
                }
                // Move on to the next node and wake it up too
                WaitNode next = q.next;
                if (next == null)
                    break;
                // Unlink the current node to help GC
                q.next = null;
                // Continue with the next WaitNode
                q = next;
            }
            break;
        }
    }

    // Extension hook: empty by default, subclasses may override it
    done();

    // The task is finished; drop the reference to the Callable
    callable = null;
}
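
The done() hook called at the end of finishCompletion is the one place where FutureTask itself offers a callback. A small sketch of overriding it, under the assumption that running the task on the current thread is acceptable for a demo (DoneHookDemo and hookRuns are invented names):

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class DoneHookDemo {
    // Hypothetical helper: true when done() was invoked and the result is still readable.
    static boolean hookRuns() throws Exception {
        AtomicBoolean called = new AtomicBoolean(false);
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 42) {
            @Override
            protected void done() {
                // Invoked from finishCompletion once the task is in a final state
                called.set(true);
            }
        };
        task.run(); // runs call() on the current thread: set() -> finishCompletion() -> done()
        return called.get() && task.get() == 42;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("done() was called: " + hookRuns());
    }
}
```

The hook also runs when the task fails or is cancelled, so an override should check isCancelled() or call get() before assuming a value is present.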

Finally, report converts the final state into the value returned, or the exception thrown, by get:

// Convert the final state into get's return value or exception
private V report(int s) throws ExecutionException {
    // Read the outcome
    Object x = outcome;
    // Did the task complete normally?
    if (s == NORMAL)
        // Return the result
        return (V)x;
    // CANCELLED, INTERRUPTING, or INTERRUPTED
    if (s >= CANCELLED)
        // The task was cancelled
        throw new CancellationException();
    // Otherwise EXCEPTIONAL: wrap the exception thrown by the task
    throw new ExecutionException((Throwable)x);
}

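// Normal completion:          report returns the value
// Exception in the task:      report throws ExecutionException
// Cancelled task:             report throws CancellationException
// Waiting thread interrupted: awaitDone throws InterruptedException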
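
The three outcomes that report distinguishes can be observed with a few lines of caller code. This is only a sketch: ReportOutcomes and its helper methods are illustrative names, and each task is run on the current thread to keep the example deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ReportOutcomes {
    // Maps whatever get() does onto a short description.
    static String describe(FutureTask<Integer> task) throws InterruptedException {
        try {
            return "value " + task.get();                    // state NORMAL
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();   // state EXCEPTIONAL
        } catch (CancellationException e) {
            return "cancelled";                              // state >= CANCELLED
        }
    }

    static String normal() throws InterruptedException {
        FutureTask<Integer> t = new FutureTask<>(() -> 123 + 764);
        t.run();
        return describe(t);
    }

    static String exceptional() throws InterruptedException {
        FutureTask<Integer> t = new FutureTask<>(() -> { throw new IllegalStateException("boom"); });
        t.run(); // run() catches the exception and stores it via setException
        return describe(t);
    }

    static String cancelled() throws InterruptedException {
        FutureTask<Integer> t = new FutureTask<>(() -> 123 + 764);
        t.cancel(false); // NEW -> CANCELLED
        t.run();         // returns immediately because the state is no longer NEW
        return describe(t);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(normal());
        System.out.println(exceptional());
        System.out.println(cancelled());
    }
}
```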

CompletableFuture

Problems with FutureTask:
Problem 1: To obtain the result, the main thread has to block in get until the worker thread has finished the call method.
Problem 2: To avoid parking in get, you can poll isDone() in a while loop and fetch the result once the task has finished. If the task runs for a long time, the loop keeps the CPU busy checking the task state, which wastes CPU resources.

Either way, the caller has to collect the result synchronously: it blocks in get, or it polls without blocking. What is needed is an asynchronous, non-blocking way to handle the result. CompletableFuture provides this through a range of callback-style methods.

CompletableFuture also implements the Future interface, so you can use it directly in place of FutureTask. It provides a rich set of methods for composing asynchronous operations.

Application of CompletableFuture

The most important thing about CompletableFuture is that it solves the problem of asynchronous callbacks.
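
A minimal sketch of what "callback" means here: instead of blocking for the value, the caller registers a function that runs when the value is ready. The class CallbackDemo is a made-up name, and the final join exists only so the demo can return what the callback saw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackDemo {
    // Hypothetical helper: registers a callback and returns what the callback received.
    static String resultSeenByCallback() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> finished = CompletableFuture
                .supplyAsync(() -> 123 + 764)                        // runs asynchronously
                .thenAccept(sum -> seen.set("callback got " + sum)); // runs once the sum is ready
        // The caller is free to do other work here; nothing has blocked so far.
        finished.join(); // only so this demo can return the value deterministically
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(resultSeenByCallback());
    }
}
```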

CompletableFuture executes an asynchronous task, which may or may not return a result. Its API is built on three core functional interfaces:

Supplier - Producer, no input parameters, but returns results
Consumer - Consumer, has input parameters, but does not return results
Function - a function that has input parameters and returns a result
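
For readers new to java.util.function, the three interfaces can be tried out on their own before any asynchronous code is involved. The class and variable names below are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class CoreInterfaces {
    // Chains supplier -> function -> consumer by hand and returns what the consumer collected.
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Supplier<Integer> supplier = () -> 123 + 764;            // no input, returns a result
        Function<Integer, String> function = n -> "result=" + n; // input and result
        Consumer<String> consumer = out::add;                    // input, no result
        consumer.accept(function.apply(supplier.get()));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```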

It provides two basic methods for starting a task:

supplyAsync(Supplier<U> supplier): executes the task asynchronously and returns a result
runAsync(Runnable runnable): executes the task asynchronously and returns no result

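If no thread pool is specified, both methods hand the task to ForkJoinPool.commonPool() for execution (unless the common pool's parallelism is less than two, in which case a new thread is created for each task).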
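
The following sketch shows both starters, plus the overload that takes an explicit executor. The thread name "kitchen-1" and the class AsyncStarters are assumptions made for the demo, not part of the article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncStarters {
    // supplyAsync: the task returns a value.
    static int supplied() {
        return CompletableFuture.supplyAsync(() -> 123 + 764).join();
    }

    // runAsync: the task returns nothing, so the future's type is Void.
    static int sideEffect() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<Void> done = CompletableFuture.runAsync(counter::incrementAndGet);
        done.join();
        return counter.get();
    }

    // With an explicit executor, the task runs on that pool's thread instead of the default pool.
    static String threadNameOnCustomPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "kitchen-1"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(supplied());
        System.out.println(sideEffect());
        System.out.println(threadNameOnCustomPool());
    }
}
```

Passing your own executor is usually preferable for blocking or long-running work, since the common pool is shared by the whole JVM.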

These two methods alone do not give you asynchronous callbacks. To continue with a follow-up task that uses the result once the current task has finished, you need the methods below.

thenApply(Function<prevResult,currResult>): waits for the previous task to finish, takes its result, processes it, and returns a new result. The function runs on the thread that completed the previous task, or on the calling thread if that task had already finished.
thenApplyAsync(Function<prevResult,currResult>[, thread pool]): same as thenApply, but the function is submitted as a separate task to the given thread pool (or the default pool if none is given)
thenAccept(Consumer<prevResult>): waits for the previous task to finish and processes its result. No result is returned.
thenAcceptAsync(Consumer<prevResult>[, thread pool]): same as thenAccept, but submitted as a separate task to the thread pool
thenRun(Runnable): waits for the previous task to finish and then runs. It neither receives the previous task's result nor returns one.
thenRunAsync(Runnable[, thread pool]): same as thenRun, but submitted as a separate task to the thread pool
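
The three flavours above can be chained in one pipeline. The sketch below is an illustration under the assumption that a StringBuilder is an acceptable log, which holds here because the stages run strictly one after another; ChainDemo is an invented name.

```java
import java.util.concurrent.CompletableFuture;

public class ChainDemo {
    // Runs supply -> thenApply -> thenAccept -> thenRun and returns the collected log.
    static String chain() {
        StringBuilder log = new StringBuilder();
        CompletableFuture.supplyAsync(() -> 123 + 764)
                .thenApply(sum -> "sum=" + sum)       // Function: transforms the result
                .thenAccept(text -> log.append(text)) // Consumer: uses the result, returns nothing
                .thenRun(() -> log.append(";done"))   // Runnable: sees no result at all
                .join();                              // wait for the last stage (demo only)
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(chain());
    }
}
```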

More complex flows are also possible: a second task can run while the first one is still running, and a final task runs once both of them have completed.

thenCombine(CompletionStage,BiFunction<prevResult,nextResult,afterResult>): the current task and the given task run concurrently. After both have finished, their two results are passed to the function for final processing, which returns a result.
thenCombineAsync(CompletionStage,BiFunction<prevResult,nextResult,afterResult>[, thread pool]): same as thenCombine, but the function is submitted as a separate task to the thread pool
thenAcceptBoth(CompletionStage,BiConsumer<prevResult,nextResult>): the current task and the given task run concurrently. After both have finished, their two results are processed, but no result is returned.
thenAcceptBothAsync(CompletionStage,BiConsumer<prevResult,nextResult>[, thread pool]): same as thenAcceptBoth, but submitted as a separate task to the thread pool
runAfterBoth(CompletionStage,Runnable): the current task and the given task run concurrently, and the Runnable runs after both have finished. It receives no results.
runAfterBothAsync(CompletionStage,Runnable[, thread pool]): same as runAfterBoth, but submitted as a separate task to the thread pool
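
As a small illustration of the "both" family, the sketch below combines two independent futures. The names CombineDemo, combine and acceptBoth, as well as the strings, are chosen for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CombineDemo {
    // thenCombine: merge two results into a new one.
    static String combine() {
        CompletableFuture<String> dish = CompletableFuture.supplyAsync(() -> "pork");
        CompletableFuture<String> rice = CompletableFuture.supplyAsync(() -> "rice");
        return dish.thenCombine(rice, (d, r) -> d + " with " + r).join();
    }

    // thenAcceptBoth: consume two results without producing a new one.
    static String acceptBoth() {
        AtomicReference<String> served = new AtomicReference<>();
        CompletableFuture<String> dish = CompletableFuture.supplyAsync(() -> "pork");
        CompletableFuture<String> rice = CompletableFuture.supplyAsync(() -> "rice");
        dish.thenAcceptBoth(rice, (d, r) -> served.set(d + "+" + r)).join();
        return served.get();
    }

    public static void main(String[] args) {
        System.out.println(combine());
        System.out.println(acceptBoth());
    }
}
```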

It can also run two tasks at the same time and perform the final processing as soon as either one of them finishes.

applyToEither(CompletionStage,Function<firstResult,afterResult>): the current task and the given task run concurrently. As soon as one of them finishes, its result is passed to the function for final processing, which returns a result.
acceptEither(CompletionStage,Consumer<firstResult>): the current task and the given task run concurrently. As soon as one of them finishes, its result is processed. No result is returned.
runAfterEither(CompletionStage,Runnable): the current task and the given task run concurrently. As soon as one of them finishes, the Runnable runs. It receives no result.
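
A sketch of the "either" family, assuming one deliberately slow task and one fast task so the winner is predictable. EitherDemo and its sleep helper are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class EitherDemo {
    // Returns a message built from whichever task finishes first.
    static String fastest() {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            sleep(2000);          // deliberately slow
            return "slow";
        });
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        return slow.applyToEither(fast, winner -> "winner: " + winner).join();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        System.out.println(fastest());
    }
}
```

The losing task is not cancelled; it simply keeps running and its result is ignored.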

It can also wait for the previous task to finish and then start a follow-up task that itself returns a CompletionStage. The resulting stage is flattened rather than nested.

thenCompose(Function<prevResult,CompletionStage<currResult>>)
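
The difference from thenApply is easiest to see when the follow-up step is itself asynchronous. In this sketch, buy and cook are invented helper methods that each return a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeDemo {
    // Hypothetical first step: an asynchronous call that returns a future.
    static CompletableFuture<String> buy(String item) {
        return CompletableFuture.supplyAsync(() -> "fresh " + item);
    }

    // Hypothetical second step: also asynchronous, and it depends on the first result.
    static CompletableFuture<String> cook(String ingredient) {
        return CompletableFuture.supplyAsync(() -> "cooked " + ingredient);
    }

    static String dinner() {
        // thenCompose yields CompletableFuture<String>;
        // thenApply(ComposeDemo::cook) would yield CompletableFuture<CompletableFuture<String>>.
        return buy("pork").thenCompose(ComposeDemo::cook).join();
    }

    public static void main(String[] args) {
        System.out.println(dinner());
    }
}
```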

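Finally, there are several ways to handle exceptions:

exceptionally(Function<Throwable,currResult>): runs only if the previous task failed, and supplies a fallback result
whenComplete(BiConsumer<prevResult,Throwable>): runs in both cases and sees the result or the exception, but does not change the outcome
handle(BiFunction<prevResult,Throwable,currResult>): runs in both cases and returns a new result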
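
A sketch comparing the three methods on a task that always fails. ErrorHandlingDemo, failing and the exception message are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorHandlingDemo {
    // Hypothetical task that always throws.
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("stove is broken");
        });
    }

    // exceptionally: replace the failure with a fallback value.
    static int withExceptionally() {
        return failing().exceptionally(ex -> -1).join();
    }

    // handle: sees value and exception (one of them is null) and returns a new result.
    static String withHandle() {
        return failing().handle((value, ex) -> ex == null ? "ok " + value : "recovered").join();
    }

    // whenComplete: observes the failure but does not swallow it, so join still throws.
    static boolean whenCompleteKeepsFailure() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<Integer> f = failing().whenComplete((value, ex) -> seen.set(ex));
        try {
            f.join();
            return false;
        } catch (CompletionException e) {
            return seen.get() != null;
        }
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally());
        System.out.println(withHandle());
        System.out.println(whenCompleteKeepsFailure());
    }
}
```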

CompletableFuture example

public static void main(String[] args) {
    System.out.println("I'll go home for dinner");

    // Start cooking asynchronously; the main thread is not blocked
    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        System.out.println("Auntie cooks!");
        return "Pork in pot!";
    });
    System.out.println("I watch TV!");

    // join() waits for the dish and returns it
    System.out.println("I eat:" + task.join());
}
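
// Second version: two tasks run in parallel on an explicit thread pool and are then combined
public static void main(String[] args) {
    // The original does not show how executor is created; a fixed thread pool is assumed here
    ExecutorService executor = Executors.newFixedThreadPool(10);
    System.out.println("I'll go home for dinner");

    CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
        System.out.println("Auntie is cooking!");
        return "Pork in pot!";
    }, executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
        System.out.println("Xiao Wang is stewing rice");
        return "rice!";
    }, executor), (food, rice) -> {
        // Runs only after both the dish and the rice are ready
        System.out.println("Da Wang serves " + food + "," + rice);
        return "The food is ready!";
    }, executor);

    System.out.println("I watch TV!");
    System.out.println("I eat:" + task.join());
    // Shut the pool down so the JVM can exit
    executor.shutdown();
}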

Summary

Future is an interface and CompletableFuture is a class that implements it; both are used to handle asynchronous tasks. They differ mainly in how much they can do and where they fit. Future is relatively simple: it lets you start a task and later block for, or poll for, its result. CompletableFuture is more flexible and powerful: it adds callbacks, composition of several tasks, and exception handling, which makes it suitable for complex asynchronous workflows.