10 | Flow: is it the ultimate choice for asynchronous programming?

Reactive programming used to be a very hot topic. It is a model for controlling how code executes. Without looking at other models, it is hard to see the strengths and weaknesses of reactive programming, or the use cases it suits best. So today’s discussion is organized a little differently from previous ones.

In addition to reactive programming, we will also spend a lot of time discussing other programming models, both current and future. I hope this arrangement can help you choose the most appropriate mode according to specific scenarios.

Let’s start with the reading case. We first look at the most traditional model, then move step by step to reactive programming, and finally say a few words about coroutines, which have not yet been released in Java.

Read the case

Whether you learned C or Java first, you probably started, as I did, with the simple example of printing “Hello, world!”. Let’s take another look at this familiar code.

System.out.println("Hello, World!");

This code uses the most commonly used code control model: the imperative programming model. The so-called imperative programming model requires us to issue instructions through code, and then wait for the execution of the instructions and the state changes caused by the execution of the instructions. We also need to determine the next instruction to be issued based on the current status, and express the next instruction in code.

In the above code, the instruction we issued is: print the sentence “Hello, World!” to the standard output. Then, we wait for the execution results of the instructions to verify whether the code we wrote works according to our instructions.

Imperative programming model

The focus of the imperative programming model is on controlling state. The “Hello, world!” example gives some hints, but to understand state changes and state control we need to look at more than one or two lines of code.

try {
    Digest messageDigest = Digest.of("SHA-256");
    byte[] digestValue =
            messageDigest.digest("Hello, world!".getBytes());
} catch (NoSuchAlgorithmException ex) {
    System.out.println("Unsupported algorithm: SHA-256");
}

In the above code, we first call the Digest.of method to get a Digest instance; then we call the Digest.digest method of this instance to get a return value. Only after the first method has finished, and we hold the state it produced, can the second method be executed.
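
The Digest class in this lesson is an illustrative API. As a rough, runnable counterpart, here is a minimal sketch of the same two-step flow using the JDK's java.security.MessageDigest. The class name ImperativeDigestDemo and the helper digestHex are assumptions made for this example only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ImperativeDigestDemo {
    // Hypothetical helper: returns the digest as hex, or null if the
    // algorithm is not available.
    static String digestHex(String algorithm, String message) {
        try {
            // Step 1: get the instance. Step 2 cannot start before this returns.
            MessageDigest md = MessageDigest.getInstance(algorithm);
            // Step 2: compute the digest. The calling thread waits until it is done.
            byte[] value = md.digest(message.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : value) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException ex) {
            System.out.println("Unsupported algorithm: " + algorithm);
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(digestHex("SHA-256", "Hello, world!"));
    }
}
```

Each statement depends on the state left behind by the previous one, which is exactly the sequential control described above.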

This sequential execution mode has simple and direct logic. Simplicity and directness have great power in themselves, especially in achieving precise control. Therefore, this model occupies an overwhelming advantage in general programming language design and general application development.

However, this pattern requires state to be maintained and synchronized. If the number of states is large, we need to break the large code block into small code blocks; in this way, the code we write can be easier to read and easier to maintain. The bigger problem comes from the sequential execution required for state synchronization.

For example, in the code above, the Digest.of method may be very efficient and return quickly, while the Digest.digest method may take milliseconds or even seconds to run. In an environment that requires low latency and high concurrency, waiting for the Digest.digest call to return may not be a good choice. In other words, blocking on method calls increases system latency and reduces the throughput that the system can support.

The latency caused by this sequential execution model is intolerable in many Internet-era scenarios (such as a Spring Festival train ticket pre-sale system or an online shopping festival ordering system). One of the most typical places where this problem shows up is the traditional socket programming interface in a client-server architecture. It also gave rise to the C10K problem (supporting 10,000 concurrent users), which was raised about 20 years ago.

How can the C10K problem be solved? One main direction is non-blocking asynchronous programming.

Declarative programming model

Non-blocking asynchronous programming is not something we get simply by picking a programming language or a standard class library. Supporting it requires significant code changes and a change in how we think about writing code.

We can use the analogy of making a phone call.

The traditional imperative programming model is like an ordinary phone call. We dial the other person’s phone number, wait for an answer, talk, and then hang up. When we hang up the phone, the call is over, and we have the result we wanted.

Non-blocking asynchronous programming is more like leaving a voice message. We call the other person’s number, ask them to call back at their convenience, and then hang up. When we hang up, the call itself is over, but we do not yet have the result we wanted. We only get it when the other person calls back.

This callback-like logic is the key model of non-blocking asynchronous programming. In code, it maps to callback functions or methods.

When we try to use callback functions, our ideas and models for writing code change dramatically. Our focus will change from the “control state” of the imperative programming model to the “control goal”. At this time, our programming model has also changed to a declarative programming model.

**If the logic of the imperative programming model is to tell the computer “how to do it”, then the logic of the declarative programming model is to tell the computer “what to do”.** Code in the imperative programming model is like an engineer on an assembly line, tightening every screw in every detail; code in the declarative programming model is more like a military advisor sitting in the command tent, assigning tasks and planning strategy.
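
As a small illustration of the difference between spelling out the steps and stating the goal, the sketch below computes the same value twice: once with an explicit loop and a running state, and once with the standard java.util.stream API. The class and method names are hypothetical, and streams are only one way to show the contrast; the rest of this lesson uses callbacks.

```java
import java.util.List;

class HowVersusWhat {
    // Imperative: spell out each step and manage the running state ourselves.
    static int sumOfEvenSquaresImperative(List<Integer> numbers) {
        int sum = 0;
        for (int n : numbers) {
            if (n % 2 == 0) {
                sum += n * n;
            }
        }
        return sum;
    }

    // Declarative: describe the result we want; the library drives the iteration.
    static int sumOfEvenSquaresDeclarative(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4);
        System.out.println(sumOfEvenSquaresImperative(numbers));  // 20
        System.out.println(sumOfEvenSquaresDeclarative(numbers)); // 20
    }
}
```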

Can the Digest we discussed earlier implement non-blocking asynchronous programming? The answer is yes, but we need to completely change the code, from API to implementation. The API declared in the code below is an example of our attempt to use declarative programming.

public sealed abstract class Digest {
    public static void of(String algorithm,
        Consumer<Digest> onSuccess, Consumer<Integer> onFailure) {
        // snipped
    }

    public abstract void digest(byte[] message,
        Consumer<byte[]> onSuccess, Consumer<Integer> onFailure);
}

Redesigned this way, the Digest.of method is like assigning a task: if the execution succeeds, continue with plan A (the onSuccess callback function); otherwise, continue with plan B (the onFailure callback function). This is the idea of telling the computer “what to do” that we mentioned earlier.
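
To make the plan A / plan B idea concrete, here is a minimal sketch of how such a callback-style API could be implemented on top of java.security.MessageDigest. CallbackDigest, the error codes -1 and -2, and the use of CompletableFuture.supplyAsync to move the work to another thread are all assumptions of this sketch, not the implementation behind the Digest class in this lesson.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical callback-style wrapper around MessageDigest.
final class CallbackDigest {
    private final MessageDigest delegate;

    private CallbackDigest(MessageDigest delegate) {
        this.delegate = delegate;
    }

    // Plan A (onSuccess) receives the instance; plan B (onFailure) an error code.
    static void of(String algorithm,
                   Consumer<CallbackDigest> onSuccess, Consumer<Integer> onFailure) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException ex) {
            onFailure.accept(-1); // arbitrary error code for this sketch
            return;
        }
        onSuccess.accept(new CallbackDigest(md));
    }

    // Returns immediately; one of the callbacks runs later on another thread.
    void digest(byte[] message,
                Consumer<byte[]> onSuccess, Consumer<Integer> onFailure) {
        CompletableFuture
            .supplyAsync(() -> delegate.digest(message))
            .whenComplete((value, error) -> {
                if (error == null) {
                    onSuccess.accept(value);
                } else {
                    onFailure.accept(-2); // arbitrary error code for this sketch
                }
            });
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        CallbackDigest.of("SHA-256",
            md -> md.digest("Hello, world!".getBytes(StandardCharsets.UTF_8),
                    result::complete,
                    code -> result.completeExceptionally(
                            new IllegalStateException("error " + code))),
            code -> result.completeExceptionally(
                    new IllegalStateException("error " + code)));
        // join() is only here so that the demo does not exit before the callback runs.
        System.out.println("Digest length: " + result.join().length);
    }
}
```

In this sketch, of invokes its callbacks synchronously while digest invokes them asynchronously; the caller's code looks the same in both cases.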

With callback functions in the design, the implementation gains freedom. Both the implementation of a callback function and the code that invokes it can freely choose between asynchronous and synchronous mode. Needless to say, this freedom is attractive. Starting with the new NIO features introduced in JDK 7, this model entered Java’s industrial practice and achieved great success, and a large number of star projects emerged.

However, the design of callback functions also has an inherent flaw: callback hell (to express it more intuitively, I prefer to call it callback stacking). What does that mean? Usually, we need to arrange multiple small tasks to complete a large task. These small tasks may depend on one another, in which case they have to cooperate or be executed in order.

For example, in the Digest design above, we first need to determine whether the of method can succeed; if it succeeds, then use this Digest instance and call its Digest.digest method. When calling the Digest.digest method, Plan A and Plan B must also be made. In this way, the use of two callback functions will pile up. If the nesting of callback functions increases, the code will look like it is crowded together, which is not beautiful in form, difficult to read and difficult to maintain.

The following code is a use case of Digest that we designed using callback functions. In this use case, the nesting of callback functions is only two levels, and the form of the code has become difficult to read. You can try to write a 3- or 5-level nesting of callback functions to experience what deeply nested code looks like.

Digest.of("SHA-256",
    md -> {
        System.out.println("SHA-256 is supported");
        md.digest("Hello, world!".getBytes(),
            values -> {
                System.out.println("SHA-256 is available");
            },
            errorCode -> {
                System.out.println("SHA-256 is not available");
            });
    },
    errorCode -> {
        System.out.println("Unsupported algorithm: SHA-256");
    });

Even if we can put up with the visual stacking caused by callback functions, the logical stacking that comes with it is almost unbearable. Logical stacking means deep coupling of code, and deep coupling means that the code is hard to maintain. A small change deep inside the nesting may propagate upward through the nesting layers and end up affecting the whole.

As a result, the declarative programming model based on callback functions fits only a limited range of scenarios. We usually use callback functions only in the modules that have the greatest impact on performance, such as network data transmission; most of the code still uses the traditional, sequentially executed imperative model.

Fortunately, there are many efforts in the industry to try to improve the dilemma of using callback functions. The most outstanding and influential one is reactive programming.

Reactive programming

The basic logic of reactive programming is still to tell the computer “what to do”, but its focus has shifted to changes in data and to the propagation of data and changes, or in other words, to responding to data changes. So, the core of reactive programming is data flow and the propagation of change.

If we look at it from the perspective of data flow, data has two basic forms: data input and data output. From these two basic forms, three processes can be derived: initial source, data transfer and final outcome.

Data output

In the design of Java’s reactive programming model, data output is represented by Flow.Publisher, a generic interface with a single type parameter.

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

In the interface design of Flow.Publisher, the generic T represents the data type. The recipient of the output data is represented by Flow.Subscriber. In other words, a data publisher delivers data to a subscriber by accepting that subscriber through the subscribe method. A data publisher can have multiple data subscribers.

It should be noted that the subscribe method is declared in the Flow.Publisher interface. This means that a subscription is established through the data publisher, by handing a subscriber to it, rather than through a method on the subscriber.

The initial source of data is a form of data output; it only has the output direction and cannot receive data input.

For example, the following code is an example of an initial source of data. In this code, the data type is a byte array; for the publisher implementation, we use the SubmissionPublisher class from the Java standard class library.

SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>();
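
To see one publisher serving several subscribers, here is a minimal sketch that uses SubmissionPublisher.consume, a convenience method of the standard class that subscribes a simple consumer and returns a CompletableFuture that completes when the publisher is closed. The class name PublisherDemo and the method publishToTwo are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class PublisherDemo {
    // Publishes the items to two independent subscribers and returns
    // what each of them received.
    static List<List<String>> publishToTwo(List<String> items) {
        List<String> first = new CopyOnWriteArrayList<>();
        List<String> second = new CopyOnWriteArrayList<>();

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // consume() subscribes a subscriber that hands every item to the consumer.
        CompletableFuture<Void> firstDone = publisher.consume(first::add);
        CompletableFuture<Void> secondDone = publisher.consume(second::add);

        items.forEach(publisher::submit);
        publisher.close();   // signals completion to both subscribers

        firstDone.join();    // wait until each subscriber has seen the completion
        secondDone.join();
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(publishToTwo(List.of("a", "b", "c")));
    }
}
```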

Data input

Next, let’s look at data input.

In the design of Java’s reactive programming model, data input is represented by Flow.Subscriber, a generic interface with a single type parameter. This is the subscriber we mentioned earlier.

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);

    public void onNext(T item);

    public void onError(Throwable throwable);

    public void onComplete();
}

In the interface design of Flow.Subscriber, the generic T represents the data type. A total of four tasks are defined in this interface, and responses in the following four situations are specified:

  1. What should I do if I receive an invitation to subscribe? This behavior is determined by the implementation of the onSubscribe method.
  2. What to do if data is received? This behavior is determined by the implementation of the onNext method.
  3. What should I do if I encounter an error? This behavior is determined by the implementation of the onError method.
  4. What should I do if the data transfer is complete? This behavior is determined by the implementation of the onComplete method.
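
The order in which these four methods are called can be observed with a small sketch like the one below. RecordingSubscriber and LifecycleDemo are hypothetical names; the publisher is the standard SubmissionPublisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class LifecycleDemo {
    // Hypothetical subscriber that records which callback fired, in order.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            this.subscription = subscription;
            subscription.request(1); // without a request, no item is delivered
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            finished.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            finished.countDown();
        }
    }

    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.submit("a");
        publisher.submit("b");
        publisher.close();
        try {
            subscriber.finished.await(); // wait for onComplete or onError
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext:a, onNext:b, onComplete]
        System.out.println(run());
    }
}
```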

The final outcome of data is a form of data input; it only has one transmission direction of data input and cannot produce data output.

For example, the following code is an example of the final result of the data. In this code, we use a generic to represent the type of data; then, we use a Consumer function to represent how we should process the received data. This arrangement gives the example universal relevance. With just a few modifications, you can use it in actual scenarios.

package co.ivi.jus.flow.reactive;

import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class Destination<T> implements Flow.Subscriber<T> {
    private Flow.Subscription subscription;
    private final Consumer<T> consumer;

    public Destination(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        subscription.request(1);
        consumer.accept(item);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}

Data Control

You may have noticed that the Flow.Subscriber interface is not directly connected to Flow.Publisher. Instead, an intermediate proxy, Flow.Subscription, appears. Flow.Subscription manages and controls the connection between Flow.Publisher and Flow.Subscriber, as well as the transfer of data.

In other words, in Java’s reactive programming model, data transfer control is separated from data and data changes. Such separation is of great significance for reducing the coupling between functions.

public static interface Subscription {
    public void request(long n);

    public void cancel();
}

The Flow.Subscription interface defines two methods. The Subscription.request method states how many more items the subscriber is willing to receive. The Subscription.cancel method indicates that the subscriber wants to cancel the subscription.
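
The effect of request and cancel can be sketched as follows: the subscriber asks for two items, takes them, and then cancels, even though the publisher submits five. TwoItemSubscriber and SubscriptionControlDemo are hypothetical names used only in this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class SubscriptionControlDemo {
    // Hypothetical subscriber that wants exactly two items.
    static final class TwoItemSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch cancelled = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(2); // the subscriber decides how much it gets
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == 2) {
                subscription.cancel(); // no further signals after this
                cancelled.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            cancelled.countDown();
        }

        @Override
        public void onComplete() {
            cancelled.countDown();
        }
    }

    static List<Integer> run() {
        TwoItemSubscriber subscriber = new TwoItemSubscriber();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (int i = 1; i <= 5; i++) {
            publisher.submit(i); // buffered; delivered only as far as requested
        }
        try {
            subscriber.cancelled.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        publisher.close();
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2]
    }
}
```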

Transmission of data

In addition to the initial source and final outcome, there is another process of data representation, which is the transmission of data. The process of data transfer includes both receiving input data and sending output data. During the data transfer process, the content of the data may change, and the amount of data may also change (for example, part of the data may be filtered out, or the input data may be modified, or even the input data may be replaced).

In the design of Java’s reactive programming model, such a process is represented by Flow.Processor. Flow.Processor is an interface that extends Flow.Publisher and Flow.Subscriber. Therefore, Flow.Processor has two data types, the generic T represents the type of input data, and the generic R represents the type of output data.

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

The following code is an example of data transfer. In this code, we use generics to represent the types of input data and output data; then, we use a Function function to represent how to process the received data and output the processing results. This arrangement gives the example universal relevance. With a few modifications, you can use it in actual scenarios.

package co.ivi.jus.flow.reactive;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class Transform<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {
    private Function<T, R> transform;
    private Flow.Subscription subscription;

    public Transform(Function<T, R> transform) {
        super();
        this.transform = transform;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(transform.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }
}
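
The Transform class above emits exactly one output item for each input item. To illustrate the other case mentioned earlier, where part of the data is filtered out, here is a minimal sketch of a filtering processor built the same way. Filter and FilterDemo are hypothetical names for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

class FilterDemo {
    // Hypothetical processor: forwards only the items that match the predicate.
    static final class Filter<T> extends SubmissionPublisher<T>
            implements Flow.Processor<T, T> {
        private final Predicate<T> predicate;
        private Flow.Subscription subscription;

        Filter(Predicate<T> predicate) {
            this.predicate = predicate;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            if (predicate.test(item)) {
                submit(item);        // pass the item downstream
            }                        // otherwise drop it silently
            subscription.request(1); // ask upstream for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    static List<Integer> evens(List<Integer> input) {
        List<Integer> output = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        Filter<Integer> filter = new Filter<>(n -> n % 2 == 0);

        CompletableFuture<Void> done = filter.consume(output::add);
        source.subscribe(filter);
        input.forEach(source::submit);
        source.close();  // completion travels through the filter to the consumer
        done.join();
        return output;
    }

    public static void main(String[] args) {
        System.out.println(evens(List.of(1, 2, 3, 4, 5, 6))); // prints [2, 4, 6]
    }
}
```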

Concatenation of processes

Since the representation of data is divided into two basic forms: input and output, and three processes derived from them are also provided, we can easily connect the data processing processes in series.

The code below is an example of our attempt to connect the three processes of initial source, data transfer and final outcome into a larger process. Of course, you can also try to connect more data processing processes in series.

private static void transform(byte[] message,
          Function<byte[], byte[]> transformFunction) {
    SubmissionPublisher<byte[]> publisher =
            new SubmissionPublisher<>();

    // Create the transform processor
    Transform<byte[], byte[]> messageDigest =
            new Transform<>(transformFunction);

    // Create subscriber for the processor
    Destination<byte[]> subscriber = new Destination<>(
            values -> System.out.println(
                    "Got it: " + Utilities.toHexString(values)));

    //Chain processor and subscriber
    publisher.subscribe(messageDigest);
    messageDigest.subscribe(subscriber);
    publisher.submit(message);

    // Close the submission publisher.
    publisher.close();
}

Connecting processes in series links the different stages together, and the code of each stage can be reused in different scenarios. Support for connecting processes in series is one of the biggest sources of the reactive programming model’s power. Programming languages like Scala even support process concatenation at the language level, which greatly improves coding efficiency and the elegance of the code.

Concise refactoring

Having introduced the design of Java’s reactive programming model, we should look back at the questions raised in the reading case. How does reactive programming deal with the latency caused by the sequential execution model? And how does it deal with the stacking problem caused by callback functions?

Let’s first take a look at the code using the reactive programming model, and then discuss these issues. The following code is our improvement to the usage of Digest in the reading case.

Returned<Digest> rt = Digest.of("SHA-256");
switch (rt) {
    case Returned.ReturnValue rv -> {
        // Get the returned value
        if (rv.returnValue() instanceof Digest d) {
            // Call the transform method for the message digest.
            transform("Hello, World!".getBytes(), d::digest);

            // Wait for completion
            Thread.sleep(20000);
        } else {  // unlikely
            System.out.println("Implementation error: SHA-256");
        }
    }
    case Returned.ErrorCode ec ->
            System.out.println("Unsupported algorithm: SHA-256");
}


In this example, we do not see the stacking that we saw with callback functions. What plays the important role here is the concatenation of processes mentioned above. The design of process concatenation and data control in Java’s reactive programming model, as well as the separation of data input and output, reduce the coupling of code and eliminate the need for nested calls.

In this example, we also see the direct use of the Digest.digest method. In order to be able to use the reactive programming model, we do not need to modify the Digest code. As long as the original design and implementation of Digest are properly put into the reactive programming model, the asynchronous and non-blocking idea can be realized. This is undoubtedly very attractive. Who would subvert the existing code if they were not forced to do so?

So how does the reactive programming model support asynchronous non-blocking? In fact, like callback functions, reactive programming can support both synchronous blocking mode and asynchronous non-blocking mode. If these interface implementations are in asynchronous non-blocking mode, the calls to these implementations are also asynchronous and non-blocking. Of course, the main usage scenario of the reactive programming model is still the asynchronous non-blocking mode.

For example, SubmissionPublisher in our example is an implementation of asynchronous non-blocking mode. In the above code, if Thread.sleep is not called, we may not see the processing results of Digest, and the main thread will exit. This is what a non-blocking implementation looks like.
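
A fixed Thread.sleep is a simple way to keep the demonstration alive, but the waiting time is a guess. As one possible alternative, the sketch below waits on the CompletableFuture returned by SubmissionPublisher.consume, which completes once the completion signal has been delivered. AwaitDemo, digestHexViaFlow and sha256Hex are hypothetical names, and the JDK's MessageDigest stands in for the Digest class of this lesson.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicReference;

class AwaitDemo {
    // Hypothetical helper: SHA-256 of the bytes, as a hex string.
    static String sha256Hex(byte[] bytes) {
        try {
            byte[] value = MessageDigest.getInstance("SHA-256").digest(bytes);
            StringBuilder hex = new StringBuilder();
            for (byte b : value) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }

    static String digestHexViaFlow(String message) {
        SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>();
        AtomicReference<String> result = new AtomicReference<>();

        // The consumer runs asynchronously, on a thread chosen by the publisher.
        CompletableFuture<Void> done =
                publisher.consume(bytes -> result.set(sha256Hex(bytes)));

        publisher.submit(message.getBytes(StandardCharsets.UTF_8));
        publisher.close();

        done.join(); // returns when processing has finished; no fixed sleep
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println("Got it: " + digestHexViaFlow("Hello, World!"));
    }
}
```

Waiting like this blocks the calling thread, so it belongs at the edge of a program (a demo's main method or a test), not inside the processing chain itself.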

Defects and Countermeasures

So far, the reactive programming model looks perfect. However, the shortcomings of the reactive programming model are also serious. The most fatal flaw is that errors are difficult to troubleshoot, which is a common problem with asynchronous programming. The decoupled design of the reactive programming model makes error troubleshooting more difficult, which will seriously affect the efficiency of development and reduce the maintainability of the code.
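
One concrete form of this difficulty can be sketched as follows: when an exception is created inside an asynchronous consumer, its stack trace shows the worker thread's frames, not the method that submitted the data. In the sketch below, AsyncStackTraceDemo and submitOrder are hypothetical names; submitOrder plays the role of the business method we would like to find when troubleshooting.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicReference;

class AsyncStackTraceDemo {
    // The business method that actually triggered the work.
    static void submitOrder(SubmissionPublisher<String> publisher) {
        publisher.submit("order-42");
    }

    // Returns the stack trace of an exception created inside the consumer.
    static String traceSeenByConsumer() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        AtomicReference<String> trace = new AtomicReference<>();

        CompletableFuture<Void> done = publisher.consume(item -> {
            StringWriter out = new StringWriter();
            new IllegalStateException("failed on " + item)
                    .printStackTrace(new PrintWriter(out, true));
            trace.set(out.toString());
        });

        submitOrder(publisher);
        publisher.close();
        done.join();
        return trace.get();
    }

    public static void main(String[] args) {
        String trace = traceSeenByConsumer();
        System.out.println(trace);
        // The submitting method has already returned by the time the consumer runs.
        System.out.println("Mentions submitOrder: " + trace.contains("submitOrder"));
    }
}
```

The trace tells us where the failure happened, but not which caller caused it; recovering that link usually takes extra logging or context passed along with the data.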

At present, the direction for addressing the shortcomings of the reactive programming model, or of asynchronous programming in general, seems to be a return to the old path of the imperative programming model. Most worth mentioning here is the concept of the coroutine (fiber). At the time of writing, Java’s coroutine support has not been released, but we can still get a first look at the idea.

Let’s take a look at the code mentioned in the reading case again. To make it easier for you to read, I’ve copied and pasted it here.

try {<!-- -->
    Digest messageDigest = Digest.of("SHA-256");
    byte[] digestValue =
            messageDigest.digest("Hello, world!".getBytes());
} catch (NoSuchAlgorithmException ex) {<!-- -->
    System.out.println("Unsupported algorithm: SHA-256");
}

In Java’s imperative programming model, this code is executed in a thread. We first call the Digest.of method to get a Digest instance; then we call the Digest.digest method of this instance to get a return value. The thread will be in a waiting state before each method returns. The waiting of threads is the biggest factor causing waste of resources.

Coroutines eliminate this thread waiting. When a call blocks, the underlying resources are switched away to do other work. This saves a lot of computing resources and lets the system support large-scale concurrency while still using blocking-style code. If the imperative programming model can support large-scale concurrency through coroutines, it may well be a new technology that overturns the existing high-concurrency architectures.

Currently, Java’s coroutine mode has not been released. What impact can it have on the reactive programming model, and how much convenience can it bring to us in implementing large-scale concurrent systems? We still have to wait for some time for the answers to these questions.

Summary

Let me summarize. In this lesson, we discussed imperative and declarative programming models, callback functions and callback hell, and the basic components of Java’s reactive programming model.

Due to space limitations, we cannot discuss the many possibilities and variations of the Java reactive programming model, including buzzwords such as the “Reactive Manifesto” and “backpressure”. I recommend that you read further about these aspects of reactive programming (such as the Reactive Manifesto and reactive systems), as well as mature star products (such as Akka and Spring 5+).

Since Java’s coroutine support has not yet been released, I have no clear judgment on the future of reactive programming. You are welcome to leave a comment in the comments section and discuss the present and future of reactive programming.

I have also listed a few of the technical points discussed today that may come up in your interviews. After this lesson, you should be able to:

  • Understand the terms imperative and declarative programming models;
      • Interview question: Do you know the declarative programming model and how it works?
  • Understand the basic components of Java’s reactive programming model and how they fit together;
      • Interview question: Do you know how to use the Java reactive programming model?
  • Know the form of callback functions and the term callback hell.
      • Interview question: Do you know what’s wrong with callback functions?