Reactor pattern

Article directory

    • 1. Single-threaded Reactor reactor mode
    • 2. Multi-threaded Reactor reactor mode

In Java’s OIO programming, the original and most primitive network server program uses a while loop to constantly monitor the port for new connections, and if so, calls a processing function to handle it. The biggest problem with this method is that if the processing of the previous network connection has not ended, then subsequent connection requests cannot be received, so all subsequent requests will be blocked, and the server’s throughput will be too low.

In order to solve this serious connection blocking problem, a classic mode has emerged: Connection Per Thread. That is, a thread is allocated for each new network connection, and each thread handles its own input and output independently. The input and output processing of any socket connection will not block the monitoring and establishment of new socket connections later. This is how early versions of Tomcat servers were implemented.

The advantage of this mode is that it solves the problem of previous new connections being severely blocked, and greatly improves the throughput of the server to a certain extent. However, for a large number of connections, a large amount of ready resources need to be consumed. If there are too many threads, the system cannot bear it. Moreover, repeated creation, destruction, and thread switching of threads also require costs. Therefore, the shortcomings of multi-threaded OIO in high-concurrency application scenarios are fatal, so the Reactor reactor mode was introduced.

The reactor mode consists of two major roles: Reactor reactor thread and Handlers processor:

  1. Responsibilities of the Reactor reactor thread: Responsible for responding to IO events and distributing them to Handlers processors
  2. Responsibilities of the Handlers processor: non-blocking execution of business processing logic

1. Single-threaded Reactor reactor mode

The Reactor reactor mode is somewhat similar to the event-driven mode. When an event is triggered, the event source will distribute the event dispatch to the handler processor for event processing. The reactor role in the reactor pattern is similar to the dispatcher role in the event-driven pattern.

  • Reactor: Responsible for querying IO events. When an IO time is detected, it is sent to the corresponding Handler processor for processing. The IO event here is the channel IO event monitored by the NIO selector.
  • Handler processor: bound to IO events, responsible for processing IO events, completing the actual connection establishment, channel reading, processing business logic, and writing results to the channel, etc.

Implementing the single-threaded version of the reactor pattern based on NIO requires the use of several important member methods of the SelectionKey selection key:

  1. void attach(Object o): Add any Java object as an attachment to the SelectionKey instance, mainly adding the Handler processor instance as an attachment to the SelectionKey instance
  2. Object attachment(): Remove the attachment that was previously added to the SelectionKey selection key instance through attach. It is generally used to remove the bound Handler processor instance.

Reactor implementation example:

package cn.ken.jredis;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

