Reactor threading model

Reactor threading model

    • 1. Concept
    • 2. Role
    • 3. Model type
      • Single Reactor – single thread
      • Single Reactor – Multithreading
      • ?Master-slave Reactor-Multithreading
      • Code design (refer to zk)
        • 1. Create a service and create a service context factory
        • 2. Initialize the service context factory configuration
        • 3. Start the service, start the thread in turn from the inside out (worker > selector > accept)
        • 4. Handle client request connection in Accept Thread
        • 5.Detect IO events in SelectorThread
        • 6. Worker thread processing IO time

1. Concept

A concurrent programming model is an idea, also called 1 + M + N thread mode, widely used, such as Nginx, Memcached, Netty, etc.

2. Role

Reactor: Responsible for monitoring and distributing events, dispatching IO events to corresponding Handlers, new events include connection establishment ready, read ready, write ready, etc.
Acceptor: Handle new client connections and dispatch requests to handlers
Handler: Bind itself to events, perform non-blocking IO tasks, complete channel read and write, first-level business logic

3. Model type

Single Reactor-single thread


Disadvantages: All accepting connections and processing data operations are completed in one thread, which has performance bottlenecks

Disadvantages: Putting time-consuming data encoding, decoding, calculation and other operations into the thread pool for execution, although it improves performance, it is not the best way

Single Reactor-multithreading

?Master-slave Reactor-multithreading

Master-slave multi-threading, for the server, the link receiving the client is operated by a thread alone

work process:

  1. Reactor main thread MainReactor monitors the client connection time through select, and after receiving the event, handles the client connection event through Acceptor
  2. When the Acceptor finishes processing the link event, the MainReactor assigns the connection to the SubReactor
  3. SubReactor adds the connection to its own connection queue for listening, and creates Handler for various events to process
  4. When a new event occurs on the connection, Subreactor will call the corresponding Handler for processing
  5. The Handler obtains data from the channel through read, and distributes the request to the Worker thread pool to process the business
  6. The Worker thread pool will allocate independent threads to complete the business processing, and return the processing request to the Handler, and the Handler will respond to the data to the client through send
  7. One MainReactor can correspond to multiple SubReactors, that is, one MainReactor thread corresponds to multiple SubReactor threads

Code design (refer to zk)

1. Create a service, create a service context factory

Server server = new Server();
CountDownLatch latch = new CountDownLatch(1);
server.registerShutDownHandler(new ServerShutDownHandler(latch));

// create a connection manager factory
if (config.getClientPortAddress() != null) {<!-- -->
    cnxnFactory = ServerCnxnFactory.createFactory();
    // initial configuration
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);

2. Initialize service context factory configuration

// Initialize the maximum number of connections
this.maxClientCnxns = maxClientCnxns;
initMaxCnxns();

// Prepare related resources and get the number of cpu cores
int coreNum = Runtime.getRuntime().availableProcessors();
// Calculate the number of threads used to detect client IO events
numSelectorThreads = Integer.getInteger(NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((double) coreNum / 2), 1));
// Calculate the number of threads used to handle client IO events
numWorkerThreads = Integer.getInteger(NIO_NUM_WORKER_THREADS, 2 * coreNum);

//Get ready for SelectThread
for (int i = 0; i < numSelectorThreads; i ++ ) {<!-- -->
    selectorThreads. add(new SelectorThread(i));
}
// Open a ServerSocketChannel on the server side
this.ss = ServerSocketChannel.open();
this.ss.configureBlocking(false);
// Close the immediate release port
ss.socket().setReuseAddress(true);
if ((listenBacklog = clientPortListenBacklog) == -1) {<!-- -->
    ss.socket().bind(clientPortAddress);
} else {<!-- -->
    ss.socket().bind(clientPortAddress, listenBacklog);
}
//Create AcceptThread
acceptThread = new AcceptThread(this.ss, selectorThreads, clientPortAddress);

Create a fixed-length Selector IO detection thread, create a ServerSocketChannel, listenBacklog request queue size, and create an accept thread

3. Start the service, start the thread from the inside out (worker > selector > accept) in turn

// initial configuration
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
//Start the connection manager
cnxnFactory.startup(server);
// service stop sign
stopped = false;
// Start the IO worker thread pool
if (workerPool == null) {<!-- -->
    workerPool = new IOWorkerService(numWorkerThreads, "IOWorker");
}

// start selector thread
for (SelectorThread selectorThread : selectorThreads) {<!-- -->
    if (selectorThread.getState() == Thread.State.NEW) {<!-- -->
        selectorThread. start();
    }
}
// start accept thread
if (acceptThread. getState() == Thread. State. NEW) {<!-- -->
    acceptThread. start();
}

4. Processing client request connection in Accept Thread

/**
 * AcceptThread detects whether there is a new connection
 */
@Override
public void run() {<!-- -->
    try {<!-- -->
        while (!stopped & amp; & amp; !serverSocketChannel.socket().isClosed()) {<!-- -->
            //Execute the select program
            select();
        }
    } finally {<!-- -->
        closeSelector();
    }
}

