[High-concurrency network communication architecture] 4. Efficient event-driven model: the Reactor model

Contents

I. Previous articles

II. Basic concepts

1. Preface

2. Basic framework

3. Core features

4. Workflow

5. Use “network communication” to understand the Reactor model

III. Code implementation

1. Using epoll for multiplexing to implement the Reactor pattern

2. Reactor pattern implementation code (reference)


I. Previous articles

[High-concurrency network communication architecture] 1. A TCP server for a single client connection under Linux

[High-concurrency network communication architecture] 2. Introducing multi-threading to implement a TCP server for multiple client connections

[High-concurrency network communication architecture] 3. Introducing I/O multiplexing (select, poll, epoll) to implement a high-concurrency TCP server

II. Basic concepts

1. Preface

  • In computer science, Reactor is a software design pattern and a commonly used event-driven programming model, often used to build high-performance concurrent applications. It handles concurrent events through an event loop and a callback mechanism, and is widely used in network programming, server development, and similar fields.
  • The Reactor model is a pattern or idea, not a specific code framework or library. It can be implemented in many programming languages and on many platforms; Node.js, Twisted, and Netty, for example, build high-performance network applications on it.
  • In the Reactor model, event-driven programming is the main paradigm: the program listens for events and calls the corresponding callback functions to handle them.

2. Basic framework

  • Event Source: The event source is the object or component that generates events, such as a network socket, a file descriptor, or a user input device. It detects the occurrence of events and notifies the event dispatcher when one occurs.
  • Event Loop: The event loop is the core component of the Reactor model. It is a loop that waits for events and, once one occurs, hands it to the event dispatcher, which calls the corresponding event handler. The event loop manages the order in which events are processed and ensures that handlers are not blocked.
  • Event Dispatcher: The event dispatcher distributes the events sent by the event sources to the appropriate event handlers. It receives an event and, based on the event type and other rules, chooses the handler for it. The dispatcher acts as an event router, ensuring that each event is delivered to the correct handler.
  • Event Handler: An event handler is an object that encapsulates the processing logic for a specific type of event. Each handler defines how that type of event is handled and performs the corresponding operations, such as reading network data or writing files.
  • Multiplexer: A multiplexer manages and listens to multiple event sources and notifies the event loop when an event occurs. It achieves efficient event distribution by relying on the mechanisms provided by the operating system (such as select, poll, epoll).
  • Reactor: The reactor is part of the event loop. It coordinates the registration and deregistration of event sources and handlers and the distribution of events. It can manage multiple event sources and event handlers, and provides a unified interface and scheduling mechanism.
  • Event Queue: The event queue stores and manages pending events. It can be a first-in-first-out (FIFO) queue or another data structure that ensures events enter the event loop in order.

Summary

  • These components work together to form the basic framework of the Reactor model. They decouple the generation and processing of events, realize efficient event-driven processing in an asynchronous and non-blocking manner, and improve the performance and concurrency of applications.
  • Although the Reactor pattern is a basic concurrent programming pattern, it has variants and refinements that depend on the implementation and platform, such as the multi-threaded Reactor and the asynchronous Reactor. A variant can be chosen and tuned according to the needs of the application and the characteristics of the system.
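
To make the roles above more concrete, the following is a minimal sketch of an event handler and an event dispatcher as plain C types. All names here (handler_t, dispatcher_t, dispatcher_register, dispatcher_dispatch, record_fd, MAX_HANDLERS) are hypothetical and chosen only for illustration; no multiplexer is involved, so the "event" is simply a file descriptor number passed in by the caller.

```c
#include <stddef.h>

#define MAX_HANDLERS 16   /* hypothetical capacity for this sketch */

/* Event Handler: processing logic bound to one event source (an fd here). */
typedef struct {
    int fd;                      /* the event source */
    void (*on_event)(int fd);    /* callback invoked when the source is ready */
} handler_t;

/* Event Dispatcher: a table that routes an event to its handler. */
typedef struct {
    handler_t table[MAX_HANDLERS];
    size_t count;
} dispatcher_t;

/* Register a handler; returns 0 on success, -1 if the table is full. */
int dispatcher_register(dispatcher_t *d, int fd, void (*on_event)(int)) {
    if (d->count >= MAX_HANDLERS) return -1;
    d->table[d->count].fd = fd;
    d->table[d->count].on_event = on_event;
    d->count++;
    return 0;
}

/* Route one event; returns 0 if a handler ran, -1 if none is registered. */
int dispatcher_dispatch(const dispatcher_t *d, int fd) {
    for (size_t i = 0; i < d->count; i++) {
        if (d->table[i].fd == fd) {
            d->table[i].on_event(fd);
            return 0;
        }
    }
    return -1;
}

/* Example handler: remembers which fd it was last called for. */
static int last_fd_seen = -1;
void record_fd(int fd) { last_fd_seen = fd; }
```

In a full Reactor, the event loop would obtain the ready descriptor from the multiplexer and pass it to dispatcher_dispatch; here the caller plays that role.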

3. Core features

  1. Event-driven: The Reactor pattern is an event-driven design pattern that drives the behavior of the application by listening for events. When an event occurs, the Reactor distributes it to the corresponding handler according to the event type. This lets the application respond to external events and take the appropriate action for each type.
  2. Non-blocking: The Reactor pattern handles events in a non-blocking manner. The event loop blocks only while waiting for the next ready event, and event handlers are expected to return quickly without making blocking calls. As a result, processing one event does not hold up the processing of others, which improves the concurrency of the application.
  3. Multiplexing: The Reactor pattern uses the multiplexing mechanism provided by the operating system (select, poll, epoll, etc.) so that one event loop can listen to multiple event sources at the same time. The event loop wakes the corresponding event handler only when an event actually occurs, avoiding the inefficiency of checking each source in turn.
  4. Scalability: The Reactor pattern is designed to make applications scalable. For each type of event, a corresponding event handler can be created and registered with the event dispatcher. The number of event handlers can be increased or decreased dynamically to match the workload of the application.
  5. Flexibility: The Reactor pattern makes applications more flexible and extensible. The generation and processing of events are decoupled: to add a new type of event, you only need to create a corresponding event handler and register it, without modifying the event loop. This allows applications to adapt easily to new requirements.
  6. High performance: Because handling is non-blocking and many event sources share one loop, the Reactor pattern enables high-performance event processing. Handling one event does not block the handling of others, which improves overall performance and throughput.

Summary

  • The Reactor pattern is event-driven, non-blocking, multiplexing, scalable, flexible, and high-performance. It can provide an efficient event processing mechanism, enabling applications to process multiple events concurrently, and has good maintainability and scalability.
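
As an illustration of the non-blocking property, here is a small sketch that switches a descriptor to non-blocking mode with fcntl and then attempts a read that returns immediately instead of waiting. The helper names set_nonblocking and try_read, and the -2 return code for "no data yet", are assumptions made for this example.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Put a descriptor into non-blocking mode; returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Try to read without waiting. Returns the number of bytes read, 0 on end
 * of file, -1 on a real error, or -2 when no data is available yet. */
long try_read(int fd, char *buf, size_t len) {
    ssize_t n = read(fd, buf, len);
    if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) return -2;
    return (long)n;
}
```

A handler built this way can give control back to the event loop when try_read reports that nothing is available, rather than stalling every other event source.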

4. Workflow

  1. Initialization: Create an event loop (Event Loop) and an event dispatcher (Event Dispatcher). The event loop is a main loop that is responsible for waiting for the arrival of events and dispatching corresponding event handlers to handle the events. The event dispatcher is responsible for passing events from the event loop to the appropriate event handlers.
  2. Register event source and event handler: Register the event source (Event Source) and the corresponding event handler (Event Handler) in the event dispatcher, and establish the association between the event source and the event handler. During registration, each event source and event handler is assigned a unique identifier for subsequent event distribution.
  3. Start the event loop: Start the event loop and enter the event monitoring state. From this point on, the event loop waits for events on the registered event sources.
  4. Wait for an event to occur: The event loop waits for events through the multiplexer. The wait can be a blocking call, a non-blocking check, or polling. The event loop waits until at least one event source reports an event.
  5. Event Notification: When an event occurs from one or more event sources, they notify the event loop and pass the event from the event source to the event loop.
  6. Event distribution: After the event loop receives the event notification, it passes the event to the event dispatcher.
  7. Route to event handler: The event dispatcher selects an appropriate event handler to handle the event according to the event type and other rules. It looks up the mapping between event sources and handlers based on the event identifier, and passes the event to the matching event handler.
  8. Execute event handlers: Event handlers perform actions to handle events. This can include reading event data, doing calculations, updating state, etc. Execution of event handlers is usually non-blocking in order to allow the event loop to continue listening for other events.
  9. Return to waiting state: When the event handler finishes processing the event, it returns to the event loop and continues to wait for the next event to occur.
  10. Repeated execution: The event loop repeats the above steps, monitors and processes events, until the stop condition is met. A stop condition can be an expiration of a wait time, occurrence of a specific event, satisfaction of a specific state, or other conditions defined by the application.
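
The workflow above can be sketched on Linux with epoll as follows. This is a minimal illustration under stated assumptions, not a complete implementation: source_t, loop_register, loop_run_once and count_and_drain are hypothetical names, and the handler record is carried in the data.ptr field of struct epoll_event so that no separate lookup is needed.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical record tying an event source to its handler. */
typedef struct {
    int fd;
    void (*on_readable)(int fd);
} source_t;

/* Step 2: register a source; the record itself travels in data.ptr. */
int loop_register(int epfd, source_t *src) {
    struct epoll_event ev = { .events = EPOLLIN, .data = { .ptr = src } };
    return epoll_ctl(epfd, EPOLL_CTL_ADD, src->fd, &ev);
}

/* Steps 4 to 9, one pass: wait, then route each ready event to its handler.
 * Returns the number of events handled, or -1 if epoll_wait failed. */
int loop_run_once(int epfd, int timeout_ms) {
    struct epoll_event ready[8];
    int n = epoll_wait(epfd, ready, 8, timeout_ms);
    for (int i = 0; i < n; i++) {
        source_t *src = ready[i].data.ptr;
        src->on_readable(src->fd);
    }
    return n;
}

/* Example handler: consumes one byte and counts how often it ran. */
static int calls = 0;
void count_and_drain(int fd) {
    char c;
    if (read(fd, &c, 1) == 1) calls++;
}
```

Step 10 corresponds to calling loop_run_once repeatedly until the application's stop condition is met.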

5. Use “network communication” to understand the Reactor model

Assume that you are developing a chat application through which users can communicate with other users in real time. To handle concurrent network connections, you can use the Reactor pattern.

  1. First, you create an event loop (Event Loop) as the main loop and initialize a network socket to listen for incoming connection requests.
  2. When there is a new client connection request, the network socket will fire an event to notify your event loop of the connection request.
  3. Next, you create a connection handler (Connection Handler), which is responsible for handling each client connection. Connection handlers encapsulate the logic for handling connections, such as authentication, receiving and sending messages, and so on.
  4. You register the connection handler to the event dispatcher (Event Dispatcher), and establish the mapping relationship between the socket and the connection handler. In this way, when there is a new connection request, the event dispatcher can pass the connection event to the corresponding connection handler.
  5. After the event loop starts, it waits for a connection request to occur. When a client connects, the socket fires a connect event, which notifies the event loop.
  6. After the event loop receives the connection event, it passes the connection event to the event dispatcher.
  7. The event dispatcher selects an appropriate connection handler to handle the connection based on the type of connection event and other rules. It finds the corresponding connection handler based on the client’s identity, request type, etc.
  8. The connection handler performs the corresponding operations, such as authentication, handling the receiving and sending of messages, and so on. These operations are non-blocking, so other client connections can be processed concurrently.
  9. After the processing is completed, the connection handler returns to the event loop and continues to wait for the next connection request.
  10. The event loop continuously monitors connection events, and distributes connection events to corresponding connection handlers for processing. This decoupling design enables you to handle multiple client connections and communications concurrently, improving the responsiveness and scalability of your application.
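
The connection handler from the steps above might look roughly like the sketch below. Everything in it is hypothetical: the connection_t type, its states, and the toy rule that the first message a client sends is taken as its user name stand in for whatever authentication and message handling a real chat server would use.

```c
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

typedef enum { CONN_AWAIT_NAME, CONN_CHATTING, CONN_CLOSED } conn_state_t;

/* Per-client connection handler state (all fields are hypothetical). */
typedef struct {
    int fd;
    conn_state_t state;
    char name[32];
    int messages;   /* chat messages received so far */
} connection_t;

/* Called by the event loop when conn->fd is readable. */
void connection_on_readable(connection_t *conn) {
    char buf[256];
    ssize_t n = read(conn->fd, buf, sizeof buf - 1);
    if (n <= 0) {                 /* peer closed the connection or read failed */
        conn->state = CONN_CLOSED;
        return;
    }
    buf[n] = '\0';
    if (conn->state == CONN_AWAIT_NAME) {
        /* Toy "authentication": the first message is taken as the user name. */
        strncpy(conn->name, buf, sizeof conn->name - 1);
        conn->name[sizeof conn->name - 1] = '\0';
        conn->state = CONN_CHATTING;
    } else if (conn->state == CONN_CHATTING) {
        conn->messages++;         /* a real server would relay buf to other users */
    }
}
```

Because each client has its own connection_t, the event loop can interleave reads from many clients without one conversation affecting the state of another.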

Summary

  • To sum up, in the chat example the event loop waits on the listening socket and on every client socket, and the event dispatcher routes each connection or message event to its connection handler. Because the handlers do not block, a single event loop can serve many clients concurrently.

III. Code implementation

1. Using epoll for multiplexing to implement the Reactor pattern

  1. Initialize the server: Call the init_server function to create a server socket and perform the following operations on it:
    • Create a listening socket: Use the socket function to create a listening socket, specifying the protocol family, socket type, and protocol number. For example, socket(AF_INET, SOCK_STREAM, 0) creates a TCP socket.
    • Bind address and port: Use the bind function to bind the socket to the specified IP address and port number. Create a sockaddr_in structure and set its address family, IP address, and port number, then call bind with the socket descriptor, a pointer to the structure, and the size of the structure.
    • Listen for connections: Use the listen function to start listening on the socket. Pass the socket descriptor and the maximum length of the pending-connection queue. This puts the socket into the listening state so that it can accept new client connections.
  2. Initialize the event loop mechanism: Use the epoll_create function to create an epoll instance. The returned file descriptor is used for all subsequent epoll operations.
  3. Create an event handler and add it to the event loop: Create an EventHandler structure, which contains the file descriptor to be monitored and a pointer to its handler function. Pass this structure to the reactor_add_handler function to add the event handler to the event loop.
  4. Start the event loop: Call the reactor_run function to start the event loop. In the event loop, the following actions are performed:
    • Call the epoll_wait function to block until at least one event is ready. The function takes the epoll file descriptor, an array that receives the ready events, the length of that array, and a timeout (-1 waits indefinitely).
    • Once there is a ready event, traverse the event array and find the corresponding event handler.
    • Call the processing function of the event handler to handle the event.
  5. Process connection events: When a new connection arrives on the listening socket, a connection event is triggered. In the connection event handler function, call accept to accept the client connection, which returns a new socket for communicating with that client. Then create a new event handler for this socket and add it to the event loop.
  6. Process read events: When a client socket has data to read, a read event is triggered. In the read event handler function, call read to read the data and then process it. If read returns 0, the peer has closed the connection; if it returns -1, an error occurred. In both cases the socket must be removed from the event loop and closed.
  7. Remove the event handler from the event loop: When the connection is closed or an error occurs, the event handler needs to be removed from the event loop. Call the reactor_remove_handler function to remove the socket from the epoll instance and delete the corresponding event handler from the array.
  8. Cleanup operation: After the event loop ends, close the listening socket and epoll instance, and release any other resources.
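
Steps 3, 4 and 7 rest on three epoll operations: adding a descriptor, waiting for ready events, and removing the descriptor again. The sketch below isolates them; watch_fd, count_ready and unwatch_and_close are hypothetical helper names. It also relies on epoll's default level-triggered behaviour: as long as unread data remains, the descriptor keeps being reported as ready.

```c
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Start monitoring fd for readability; returns 0 on success, -1 on error. */
int watch_fd(int epfd, int fd) {
    struct epoll_event ev = { .events = EPOLLIN, .data = { .fd = fd } };
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Return how many watched descriptors are ready right now (timeout 0). */
int count_ready(int epfd) {
    struct epoll_event ready[8];
    return epoll_wait(epfd, ready, 8, 0);
}

/* Stop monitoring fd, then close it. Removing before closing keeps the
 * epoll instance from holding a stale registration. */
int unwatch_and_close(int epfd, int fd) {
    int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
    return ret;
}
```

The order in unwatch_and_close matches step 7: the descriptor is removed from the epoll instance first and closed afterwards.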

2. Reactor pattern implementation code (reference)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>

#define MAX_EVENTS 10
#define BUFFER_SIZE 1024

// Define the structure of the event handler
typedef struct {
    int fd; // file descriptor
    void (*handler)(int); // pointer to event handler function
} EventHandler;

// Define the event loop mechanism and the storage of event handlers
int epoll_fd; // file descriptor of epoll instance
EventHandler event_handlers[MAX_EVENTS]; // Array of event handlers
int num_handlers = 0; // number of handlers in event handler array

// Initialize the server socket
int init_server(int port);

// Handle connection events
void handle_accept(int listen_fd);

// handle read event
void handle_read(int client_fd);

// Initialize the event loop mechanism
void reactor_init();

// add event handler to event loop
void reactor_add_handler(int fd, EventHandler event_handler);

// remove the event handler from the event loop
void reactor_remove_handler(int fd);

// start event loop
void reactor_run();

int main(int argc,char *argv[]){

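    // The port number must be given as the first command-line argument
    if(argc < 2) {
        fprintf(stderr, "Usage: %s <port>\n", argv[0]);
        return -1;
    }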

    int port = atoi(argv[1]);

    int listen_fd = init_server(port);
    if(listen_fd == -1) return -1;

    // Initialize the event loop mechanism
    reactor_init();

    // Create an event handler and add it to the event loop
    EventHandler event_handler;
    event_handler.fd = listen_fd;
    event_handler.handler = handle_accept;
    reactor_add_handler(listen_fd, event_handler);

    // start event loop
    reactor_run();

    close(listen_fd);

    return 0;
}

// Initialize the server socket
int init_server(int port){
    // Create the listening socket. Its descriptor is usually 3, because 0, 1 and 2 are already taken by stdin, stdout and stderr
    int listen_fd = socket(AF_INET,SOCK_STREAM,0);

    if(-1 == listen_fd){
        printf("Socket error code: %d codeInfo: %s\n", errno, strerror(errno));
        return -1;
    }

    // Optional: set the server socket to non-blocking mode (requires #include <fcntl.h>)
    // int flags = fcntl(listen_fd,F_GETFL,0);
    // fcntl(listen_fd,F_SETFL,flags | O_NONBLOCK);

    struct sockaddr_in server_addr;
    memset(&server_addr,0,sizeof(struct sockaddr_in));
    server_addr.sin_family = AF_INET; // IPv4
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
    server_addr.sin_port = htons(port);

    // Bind IP and port number
    if(-1 == bind(listen_fd,(struct sockaddr*)&server_addr,sizeof(struct sockaddr_in)))
    {
        printf("Bind error code: %d codeInfo: %s\n", errno, strerror(errno));
        close(listen_fd); // do not leak the socket on failure
        return -1;
    }

    // Listen for connections on this socket
    if(-1 == listen(listen_fd,SOMAXCONN))
    {
        printf("Listen error code: %d codeInfo: %s\n", errno, strerror(errno));
        close(listen_fd); // do not leak the socket on failure
        return -1;
    }

    printf("Socket init successful: server fd = %d\n",listen_fd);
    return listen_fd;
}

// Handle connection events
void handle_accept(int listen_fd) {
    struct sockaddr_in client_addr;
    socklen_t addr_len = sizeof(struct sockaddr_in);
    int client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &addr_len);
    if (client_fd < 0) {
        perror("Accept error");
        return; // keep serving the existing connections
    }
    printf("Accepted new connection: client fd = %d\n", client_fd);

    // add read event to event loop
    EventHandler event_handler;
    event_handler.fd = client_fd;
    event_handler.handler = handle_read;
    reactor_add_handler(client_fd, event_handler);
}

// handle read event
void handle_read(int client_fd) {
    char buffer[BUFFER_SIZE] = {0};
    // Read at most BUFFER_SIZE - 1 bytes so the buffer stays NUL-terminated for printf("%s")
    ssize_t bytes_read = read(client_fd, buffer, BUFFER_SIZE - 1);
    if (bytes_read > 0) {
        printf("Received client fd=%d DataLen: %d Data: %s\n", client_fd, (int)bytes_read, buffer);
    } else if (bytes_read == 0) {
        printf("Connection closed: client fd = %d\n", client_fd);
        // close the connection and remove from the event loop
        reactor_remove_handler(client_fd);
        close(client_fd);
    } else {
        perror("Read error");
        // close the connection and remove from the event loop
        reactor_remove_handler(client_fd);
        close(client_fd);
    }
}

// Initialize the event loop mechanism
void reactor_init() {
    epoll_fd = epoll_create(1);
    if (epoll_fd < 0) {
        perror("Epoll creation failed");
        exit(1);
    }
    printf("Create epoll successful: epoll fd = %d\n",epoll_fd);
}

// add event handler to event loop
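void reactor_add_handler(int fd, EventHandler event_handler) {
    // Refuse the descriptor when the handler array is full; otherwise the write below would overflow it
    if (num_handlers >= MAX_EVENTS) {
        fprintf(stderr, "Too many handlers, closing fd = %d\n", fd);
        close(fd);
        return;
    }
    struct epoll_event event;
    event.events = EPOLLIN; // Only listen to read events
    event.data.fd = fd;
    // Add the file descriptor to the epoll instance for event monitoring
    int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
    if (ret < 0) {
        perror("Epoll control failed");
        exit(1);
    }
    // Add the event handler to the array
    event_handlers[num_handlers++] = event_handler;
}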

// remove the event handler from the event loop
void reactor_remove_handler(int fd) {
    // Remove the file descriptor from the epoll instance
    int ret = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
    if (ret < 0) {
        perror("Epoll control failed");
        exit(1);
    }
    // remove the event handler from the array
    for (int i = 0; i < num_handlers; i ++ ) {
        if (event_handlers[i].fd == fd) {
            event_handlers[i] = event_handlers[--num_handlers];
            break;
        }
    }
}

// start event loop
void reactor_run() {
    struct epoll_event events[MAX_EVENTS];
    while (1) {
        // wait for the event to occur
        int num_ready = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_ready < 0) {
            if (errno == EINTR) continue; // interrupted by a signal, wait again
            perror("Epoll wait failed");
            break;
        }

        // Traverse ready events and call corresponding event handlers
        for (int i = 0; i < num_ready; i ++ ) {
            int fd = events[i].data.fd;
            // Find the corresponding event handler and call its handler function
            for (int j = 0; j < num_handlers; j++ ) {
                if (event_handlers[j].fd == fd) {
                    event_handlers[j].handler(fd);
                    break;
                }
            }
        }
    }
    close(epoll_fd); //Cleanup operation
}
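
The reference code above finds a handler by scanning the event_handlers array for a matching fd. One possible alternative, shown here only as a sketch, is to index a table directly by the descriptor value so that lookup takes constant time. The names MAX_FDS, handlers_by_fd, table_set, table_clear, table_dispatch and bump are hypothetical, and the fixed bound of 1024 is an assumption rather than a system limit.

```c
#include <stddef.h>

#define MAX_FDS 1024   /* hypothetical upper bound on descriptor values */

typedef void (*fd_handler_fn)(int fd);

/* One slot per possible descriptor; NULL means "no handler registered". */
static fd_handler_fn handlers_by_fd[MAX_FDS];

/* Register fn for fd; returns 0 on success, -1 if fd is out of range. */
int table_set(int fd, fd_handler_fn fn) {
    if (fd < 0 || fd >= MAX_FDS) return -1;
    handlers_by_fd[fd] = fn;
    return 0;
}

/* Forget the handler for fd (no effect if fd is out of range). */
void table_clear(int fd) {
    if (fd >= 0 && fd < MAX_FDS) handlers_by_fd[fd] = NULL;
}

/* Call the handler for fd; returns 0 if one ran, -1 otherwise. */
int table_dispatch(int fd) {
    if (fd < 0 || fd >= MAX_FDS || handlers_by_fd[fd] == NULL) return -1;
    handlers_by_fd[fd](fd);
    return 0;
}

/* Example handler: counts how many times it was invoked. */
static int hits = 0;
void bump(int fd) { (void)fd; hits++; }
```

The trade-off is memory for speed: the table reserves a slot for every possible descriptor, while the array in the reference code stores only the handlers that are actually registered.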