Reactive programming Reactor

Background

  • Introduction to synchronous vs. asynchronous and blocking vs. non-blocking calls, and an overview of network programming models.

Traditional imperative programming has some limitations when facing current needs.

Current needs:

Applications need to be highly available and provide low latency even when application load is high.

Performance overhead caused by synchronization

A request comes in and its handling thread waits for the response. If responses take a long time, performance degrades badly under high concurrency.

  • Thread per Request model (one request, one thread)

  • Waiting for I/O operations

    I/O operations such as database calls and file reads also waste a lot of resources.

    The thread that issued the I/O request blocks until the operation completes, i.e. blocking I/O. These threads are blocked solely to wait for a response, which wastes both threads and memory.

  • Response latency

    Another problem with traditional imperative programming: when a service needs to perform many operations rather than just a single I/O request, response latency grows accordingly.

    For example, service A needs to call services B and C, such as querying the database, aggregating the results and returning them. This means that the response time of service A includes:

    • Response time of service B (network delay time + processing time)
    • Response time of service C (network delay time + processing time)
    • Database request response time (network delay time + processing time)
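The additive effect above can be sketched with a small simulation (service names and delays are hypothetical): with blocking calls, service A's total response time is roughly the sum of the three downstream response times.

```java
import java.time.Duration;
import java.time.Instant;

// Simulates service A calling service B, service C, and the database in
// sequence. With blocking calls, A's latency is the *sum* of all three.
public class SequentialLatencyDemo {

    static void simulateCall(long ms) {
        try {
            Thread.sleep(ms); // simulated network delay + processing time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Total elapsed milliseconds for the three sequential blocking calls.
    static long totalLatencyMs() {
        Instant start = Instant.now();
        simulateCall(100); // service B
        simulateCall(100); // service C
        simulateCall(100); // database query
        return Duration.between(start, Instant.now()).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("total latency: " + totalLatencyMs() + " ms");
    }
}
```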

Problems with the blocking model

  • A thread may occupy about 1 MB of memory for its stack (64-bit JVM)
  • Context switching between many threads causes high overhead
  • Shared state requires synchronization tools for mutual exclusion
  • Code that updates multiple pieces of state is difficult to maintain

1. Blocking is a waste of resources
Applications need to cope with a large number of concurrent users. Even with the rapid development of hardware processing capabilities, software performance is still a key factor.

Two ideas to improve program performance:

  • Parallelize: use more threads and hardware resources.
  • Improve execution efficiency on the existing resources.
    Writing blocking code seems fine at first. When a performance bottleneck appears, we can add more processing threads, each running the same blocking code. But this way of using resources quickly runs into contention and concurrency problems.
    Worse, blocking wastes resources. Whenever a program faces a delay (usually I/O, such as a database read/write or a network call), the thread goes idle waiting for data, wasting resources.
    Parallelization is therefore not a silver bullet: it is a way to unlock the hardware's potential, but it introduces complexity and can lead to waste.

2. Can asynchrony solve the problem?
The second idea, improving execution efficiency, can solve the resource-waste problem. With asynchronous non-blocking code, after a task initiates an asynchronous call, execution switches to another active task on the same underlying resources; the result of the asynchronous call is processed later, once it arrives.

Limitations of the JDK asynchronous APIs

  • Future
    Calling the get method forces the caller back into synchronous, blocking mode
    Hard to coordinate multiple asynchronous tasks
  • Callback
    Code is difficult to read
    Hard to reorder steps
    Parallel semantics are difficult to express
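The Future limitation can be seen in a few lines: a plain Future offers no way to attach a callback, so the only way to consume the result is to stall the current thread with get() (the task and its delay here are made up for the demo).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Future.get() forces the caller back into blocking mode: there is no
// callback hook on a plain Future, so the current thread must wait.
public class FutureLimitationDemo {

    static int blockingGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                Thread.sleep(200); // simulated slow I/O
                return 42;
            });
            return future.get(); // blocks the calling thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingGet());
    }
}
```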
Callback Hell

The main cause of callback hell is deeply nested layers of callback logic, which hurts readability and makes the code hard to maintain.

public static <T> void query(String queryStr, Consumer<List<T>> callback) {
    // asynchronous: run the query on a new thread
    new Thread(() -> {
        List<T> res = null;
        // the actual query results would be filled in here
        callback.accept(res);
    }).start();
}

public static void callbackDemo() {
    // the second query depends on the result of the first:
    // res = query(); res2 = query(res)
    query("query1", list -> {
        query("query2", list2 -> {
            // use list & list2
            System.out.println("process the combined results");
        });
    });
}
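One way out of the nesting, as a sketch: Java 8's CompletableFuture can chain the dependent second query with thenCompose instead of nesting callbacks. The query() method below is a hypothetical stand-in that just wraps its input in a list.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// The two nested queries flattened with CompletableFuture:
// thenCompose chains the dependent query without nesting.
public class ComposeDemo {

    static CompletableFuture<List<String>> query(String q) {
        return CompletableFuture.supplyAsync(() -> List.of(q + "-result"));
    }

    static List<String> chained() {
        return query("query1")
                .thenCompose(list1 -> query("query2-after-" + list1.get(0)))
                .join(); // blocking only at the outermost edge of the demo
    }

    public static void main(String[] args) {
        System.out.println(chained());
    }
}
```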
Challenges of asynchronous programming
  • Orchestration of multiple asynchronous sources
    • Merging events
      e.g. over a sliding window
  • Asynchronous processing logic that itself involves other asynchronous work
    • The result of one asynchronous task is one or more new asynchronous events
    • An asynchronous handler needs the results of other asynchronous tasks
  • Handling asynchronous results
    • Waiting for multiple asynchronous tasks to succeed before proceeding to the next step
  • Exception handling
Examples
  • In a piece of asynchronous code, another I/O operation has to be performed, causing blocking

  • Exception handling in asynchronous code

  • The classic double-click event (time window): more than two click events within a given time range

    With plain event processing, many intermediate values have to be recorded.

    Event-driven APIs expose onXxxEvent-style hooks; the handler runs when the event occurs, passively, and you do not know when it will execute:
    onXxxEvent(evt) {}

    e.g. Spring's event listeners

Reactive programming

Reactive programming is a programming paradigm that uses asynchronous, event-driven techniques to build non-blocking applications that scale horizontally with only a small number of threads.

Everything is an Object -> Everything is an Event

  • Event-driven
    Components interact through loosely coupled producers and consumers, and events are sent and received in an asynchronous, non-blocking manner.
    An event-driven system works in push mode: the producer pushes data to the consumer as it is produced, rather than having the consumer continuously poll or wait for data.
  • Prompt response
    Because processing is asynchronous, a data-processing task can return quickly after handing the work off, instead of blocking until the task completes. The task runs on a background thread and delivers its result when done, as with Java 8's CompletableFuture.
  • Resilience
    An event-driven system is loosely coupled, with no direct dependency between upstream and downstream, though debugging becomes more expensive.

Reactive Streams specification

The Reactive Streams specification stipulates that asynchronous components interact using backpressure.

Java 9 adopts Reactive Streams through the Flow API. The Flow API is an interoperability specification rather than a concrete implementation, and its semantics are consistent with the Reactive Streams specification.

The Reactive Streams specification consists of the following interfaces:

  • Publisher

    Represents the producer or data source of the stream. It declares a single method through which subscribers register with the publisher: the standardized entry point connecting subscribers to the publisher.

    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
  • Subscriber

    Represents the consumer of the data stream.

    • onSubscribe is called by the publisher before processing starts; it is the standardized way to notify the Subscriber that the subscription succeeded, and it passes the subscriber a subscription ticket (Subscription).
    • onNext notifies the subscriber that the publisher has published a new data item.
    • onError notifies the subscriber that the publisher encountered an exception and will publish no further data events.
    • onComplete notifies the subscriber that all data events have been published.

    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }

  • Subscription

    The onSubscribe method receives a subscription ticket, the Subscription, which provides the basis for controlling how elements are produced.

    public interface Subscription {
        public void request(long n);
        public void cancel();
    }

    • request lets the subscriber tell the publisher how many more elements to publish.
    • cancel lets the subscriber cancel the remaining event stream from the publisher.
  • Processor

    When an entity needs to transform incoming items and pass the transformed items on to another subscriber, the Processor interface is used. It is both a subscriber and a publisher:

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
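The four interfaces can be exercised end to end with the JDK 9 Flow API, which mirrors them one for one. In this sketch, the built-in SubmissionPublisher plays the Publisher role, and the Subscriber pulls one element at a time via Subscription.request(1):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Minimal Publisher/Subscriber/Subscription round trip using the Flow API.
public class FlowDemo {

    static List<Integer> receiveAll(List<Integer> items) {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // initial demand: one element
                }
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // pull the next element only now
                }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete()        { done.countDown(); }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete once all items are delivered
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receiveAll(List.of(1, 2, 3)));
    }
}
```

Requesting one element per onNext is the simplest form of backpressure: the publisher never runs ahead of the subscriber's stated demand.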

Spring Reactor

Spring Reactor is Pivotal's reactive programming solution: a non-blocking, event-driven programming model implemented with functional programming.
Reactor is a fully non-blocking reactive programming framework for the JVM with efficient demand management (i.e. control of "backpressure"). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It provides the asynchronous sequence APIs Flux (for [N] elements) and Mono (for [0|1] elements), and fully follows and implements the Reactive Streams specification.
Reactor's reactor-ipc component also supports non-blocking inter-process communication (IPC). Reactor IPC provides backpressure-aware network engines for HTTP (including WebSockets), TCP, and UDP, making it suitable for microservice architectures, with full support for reactive encoding and decoding.

  • lazy evaluation


Backpressure


If the producer's speed and the consumer's consumption speed do not match, pressure builds up.
Reactor supports backpressure, and the pressure handling is elastic, much like the TCP sliding window: when the receiver is fast, send fast; when the receiver is slow, send slowly.
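As a sketch of demand-driven consumption (assuming the reactor-core dependency is on the classpath): Reactor's BaseSubscriber lets the subscriber ask for elements one at a time, so the producer never runs ahead of the consumer's demand.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Explicit backpressure: the subscriber controls the pace via request(n).
public class BackpressureDemo {

    static List<Integer> consumeOneByOne() {
        List<Integer> seen = new ArrayList<>();
        Flux.range(1, 5).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);               // initial demand: one element
            }
            @Override
            protected void hookOnNext(Integer value) {
                seen.add(value);
                request(1);               // ask for the next element only now
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneByOne());
    }
}
```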

Development

project reactor

  • Implements the Reactive Streams features
  • Serves as the default reactive implementation for Spring 5
  • Supports backpressure
  • Offers better readability

Use

Create Flux & Mono
Flux: an asynchronous sequence of 0..N elements
Mono: an asynchronous result of 0 or 1 element

Flux:
  • just
  • fromArray/Iterable/stream
  • generate/create/push/from
  • interval/range
  • empty/error
Mono:
  • fromCallable/fromRunnable/fromFuture/fromSupplier
  • defer

Code demo
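A short sketch of a few of the factory methods listed above (assuming the reactor-core dependency). block() and collectList() are used here only to make the results observable; production code would keep the pipeline non-blocking.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Creating Flux and Mono from values, iterables, and callables.
public class CreationDemo {

    static List<String> fluxExamples() {
        return Flux.just("a", "b")
                .concatWith(Flux.fromIterable(List.of("c")))
                .collectList()
                .block();
    }

    static Integer monoExample() {
        return Mono.fromCallable(() -> 21)   // lazy: runs only on subscription
                .map(n -> n * 2)
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fluxExamples());
        System.out.println(monoExample());
    }
}
```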

Commonly used APIs

  • Basic operations:
    • filter/defaultIfEmpty/any/all
    • map/flatMap
    • reduce/collect(list/map/set…)
    • take/skip
    • sort/count
  • Reacting to events:
    • doOnRequest/doOnNext/doOnComplete/doOnError/doOnCancel/doFinally
  • Batching:
    • buffer/window/groupBy
  • Multiple-flux operations:
    • merge/concat/zip/combine/join
    • firstWithSignal/firstWithValue
  • Others:
    • index/limitRate/log
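A short pipeline combining several of these operators (assuming the reactor-core dependency): filter and map shape one stream, and zip combines it pairwise with another.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// filter + zip + collectList in one pipeline.
public class OperatorsDemo {

    static List<String> pipeline() {
        Flux<Integer> evens = Flux.range(1, 6).filter(n -> n % 2 == 0); // 2, 4, 6
        Flux<String> labels = Flux.just("x", "y", "z");
        return Flux.zip(evens, labels, (n, s) -> s + n)                 // pairwise combine
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(pipeline());
    }
}
```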



In the Rx family of reactive libraries, reactive streams come in two flavors, "hot" and "cold", differing mainly in how the stream responds to subscribers:
  • A "cold" sequence starts over for each Subscriber, which receives all data from the beginning. If the source issues an HTTP request, a new HTTP request is created for each subscription.
  • A "hot" sequence only delivers to a Subscriber the data emitted after it subscribed. Note, however, that some "hot" reactive streams can cache part or all of the history. Generally speaking, a "hot" stream can emit data even when no subscribers are listening (an exception to the "nothing happens before you subscribe" rule).
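The difference can be sketched in code (assuming the reactor-core dependency): a cold Flux replays from the start for every subscriber, while a late subscriber to a hot sink only sees elements emitted after it subscribed.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Cold vs hot: replay-from-start versus only-from-subscription-time.
public class HotColdDemo {

    static boolean coldReplaysForEachSubscriber() {
        Flux<Integer> cold = Flux.range(1, 3);
        List<Integer> first = cold.collectList().block();
        List<Integer> second = cold.collectList().block(); // full replay
        return first.equals(second);
    }

    static List<Integer> lateHotSubscriber() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();
        sink.tryEmitNext(1);                // no subscriber yet: dropped
        List<Integer> seen = new ArrayList<>();
        sink.asFlux().subscribe(seen::add); // late subscriber
        sink.tryEmitNext(2);
        sink.tryEmitNext(3);
        sink.tryEmitComplete();
        return seen;                        // only 2 and 3
    }

    public static void main(String[] args) {
        System.out.println(coldReplaysForEachSubscriber());
        System.out.println(lateHotSubscriber());
    }
}
```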



During the assembly phase, the pipeline is composed but nothing runs.
During the subscription phase, the subscribers are assembled internally.
Finally the run is triggered and onNext signals flow.

The core call flow of Project Reactor is divided into the following phases:
  • declaration (assembly) phase
  • subscribe phase
  • onSubscribe phase
  • request phase
  • calling (execution) phase
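The split between assembly and execution can be verified directly (assuming the reactor-core dependency): the pipeline below is only assembled at declaration time, and the map function runs only once subscribe() is called.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// "Nothing happens until you subscribe": declaration only assembles.
public class PhasesDemo {

    static int[] counts() {
        AtomicInteger mapped = new AtomicInteger();
        Flux<Integer> pipeline = Flux.range(1, 3)
                .map(n -> { mapped.incrementAndGet(); return n * 10; }); // assembly only
        int before = mapped.get();   // still 0: nothing has run
        pipeline.subscribe();        // subscription triggers request + onNext
        int after = mapped.get();    // map ran once per element
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] c = counts();
        System.out.println(c[0] + " -> " + c[1]);
    }
}
```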



Reference links:
  • Reactor
  • I/O models
  • Callback hell
  • Reactor reference manual