/**
 * <pre>
 *
 * 

*
* @author Ken-Chy129
* @since 2023/10/14 14:29
*/
public class Reactor implements Runnable {

final private Selector selector;

final private ServerSocketChannel serverSocketChannel;

public Reactor() {
try {
this.selector = Selector.open();
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8088));
//Register the accept event of ServerSocket
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// Bind handler to event
sk.attach(new AcceptHandler());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selectionKeys = selector.selectedKeys();
for (SelectionKey selectedKey : selectionKeys) {
dispatch(selectedKey);
}
selectionKeys.clear();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void dispatch(SelectionKey selectedKey) {
Runnable handler = (Runnable) selectedKey.attachment();
// What is returned here may be AcceptHandler or IOHandler
handler.run();
}

class AcceptHandler implements Runnable {
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
new IOHandler(selector, socketChannel); //Register the IO processor and add the connection to the select list
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

public static void main(String[] args) {
new Reactor().run();
}
}

Handler implementation example:

package cn.ken.jredis;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * <pre>
 *
 * 

*
* @author Ken-Chy129
* @since 2023/10/14 14:53
*/
public class IOHandler implements Runnable {

final private SocketChannel socketChannel;

final private ByteBuffer buffer;

public IOHandler(Selector selector, SocketChannel channel) {
buffer = ByteBuffer.allocate(1024);
socketChannel = channel;
try {
channel.configureBlocking(false);
SelectionKey sk = channel.register(selector, 0); // No events of interest are registered here
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ); //Register events of interest, which will only take effect the next time select is called
selector.wakeup(); // Immediately wake up the currently blocked select operation, so that you can quickly enter the next select, so that the read event monitoring registered above can take effect immediately
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void run() {
try {
int length;
while ((length = socketChannel.read(buffer)) > 0) {
System.out.println(new String(buffer.array(), 0, length));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

In the single-threaded reactor mode, both the Reactor reactor and the Handler processor are executed on the same thread (the dispatch method directly calls the run method without creating a new thread), so when one of the Handlers blocks, it will cause other All Handlers are not executed.

2. Multi-threaded Reactor reactor mode

Since the Reactor reactor and the Handler processor in the same thread will cause very serious performance defects, the basic reactor pattern can be transformed using multi-threading.

  1. Put the execution of the IOHandler processor responsible for input and output processing into an independent thread pool. In this way, the business processing thread is isolated from the reactor thread responsible for service monitoring and IO time query, preventing the server’s connection monitoring from being blocked.
  2. If the server has a multi-core CPU, the reactor thread can be split into multiple sub-reactor threads, and multiple selectors can be introduced at the same time. Each SubReactor sub-thread is responsible for a selector.

MultiReactor:

package cn.ken.jredis;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * <pre>
 *
 * 

*
* @author Ken-Chy129
* @since 2023/10/14 16:51
*/
public class MultiReactor {

private final ServerSocketChannel server;

private final Selector[] selectors = new Selector[2];

private final SubReactor[] reactors = new SubReactor[2];
private final AtomicInteger index = new AtomicInteger(0);

public MultiReactor() {
try {
server = ServerSocketChannel.open();
selectors[0] = Selector.open();
selectors[1] = Selector.open();
server.bind(new InetSocketAddress(8080));
server.configureBlocking(false);
SelectionKey register = server.register(selectors[0], SelectionKey.OP_ACCEPT);
register.attach(new AcceptHandler());
reactors[0] = new SubReactor(selectors[0]);
reactors[1] = new SubReactor(selectors[1]);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void startService() {
new Thread(reactors[0]).start();
new Thread(reactors[1]).start();
}

class SubReactor implements Runnable {
final private Selector selector;

public SubReactor(Selector selector) {
this.selector = selector;
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
selector.select();
Set selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
dispatch(selectionKey);
}
selectionKeys.clear();
} catch (IOException e) {
throw new RuntimeException(e);
}

}
}
}

private void dispatch(SelectionKey selectionKey) {
Runnable attachment = (Runnable) selectionKey.attachment();
if (attachment != null) {
attachment.run();
}
}

class AcceptHandler implements Runnable {
@Override
public void run() {
try {
SocketChannel socketChannel = server.accept();
new MultiHandler(selectors[index.getAndIncrement()], socketChannel);
if (index.get() == selectors.length) {
index.set(0);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

MultiHandler:

package cn.ken.jredis;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <pre>
 *
 * 

*
* @author Ken-Chy129
* @since 2023/10/14 17:28
*/
public class MultiHandler implements Runnable {

final private Selector selector;

final private SocketChannel channel;

final ByteBuffer buffer = ByteBuffer.allocate(1024);

static ExecutorService pool = Executors.newFixedThreadPool(4);

public MultiHandler(Selector selector, SocketChannel channel) {
this.selector = selector;
this.channel = channel;
try {
channel.configureBlocking(false);
SelectionKey register = channel.register(selector, SelectionKey.OP_READ);
register.attach(this);
selector.wakeup();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void run() {
pool.execute(() -> {
synchronized (this) {
int length;
try {
while ((length = channel.read(buffer)) > 0) {
System.out.println(new String(buffer.array(), 0, length));
buffer.clear();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
}