Multithreading in Reactive Programming with Reactor

In Reactive Programming in Java, we gained a basic understanding of reactive programming and, along the way, learned about Reactor, a best-practice library for reactive programming in Java. Today we continue to explore reactive programming through Reactor, focusing on multithreading in Reactor.

Traditional Java multithreading

In traditional multithreaded Java development, we usually use the Executors utility class to create thread pools. The common types are:

  • newCachedThreadPool creates a cached thread pool with an elastic size. If the pool has more threads than it currently needs, idle threads are reclaimed; if no idle thread is available for a new task, a new thread is created.

  • newFixedThreadPool creates a thread pool with a fixed size, which caps the number of concurrent threads; excess tasks wait in a queue.

  • newScheduledThreadPool creates a fixed-size thread pool that supports delayed and periodic task execution.

  • newSingleThreadExecutor creates a single-threaded executor that uses one worker thread to run tasks, ensuring that all tasks are executed sequentially, in the order they were submitted.

  • newWorkStealingPool creates a thread pool that supports work-stealing (task stealing, mentioned in the previous article on the Go concurrency model; I will write a separate article to introduce it later).
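As a quick refresher on the factory methods listed above, here is a minimal sketch using only the JDK. The class name ExecutorsDemo and its two helper methods are made up for illustration; the first method submits work to a fixed-size pool and collects the results, and the second relies on the ordering guarantee of a single-threaded executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of any library.
final class ExecutorsDemo {

    /** Submits the squares of 1..tasks to a fixed pool of poolSize threads and sums the results. */
    static int sumOfSquares(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int n = i;
                futures.add(pool.submit(() -> n * n)); // Callable<Integer>
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // waits for each task to finish
            }
            return sum;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Runs tasks 0..tasks-1 on a single worker thread and records the order in which they ran. */
    static List<Integer> runInOrder(int tasks) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < tasks; i++) {
                final int n = i;
                single.execute(() -> order.add(n));
            }
            single.shutdown(); // already-submitted tasks still run
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(order);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(2, 4)); // 1 + 4 + 9 + 16 = 30
        System.out.println(runInOrder(3));      // [0, 1, 2]
    }
}
```

With the fixed pool, at most poolSize tasks run at the same time and the rest wait in the queue; with the single-threaded executor, the recorded order matches the submission order.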

The traditional Executors utility class already makes thread pools very convenient to use. Reactor goes a step further and offers an even more foolproof abstraction, the Scheduler.

Reactor multithreading

Reactor provides the Schedulers class to create the following thread environments:

  • Schedulers.immediate(): runs tasks on the current thread.

  • Schedulers.single(): provides one shared thread that all callers reuse. If you want a dedicated thread, use Schedulers.newSingle().

  • Schedulers.elastic(): an elastic thread pool that creates threads as needed and reuses idle ones; threads that stay idle for too long are discarded. It is well suited to blocking I/O, because each blocking task can get its own thread and does not hold up other tasks and resources. (Since Reactor 3.4 it is deprecated in favor of Schedulers.boundedElastic().)

  • Schedulers.parallel(): a fixed-size thread pool; by default its size equals the number of CPU cores.

  • Schedulers.fromExecutorService(ExecutorService): wraps a custom ExecutorService in a Scheduler, so you can use your own thread pool.

The Schedulers class pre-creates several commonly used schedulers: the single(), elastic(), and parallel() methods return the built-in single-thread, elastic, and fixed-size schedulers respectively. To create a new instance instead, use the newSingle(), newElastic(), and newParallel() methods.

We can see from the above description that there is a certain correspondence between Schedulers in Reactor and traditional Java multithreading:

  • Schedulers.single() and Schedulers.newSingle() correspond to Executors.newSingleThreadExecutor();

  • Schedulers.elastic() and Schedulers.newElastic() correspond to Executors.newCachedThreadPool();

  • Schedulers.parallel() and Schedulers.newParallel() correspond to Executors.newFixedThreadPool().

Under the hood, all three of these schedulers are backed by a ScheduledExecutorService, so they all support delayed and periodic task execution.
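To make "delayed and periodic execution" concrete, here is a minimal JDK-only sketch of what a ScheduledExecutorService offers. It does not use Reactor, and the class PeriodicDemo and its method are hypothetical names chosen for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of any library.
final class PeriodicDemo {

    /** Fires a task every periodMillis until it has run `times` times, then returns the run count. */
    static int runPeriodically(int times, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                // The guard ignores any extra tick that fires before the scheduler is shut down.
                if (counter.get() < times) {
                    counter.incrementAndGet();
                    done.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("periodic task did not fire in time");
            }
            return counter.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow(); // cancels the periodic task
        }
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(3, 10)); // 3
    }
}
```

A plain ExecutorService has no equivalent of scheduleAtFixedRate, which is why being built on ScheduledExecutorService matters for time-based work.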

Thread environment switching in Reactor

Compared with traditional Java multithreaded programming, Reactor provides a very convenient way to switch threads: the publishOn and subscribeOn methods.

Reactor is essentially about processing data streams, and its basic model is publish-subscribe. When we finally call the subscribe() method, a top-to-bottom data flow (publisher -> subscriber) is formed. There is also a subscription flow from bottom to top (subscriber -> publisher), which carries the subscriber's requests back up the chain.

With this in mind, the difference between publishOn and subscribeOn is easy to understand. subscribeOn works through the subscription flow, from bottom to top, and changes the thread on which the source executes. publishOn works through the data flow, from top to bottom, and changes the execution environment of the operators that follow it.

Let's take a closer look with the following example:


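import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// JUnit's @Test annotation is assumed to be imported in the enclosing test class.
@Test
public void testScheduling() {
    Flux.range(0, 1)                                      // source: emits the single value 0
        .log() // 1
        .publishOn(Schedulers.newParallel("myParallel"))  // switches the operators that follow
        .log() // 2
        .subscribeOn(Schedulers.newElastic("myElastic"))  // switches where the source runs
        .log() // 3
        .blockLast();                                     // blocks until the stream completes
}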
For the emitted element, the three log() calls report the following threads:

  • Log 1: prints that the current thread is myElastic-x

  • Log 2: prints that the current thread is myParallel-x

  • Log 3: prints that the current thread is myParallel-x

From this log output, we can draw the following conclusions:

  • publishOn affects the operators that come after it in the chain, such as logs 2 and 3 above

  • subscribeOn, no matter where it appears in the chain, only affects the execution environment of the source (and of the operators before the first publishOn), such as log 1 above

As this usage shows, thread switching in Reactor is very convenient: publishOn and subscribeOn are all we need to customize our threading environment.
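For comparison, the same "source on one thread, downstream on another" hop can be approximated with the plain JDK. The sketch below is only a loose analogy and does not use Reactor: supplyAsync with an executor plays roughly the role of subscribeOn (it decides where the source runs), and thenApplyAsync with a second executor plays roughly the role of publishOn (it moves the following stage). The class name ThreadHopDemo and the thread names source-1 and downstream-1 are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of any library.
final class ThreadHopDemo {

    /** Returns the name of the thread that ran the source, then the one that ran the next stage. */
    static String trace() {
        // Single-threaded executors with fixed thread names, so the output is predictable.
        ExecutorService sourcePool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "source-1"));
        ExecutorService downstreamPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "downstream-1"));
        try {
            return CompletableFuture
                    // The source runs on sourcePool (loosely comparable to subscribeOn).
                    .supplyAsync(() -> Thread.currentThread().getName(), sourcePool)
                    // The next stage is handed to downstreamPool (loosely comparable to publishOn).
                    .thenApplyAsync(src -> src + " -> " + Thread.currentThread().getName(),
                            downstreamPool)
                    .join();
        } finally {
            sourcePool.shutdown();
            downstreamPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(trace()); // source-1 -> downstream-1
    }
}
```

The JDK version needs explicit executor plumbing at every stage, whereas in Reactor a single publishOn or subscribeOn call in the chain is enough.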

Summary

Multithreading in Reactor is essentially the same as multithreading in traditional Java programming, but it adds a lot of syntactic sugar that makes threads easier to use in reactive code, and switching between thread environments is very convenient. As we discussed in Reactive Programming in Java, reactive programming is largely a matter of defining a processing pipeline: once the pipeline is defined, the data flows through it to its final destination like running water, just as we expect.