private void select() {<!-- -->
    try {<!-- -->
        selector. select();
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (!stopped & amp; & amp; iterator.hasNext()) {<!-- -->
            SelectionKey key = iterator. next();
            iterator. remove();

            if (!key.isValid()) {<!-- -->
                continue;
            }
            // Determine whether it is accept
            if (key.isAcceptable()) {<!-- -->
                // receive connection
                if (!doAccept()) {<!-- -->
                    pauseAccept(10);
                }
            }
        }
    } catch (IOException e) {<!-- -->
        e.printStackTrace();
    }
}

/**
 * Receive new connections and bind to a selector thread
 *
 * @return
 */
private boolean doAccept() {<!-- -->
    boolean accepted = false;
    SocketChannel socketChannel = null;
    try {<!-- -->
        socketChannel = serverSocketChannel. accept();
        accepted = true;

        // TODO cnxn over limit

        // set non-blocking
        socketChannel. configureBlocking(false);
        // Bind the current connection to a selector thread round robin
        if (!selectorIterator.hasNext()) {<!-- -->
            selectorIterator = selectorThreads. iterator();
        }
        SelectorThread selectorThread = selectorIterator. next();
        if (!selectorThread.addAcceptedConnection(socketChannel)) {<!-- -->
            throw new IOException("unable to add connection to selector thread");
        }
    } catch (IOException e) {<!-- -->
        e.printStackTrace();
        fastCloseSock(socketChannel);
    }
    return accepted;
}

doAccept is the main processing connection receiving code, the selector receives the client request, receives a SocketChannel, and adds it to the selecor thread (IO detection thread), where the iterator is used to achieve the effect of round robin when obtaining the selecor thread , the purpose is to evenly distribute SocketChannel to the queue of selecor thread

5.Detect IO events in SelectorThread

@Override
public void run() {<!-- -->
    while (!stopped) {<!-- -->
        try {<!-- -->
            // IO select
            select();
            // Handle newly received connections
            processAcceptedConnections();

            processInterestOpsUpdateRequests();
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
        }
    }

    //
    // Close connections still pending on the selector. Any others
    // with in-flight work, let drain out of the work queue.

    for (SelectionKey key : selector. keys()) {<!-- -->
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {<!-- -->
            cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
        }
        cleanupSelectionKey(key);
    }
    SocketChannel accepted;
    while ((accepted = acceptedQueue. poll()) != null) {<!-- -->
        fastCloseSock(accepted);
    }
}

private void select() {<!-- -->
    try {<!-- -->
        selector. select();
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (!stopped & amp; & amp; iterator.hasNext()) {<!-- -->
            SelectionKey key = iterator. next();
            iterator. remove();

            if (!key.isValid()) {<!-- -->
                cleanupSelectionKey(key);
                continue;
            }

            //Check if there is IO
            if (key.isReadable() || key.isWritable()) {<!-- -->
                handleIO(key);
            }
        }
    } catch (IOException e) {<!-- -->
        e.printStackTrace();
    }
}

private void processAcceptedConnections() {<!-- -->
    SocketChannel sc;
    while (!stopped & amp; & amp; (sc = acceptedQueue.poll()) != null) {<!-- -->
        SelectionKey key = null;
        try {<!-- -->
            key = sc. register(selector, SelectionKey. OP_READ);
            // package connection
            NIOServerCnxn cnxn = createConnection(sc, key, this);
            key.attach(cnxn);

        } catch (IOException e) {<!-- -->
            e.printStackTrace();

            cleanupSelectionKey(key);
            fastCloseSock(sc);
        }
    }
}

// wrap selectionKey, submit to IOWorkPool
private void handleIO(SelectionKey key) {<!-- -->
    // Encapsulate IOWorkRequest
    IOWorkRequest ioWorkRequest = new IOWorkRequest(this, key);

    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while processing on its connection
    cnxn.disableSelectable();
    // IO processing is in an asynchronous thread, the Select thread will continue to select the key, and pause the detection before the current IO is completed
    key.interestOps(0);

    //Hand to workerPool
    workerPool.schedule(ioWorkRequest);
}

The select sequence checks whether there are new read and write events, processAcceptedConnections handles the time binding of new connections, traverses the obtained SelectionKey in select, and performs IO operations, which are mainly implemented in handleIO; note that IO processing is in an asynchronous thread , the Select thread will continue to select the key, you can use the interestOps method to mark the SelectionKey without any interest operations, the SelectionKey will not be detected when selector.select(), and finally the context information is encapsulated into the IOWorkRequest for the submission of the worker thread

6.Worker thread processing IO time

// handle IOWork
public void schedule(IOWork ioWork) {<!-- -->
    if (stopped) {<!-- -->
        ioWork. cleanup();
        return;
    }
    // Wrap IOWork into a handler
    IOWorkHandler ioWorkHandler = new IOWorkHandler(ioWork);
    if (ioWorkerPool != null) {<!-- -->
        try {<!-- -->
            ioWorkerPool. execute(ioWorkHandler);
        } catch (RejectedExecutionException e) {<!-- -->
            e.printStackTrace();
            ioWork. cleanup();
        }
    } else {<!-- -->
        ioWorkHandler. run();
    }
}

