In Reactive Programming in Java, we had a basic understanding of reactive programming. And incidentally learned about Reactor, the best time for reactive programming in Java. Today we continue to learn about reactive programming through Reactor, the multithreading in Reactor we are going to talk about today.
Traditional Java multithreading
In traditional Java multi-threaded development scenarios, we usually use the Executors
tool class to create thread pools, usually of the following types:
-
newCachedThreadPool
Creates a cache thread pool with an elastic size. If the thread pool length exceeds the processing needs, the idle thread will be recycled flexibly. If there are no recyclable threads, create a new thread. -
newFixedThreadPool
creates a thread pool with a fixed size, which can control the maximum number of concurrent threads, and the excess threads will wait in the queue. -
newScheduledThreadPool
creates a fixed-size thread pool that supports timing and periodic task execution. -
newSingleThreadExecutor
creates a single-threaded thread pool, which only uses the only worker thread to execute tasks, ensuring that all tasks are executed in the specified order. -
newWorkStealingPool
Creates a thread pool that supports work-stealing (task stealing, mentioned in the previous Go concurrency model, and I will write an article to introduce it later).
It can be said that the traditional Executors
tool class has made it very convenient for us to use the thread pool. Reactor goes a step further and proposes a more fool-like scheduler Scheduler
.
Reactor multithreading
Reactor provides the Schedulers
class to create the following thread environments:
-
Schedulers.immediate()
The current thread. -
Schedulers.single()
provides the same thread for all callers to use. If you want to use an exclusive thread, you can useSchedulers.newSingle()
. -
Schedulers.elastic()
Elastic thread pool, create a thread pool as needed, and reuse idle threads. The thread pool will be discarded if it is idle for too long. It is more useful for I/O blocking scenarios.Schedulers.elastic()
can assign a thread to each blocked task, so as not to hinder other tasks and resources. -
Schedulers.paraller()
Fixed-size thread pool, the size created by default is the same as the number of CPUs. -
Schedulets.fromExecutorService(ExecutorService)
Customize the thread pool, and create a Scheduler based on the custom Ex secondary input Service.
The Schedulers
class has pre-created several commonly used thread pools: using single()
, elastic()
and parallel()
newSingle()
, newElastic()
and newParallel()
methods.
We can see from the above description that there is a certain correspondence between Schedulers in Reactor and traditional Java multithreading:
-
Schedulers.single()
andSchedulers.newSingle()
correspond toExecutors.newSingleThreadExecutor()
; -
Schedulers.elastic()
andSchedulers.newElastic()
correspond toExecutors.newCachedThreadPool()
; -
Schedulers.parallel()
andSchedulers.newParallel()
correspond toExecutors.newFixedThreadPool()
;
The bottom layers of the above three schedulers provided by Schedulers
are all based on ScheduledExecutorService
, so they all support task timing and periodic execution;
Thread environment switching in Reactor
Compared with traditional Java multi-threaded programming, Reactor provides a very convenient way to switch threads. That is the publishOn
and subscribeOn
methods.
Reactor is essentially the processing of data streams, and the basic model is also a publish-subscribe model. Generally speaking, when we finally call the subsribe()
method, a top-to-bottom data flow (publisher->subscriber) is formed. But in fact, there is also a subscription flow from bottom to top (subscriber->publisher), which can feed back the request of the subscriber from bottom to top.
Understand the above content, we can well understand the difference between publishOn
and subscribeOn
, subscribeOn
will change from bottom to top with the help of subscription flow The thread execution environment of the source. And publishOn
will use the data flow to change the subsequent execution environment from top to bottom.
Let's take a closer look with the following example:
@Test public void testScheduling() { Flux. range(0, 1) .log() // 1 .publishOn(Schedulers. newParallel("myParallel")) .log() // 2 .subscribeOn(Schedulers. newElastic("myElastic")) .log() // 3 .blockLast(); }
-
The log at position 1: will print that the current thread is
myElastic-x
-
2 log: will print the current thread is
myParallel-x
-
3 log: will print the current thread is
myParallel-x
Through this log analysis, we can get the above conclusion:
-
publishOn
will affect subsequent operators in the chain, such as the logs at 2 and 3 above -
subscribeOn
no matter where it appears, it only affects the execution environment of the source, such as the log above 1
And judging from our usage, thread switching in Reactor is very convenient. Just use publishOn
and subscribeOn
to customize our threading environment.
Summary
Multithreading in Reactor is essentially the same as multithreading in traditional Java programming, but it adds a lot of syntactic sugar to facilitate the use of multithreading in reactive programming. And it is also very convenient to switch between multi-threaded environments in Reactor. This is more of a customized process than the reactive programming we mentioned in the reactive programming in Java. Once our process is customized, the final data It will flow to the final position like flowing water in the way we expect.