Encapsulate ioWork into IOWorkHandler to submit transaction, IOWorkHandler.run()->IOWork.doWork()

// really handle IO
@Override
public void doWork() throws Exception {<!-- -->
    if (!key.isValid()) {<!-- -->
        selectorThread. cleanupSelectionKey(key);
        return;
    }

    if (key.isReadable() || key.isWritable()) {<!-- -->
        // perform IO operation
        cnxn.doIO(key);

        if (stopped) {<!-- -->
            cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
            return;
        }
        if (!key.isValid()) {<!-- -->
            selectorThread. cleanupSelectionKey(key);
            return;
        }
    }

    // Mark this connection as once again ready for selection
    cnxn.enableSelectable();
    cnxn.requestInterestOpsUpdate();
}
/**
 * Handles read/write IO on connection.
 */
public void doIO(SelectionKey key) {<!-- -->
    try {<!-- -->
        if (!isSocketOpen()) {<!-- -->
            return;
        }

        //handle the read event
        if (key.isReadable()) {<!-- -->
            int i = sc. read(incomingBuffer);
            if (i < 0) {<!-- -->
                handleFailedRead();
            }

            // wait until incomingBuffer is full
            if (incomingBuffer. remaining() == 0) {<!-- -->

                boolean isBody = false;

                // If the header is full
                if (incomingBuffer == headerBuffer) {<!-- -->
                    incomingBuffer. flip();
                    // read the length of the body
                    isBody = readHeader(key);
                    incomingBuffer. clear();
                } else {<!-- -->
                    isBody = true;
                }

                // if body is full
                if (isBody) {<!-- -->
                    readBody();
                } else {<!-- -->
                    return;
                }

            }
        }

        //handle writable events
        if (key.isWritable()) {<!-- -->
            handleWrite(key);
        }
    } catch (EndOfStreamException e) {<!-- -->
        e.printStackTrace();

        close(e. getReason());

    } catch (IOException e) {<!-- -->
        e.printStackTrace();

        close(DisconnectReason.IO_EXCEPTION);
    }

}

Obtain client data, readBody()>doRequest()>processPacket(), encapsulate the read data into BusinessHandler, and submit it to the business thread

 /**
  * Server-side processing data
  *
  * @param cnxn
  * @param incomingBuffer
  */
 public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) {<!-- -->
     // Encapsulate request
     Request request = new Request(cnxn, incomingBuffer);
     // TODO throttled
     // submit request
     businessService. submitRequest(request);
 }
/**
 * Submit request
 *
 * @param request
 */
public void submitRequest(Request request) {<!-- -->
    BusinessHandler handler = new BusinessHandler(request);
    executorService. execute(handler);
}

public class BusinessHandler implements Runnable {<!-- -->

    private final ServerCnxn cnxn;

    private final Request request;

    public BusinessHandler(Request request) {<!-- -->
        this.request = request;
        this.cnxn = request.getCnxn();
    }

    @Override
    public void run() {<!-- -->
        try {<!-- -->
            Serializer serializer = server. getSerializer();

            ByteBuffer reqBuf = this.request.getRequest();
            byte[] bytes = new byte[reqBuf. remaining()];
            reqBuf. get(bytes);
            RequestData requestData = null;
            if (serializer instanceof RequestSerializer) {<!-- -->
                RequestSerializer requestSerializer = (RequestSerializer) serializer;
                requestData = requestSerializer. decodeRequest(bytes);
            }
            System.out.println("The request data received by the server is: " + requestData);
            SecureRandom secureRandom = new SecureRandom();
            int i = secureRandom. nextInt(2000);
            System.out.println("It will take to simulate the execution of the request" + i + "milliseconds");
            Thread.currentThread().sleep(i);

            //write back data
            ResponseData response = new ResponseData();
            response.setId(requestData.getId());
            response. setStatus(200);
            response.setMsg("hello " + i);
            System.out.println("The response to be returned by the server is: " + response);

            if (serializer instanceof ResponseSerializer) {<!-- -->
                ResponseSerializer rs = (ResponseSerializer) serializer;
                byte[] rsbytes = rs. encodeResponse(response);
                ByteBuffer buffer = ByteBuffer. allocate(4 + rsbytes. length);
                buffer.putInt(rsbytes.length);
                buffer. put(rsbytes);
                buffer. flip();
                //send buffer
                cnxn. sendBuffer(buffer);

                // TODO throttled
            }
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
        }
    }
}

In the business thread BusinessHandler, deserialize the received data, process the business logic, and finally write the response data in the context
ServerCnxn.sendBuffer() > requestInterestOpsUpdate() > SelectorThread.addInterestOpsUpdateRequest() Finally, write the data to be written into the update queue of the IO detection thread, execute the handlerIO operation once, and write the data back to the client

Thread ratio1(accept thread) : M(selector thread): N(busness thread)
Variant reactor thread ratio 1(accept thread) : M(selector thread) : N(ioWorker thread) : K(busness thread)