The network reactor pattern and million-connection concurrency, explained simply

This blog post looks at the reactor network processing model in the context of servers that hold millions of concurrent connections. It answers four questions: 1. What is the reactor pattern? 2. Why use the reactor pattern? 3. Where is the bottleneck of network concurrency? 4. How can epoll be wrapped into a reactor, taking HTTP connections as the example?

The network layer of most server middleware on Linux, such as redis, nginx, and memcached, is a reactor built on top of epoll. So what is the reactor pattern?

1. What is the reactor pattern:

First, let’s recall how an ordinary function call works: the program calls a function, waits while the function executes, and continues once the function returns its result and control. A reactor is an event-driven mechanism that inverts this flow. Instead of the application actively calling an API to get work done, the application provides handler interfaces and registers them with the reactor. When the corresponding event occurs, the reactor calls the registered interface; these interfaces are also called “callback functions”. That is rather abstract. Put simply, in the reactor pattern every network event is bound to a business-layer function, and that function is called automatically when the event fires (for example, when a connection arrives or data becomes readable), so events are handled one at a time as they occur. The code later in this post makes this concrete.
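Before any sockets are involved, the inversion of control can be shown with a minimal sketch. The names `mini_reactor`, `mini_register`, `mini_dispatch`, and `count_calls` are hypothetical and exist only for this illustration; the event source is simulated by calling `mini_dispatch` by hand.

```c
#include <assert.h>
#include <stddef.h>

#define MINI_MAX_EVENTS 8

/* Handler signature: event id plus an opaque user argument. */
typedef int mini_handler(int event_id, void *arg);

struct mini_reactor {
    mini_handler *handlers[MINI_MAX_EVENTS]; /* NULL means nothing registered */
    void *args[MINI_MAX_EVENTS];
};

/* The application registers its handler up front. */
int mini_register(struct mini_reactor *r, int event_id, mini_handler *h, void *arg) {
    assert(r != NULL);
    if (event_id < 0 || event_id >= MINI_MAX_EVENTS) return -1;
    r->handlers[event_id] = h;
    r->args[event_id] = arg;
    return 0;
}

/* The reactor, not the application, decides when the handler runs. */
int mini_dispatch(struct mini_reactor *r, int event_id) {
    assert(r != NULL);
    if (event_id < 0 || event_id >= MINI_MAX_EVENTS) return -1;
    if (r->handlers[event_id] == NULL) return -1;
    return r->handlers[event_id](event_id, r->args[event_id]);
}

/* Example handler: counts how many times it has been invoked. */
int count_calls(int event_id, void *arg) {
    (void)event_id;
    int *counter = arg;
    return ++(*counter);
}
```

The application code never calls `count_calls` directly; it only registers it and lets the dispatcher decide when to run it. The epoll-based reactor below follows the same shape, with epoll_wait() as the event source.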

2. The benefits of the reactor pattern

First of all, remember that the reactor pattern is non-blocking I/O plus epoll. One epoll instance serves many connections, so no single call may block: with a blocking socket, a read() that finds no data would stall the thread and every other connection along with it. This applies even in epoll's level-triggered mode, because a readiness notification can occasionally be stale by the time read() is called. In addition, one reactor owns one epoll instance and runs on one thread, so the state inside a reactor is never touched by another thread and there is no multi-thread contention.
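The difference between blocking and non-blocking reads is easy to reproduce with a pipe instead of a socket. The sketch below uses only `pipe`, `fcntl`, and `read`; the helper names `set_nonblocking` and `read_would_block` are made up for this example.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Add O_NONBLOCK to the descriptor's existing flags. */
int set_nonblocking(int fd) {
    assert(fd >= 0);
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Returns 1 if reading an empty non-blocking pipe fails immediately with
 * EAGAIN/EWOULDBLOCK instead of blocking, 0 otherwise. */
int read_would_block(void) {
    int fds[2];
    if (pipe(fds) < 0) return 0;

    int ok = 0;
    if (set_nonblocking(fds[0]) == 0) {
        char byte;
        ssize_t n = read(fds[0], &byte, 1); /* nothing has been written yet */
        ok = (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));
    }
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

Without the `set_nonblocking` call, the same read() would wait until a writer showed up, which is exactly what a single-threaded reactor cannot afford.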

Next, I will explain an epoll-based reactor network processing model I wrote:

1.) Header and preprocessor definitions:

#include <unistd.h>

#include <sys/types.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h> //If a header is unfamiliar, look it up with the man or info command on Linux

#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 9999 //First listening port
#define PORT_COUNT 20 //Number of consecutive ports to listen on

typedef int CALLBACK(int, int, void*); //Callback function type: (fd, events, arg)

2.) Structure definition:

//The event a reactor tracks is really a connection (one structure per fd)
typedef struct reactor_events{
    int fd;
    int events; //epoll event mask (EPOLLIN, EPOLLOUT, ...)

    void* arg;
    CALLBACK* callback; //Called when the event fires

    char rbuffer[BUFFER_LENGTH];
    char wbuffer[BUFFER_LENGTH];
    int rlength;
    int wlength;

    int status; //1 if the fd is currently registered with epoll, 0 otherwise
    int methods;
    //Other fields can be added here as the business logic requires

}reactor_events;


//Storage for connection events: blocks of events, allocated dynamically
typedef struct event_block{
    struct reactor_events* events; //Array of MAX_EPOLL_EVENTS event structures
    struct event_block* next; //Pointer to the next block
}event_block;

//One reactor, one epoll instance, one thread
typedef struct C_Reactor{
    int epfd; //One reactor corresponds to one epoll instance
    struct event_block* evblock; //Head of the event_block list
    int evblockidx; //Number of event_blocks currently in the reactor
}C_Reactor;

3.) Important functions:

(1) Setting, adding, and deleting epoll events (that is, setting, adding, and deleting network connections)

int C_event_set(struct reactor_events* ev, int fd, CALLBACK* callback, void* arg){
    ev->fd=fd;
    ev->events=0; //No epoll events requested yet

    ev->callback=callback;
    ev->arg=arg;

    return 0;
}

int C_event_add(struct reactor_events* ev, int epfd, int events){
    struct epoll_event ep_ev={0,{0}}; //What epoll hands back when the fd is ready: event mask and epoll_data
    ep_ev.data.ptr=ev;
    ep_ev.events=ev->events=events;

    //Already registered: modify the registration. Otherwise: add it.
    int op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) return -1;

    ev->status = 1; //Mark as registered only after epoll_ctl succeeded

    return 0;
}

int C_event_delete(struct reactor_events* ev, int epfd){
    struct epoll_event ep_ev = {0, {0}};

    if (ev->status != 1) {
        return -1;
    }

    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);

    return 0;
}
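The `status` flag matters because epoll_ctl() rejects a second EPOLL_CTL_ADD for a descriptor that is already registered (it fails with EEXIST), so changing an existing registration has to go through EPOLL_CTL_MOD. The sketch below checks that behaviour on a pipe. `second_add_errno` is a hypothetical helper name, and `epoll_create1` is used here in place of `epoll_create`.

```c
#include <assert.h>
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Registers the read end of a pipe twice with EPOLL_CTL_ADD and returns the
 * errno of the second attempt. *mod_result receives the return value of a
 * follow-up EPOLL_CTL_MOD. Returns -1 if the setup itself fails. */
int second_add_errno(int *mod_result) {
    assert(mod_result != NULL);
    *mod_result = -1;

    int fds[2];
    int epfd = epoll_create1(0);
    if (epfd < 0) return -1;
    if (pipe(fds) < 0) { close(epfd); return -1; }

    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fds[0];

    int err = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) == 0) {
        /* The descriptor is already in the interest list: ADD must fail. */
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) < 0) err = errno;
        /* MOD is the right operation for an existing registration. */
        ev.events = EPOLLIN | EPOLLET;
        *mod_result = epoll_ctl(epfd, EPOLL_CTL_MOD, fds[0], &ev);
    }
    close(fds[0]);
    close(fds[1]);
    close(epfd);
    return err;
}
```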

Readers may be puzzled by these lines in C_event_add:

    struct epoll_event ep_ev={0,{0}};
    ep_ev.data.ptr=ev;
    ep_ev.events=ev->events=events;

My other blog post, “In-depth analysis of linux network io multiplexing”, explains the principle of epoll in detail. In short, the kernel keeps the registered descriptors in a red-black tree, and each registration carries a struct epoll_event, which in turn contains the union epoll_data. The definitions are as follows:

typedef union epoll_data {
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event {
  uint32_t events; // bit mask of event types (EPOLLIN, EPOLLOUT, ...)
  epoll_data_t data; // data related to the event
};

When we assign the structure pointer ev (a reactor_events*) to ep_ev.data.ptr, the epoll registration is tied to our event structure: epoll_wait() later hands back the same pointer, which leads straight to the fd, buffers, and callback of that connection. (Because epoll_data is a union, its members share the same storage. Only one member can hold a meaningful value at a time, and writing another member, such as data.fd, would overwrite the pointer.)
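To see that the pointer really comes back unchanged, here is a small sketch on a pipe. `conn_record` is a stand-in for the per-connection structure and `roundtrip_ptr` is a hypothetical helper; neither is part of the reactor code in this post.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Stand-in for the per-connection record. */
struct conn_record {
    int fd;
    int tag;
};

/* Registers rec->fd with rec stored in data.ptr, makes the fd readable by
 * writing one byte to write_fd, and returns whatever pointer epoll_wait
 * hands back (NULL on failure). */
struct conn_record *roundtrip_ptr(struct conn_record *rec, int write_fd) {
    assert(rec != NULL);
    int epfd = epoll_create1(0);
    if (epfd < 0) return NULL;

    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.ptr = rec; /* only ptr is set; data.fd shares the same storage */

    struct conn_record *seen = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, rec->fd, &ev) == 0 &&
        write(write_fd, "x", 1) == 1) {
        struct epoll_event ready;
        if (epoll_wait(epfd, &ready, 1, 1000) == 1)
            seen = ready.data.ptr; /* the kernel returns the value unchanged */
    }
    close(epfd);
    return seen;
}
```

epoll never interprets the contents of `data`; it stores the 64 bits at registration time and returns them with each ready event, which is why a pointer to our own structure works as a lookup key.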

Now let’s look at how this reactor is driven from main(); the functions it calls are explained afterwards.

int main(int argc, char** argv){

    struct C_Reactor* reactor = (struct C_Reactor*)malloc(sizeof(struct C_Reactor)); //Create a reactor structure
    if (reactor == NULL) return -1;
    if (C_reactor_init(reactor) < 0) { //Initialization
        free(reactor);
        return -1;
    }

    unsigned short port = SERVER_PORT;
    if (argc == 2) {
        port = atoi(argv[1]);
    }

    int i = 0;
    int sockfds[PORT_COUNT] = {0};

    for (i = 0; i < PORT_COUNT; i++) {
        sockfds[i] = init_sock(port + i); //Listen on consecutive ports
        if (sockfds[i] < 0) continue; //Skip ports that could not be opened
        C_reactor_addlistener(reactor, sockfds[i], accept_cb); //accept_cb: the callback that accepts new connections
    }

    C_reactor_run(reactor);

    C_reactor_destory(reactor);

    for (i = 0; i < PORT_COUNT; i++) {
        close(sockfds[i]);
    }
    free(reactor);

    return 0;
}

Let’s look at C_reactor_init(reactor) first

int C_reactor_init(struct C_Reactor* reactor){
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct C_Reactor)); //The caller allocated the reactor; only clear it here

    reactor->epfd = epoll_create(1); //Create the epoll instance
    if (reactor->epfd < 0) {
        return -1;
    }

    struct reactor_events* ev = (struct reactor_events*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct reactor_events));
    if (ev == NULL) { //Check before touching the memory
        close(reactor->epfd);
        return -2;
    }
    memset(ev, 0, (MAX_EPOLL_EVENTS) * sizeof(struct reactor_events));

    struct event_block* block = (struct event_block*)malloc(sizeof(struct event_block));
    if (block == NULL) {
        free(ev);
        close(reactor->epfd);
        return -3;
    }
    memset(block, 0, sizeof(struct event_block));

    reactor->evblock = block;
    reactor->evblockidx = 1;

    reactor->evblock->events = ev;
    reactor->evblock->next = NULL;

    return 0;
}

Event blocks are allocated dynamically. Each call to C_reactor_alloc appends one event_block, holding MAX_EPOLL_EVENTS event structures, to the end of the list and increases the block count by 1:

int C_reactor_alloc(struct C_Reactor* reactor) {

    if (reactor == NULL) return -1;
    if (reactor->evblock == NULL) return -1;

    struct event_block* blk = reactor->evblock;

    while (blk->next != NULL) { //Walk to the last block
        blk = blk->next;
    }

    struct reactor_events* evs = (struct reactor_events*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct reactor_events));
    if (evs == NULL) return -2;

    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct reactor_events));

    struct event_block* block = (struct event_block*)malloc(sizeof(struct event_block));
    if (block == NULL) {
        free(evs); //Do not leak the event array
        return -3;
    }

    block->events = evs;
    block->next = NULL;

    blk->next = block;
    reactor->evblockidx++;

    return 0;
}

In network programming, the five-tuple is the set of five elements that uniquely identifies a connection. These five elements are:

  1. Source IP address: the IP address of the device that sends the packet.

  2. Destination IP address: the IP address of the device that receives the packet.

  3. Source port number: the port used by the sending device.

  4. Destination port number: the port used by the receiving device.

  5. Transport layer protocol: TCP, UDP, or another protocol used to carry the data.

Listening on PORT_COUNT ports therefore matters for a load test: with one client IP and one server port, only the source port can vary, which caps a single test client at roughly 65,000 connections, and every additional listening port raises that cap by the same amount. init_sock creates one such listening socket:
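As a back-of-the-envelope check of what the five-tuple allows between one client machine and one server, the arithmetic can be written down directly. The function name `max_connections` and the port counts in the note that follows are assumptions chosen for illustration, not measurements.

```c
#include <assert.h>

/* Upper bound on simultaneous TCP connections between ONE client IP and ONE
 * server IP: the two addresses and the protocol are fixed, so every
 * connection needs a distinct (source port, destination port) pair. */
long max_connections(long client_ports, long server_ports) {
    assert(client_ports >= 0 && server_ports >= 0);
    return client_ports * server_ports;
}
```

Assuming the client may use source ports 1024 to 65535 (64512 ports), `max_connections(64512, 1)` is 64512 and `max_connections(64512, 20)` is 1290240. This is only the addressing limit; file-descriptor limits and memory on both machines have to allow that many connections as well.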

int init_sock(unsigned short port) {

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) return -1;
    fcntl(listenfd, F_SETFL, O_NONBLOCK); //The listening socket must not block accept()

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);

    if (bind(listenfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        printf("bind failed : %s\n", strerror(errno));
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
        close(listenfd);
        return -1;
    }

    printf("listen server port : %d\n", port);

    return listenfd;
}
 
//Find the event structure for sockfd, allocating event blocks as needed
struct reactor_events* C_reactor_idx(struct C_Reactor* reactor, int sockfd) {

    if (reactor == NULL) return NULL;
    if (reactor->evblock == NULL) return NULL;
    if (sockfd < 0) return NULL;

    int blkidx = sockfd / MAX_EPOLL_EVENTS;
    while (blkidx >= reactor->evblockidx) {
        if (C_reactor_alloc(reactor) < 0) return NULL; //Allocation failed: do not loop forever
    }

    int i = 0;
    struct event_block* blk = reactor->evblock;
    while (i++ != blkidx && blk != NULL) {
        blk = blk->next;
    }
    if (blk == NULL) return NULL;

    return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}
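The lookup above relies on simple integer arithmetic: the descriptor number divided by the block size selects the block, and the remainder selects the slot inside it. This works well because the kernel always hands out the lowest unused descriptor number, so the numbers stay dense. A standalone sketch of the mapping, with hypothetical names (`SLOTS_PER_BLOCK` plays the role of MAX_EPOLL_EVENTS):

```c
#include <assert.h>

#define SLOTS_PER_BLOCK 1024 /* plays the role of MAX_EPOLL_EVENTS */

struct slot_pos {
    int block; /* which block in the linked list, counting from 0 */
    int slot;  /* index inside that block's array */
};

/* Maps a descriptor number to its position in the block list. */
struct slot_pos locate_fd(int fd) {
    assert(fd >= 0);
    struct slot_pos pos;
    pos.block = fd / SLOTS_PER_BLOCK;
    pos.slot = fd % SLOTS_PER_BLOCK;
    return pos;
}

/* Number of blocks that must exist before fd can be stored. */
int blocks_needed(int fd) {
    return locate_fd(fd).block + 1;
}
```

For example, fd 2500 lands in block 2 at slot 452, so three blocks must exist before it can be stored. Reaching block n means walking n links of the list, which stays cheap as long as blocks are large; one million descriptors need about 977 blocks of 1024 slots.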

Then, add the listenfd we set to the epoll event block:

int C_reactor_addlistener(struct C_Reactor* reactor, int sockfd, CALLBACK* acceptor) {

    if (reactor == NULL) return -1;
    if (reactor->evblock == NULL) return -1;

    struct reactor_events* event = C_reactor_idx(reactor, sockfd);
    if (event == NULL) return -1;

    C_event_set(event, sockfd, acceptor, reactor);
    if (C_event_add(event, reactor->epfd, EPOLLIN) < 0) return -1; //Argument order: event, epfd, events

    return 0;
}

Start the reactor to monitor the I/O connections. In practice this means calling epoll_wait() in a loop: whenever a registered fd becomes ready, epoll_wait() returns its epoll_event, and the reactor calls the callback stored behind data.ptr.

int C_reactor_run(struct C_Reactor* reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->evblock == NULL) return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];

    while (1) {

        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            if (errno == EINTR) continue; //Interrupted by a signal: retry
            printf("epoll_wait error, exit : %s\n", strerror(errno));
            return -1;
        }

        for (int i = 0; i < nready; i++) {

            struct reactor_events* ev = (struct reactor_events*)events[i].data.ptr;

            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }

        }
    }
}
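A single iteration of such a loop can be tried in isolation. The sketch below is a simplified stand-in, not the reactor above: `demo_event`, `on_readable`, and `poll_once` are hypothetical names, and the caller is expected to register a `demo_event` pointer in data.ptr beforehand.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

typedef int demo_cb(int fd, int events, void *arg);

struct demo_event {
    int fd;
    demo_cb *callback;
    void *arg;
};

/* Read callback: consumes one byte and counts the invocation. */
int on_readable(int fd, int events, void *arg) {
    (void)events;
    char byte;
    int *hits = arg;
    if (read(fd, &byte, 1) == 1) (*hits)++;
    return 0;
}

/* Runs ONE iteration of the loop: wait up to timeout_ms, then call the
 * callback stored behind data.ptr for every ready descriptor.
 * Returns the number of callbacks invoked, or -1 if epoll_wait fails. */
int poll_once(int epfd, int timeout_ms) {
    struct epoll_event ready[16];
    int n = epoll_wait(epfd, ready, 16, timeout_ms);
    if (n < 0) return -1;
    for (int i = 0; i < n; i++) {
        struct demo_event *ev = ready[i].data.ptr;
        assert(ev != NULL && ev->callback != NULL);
        ev->callback(ev->fd, (int)ready[i].events, ev->arg);
    }
    return n;
}
```

With a pipe registered for EPOLLIN, `poll_once` reports nothing while the pipe is empty, invokes `on_readable` once after a byte is written, and reports nothing again after the callback has consumed the byte. That last step is the level-triggered behaviour: the fd is reported only while unread data remains.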

Finally, release the resources that were allocated:

int C_reactor_destory(struct C_Reactor* reactor) {
    if (reactor == NULL) return -1;

    close(reactor->epfd);

    struct event_block* blk = reactor->evblock;
    struct event_block* blk_next;
    while (blk != NULL) {
        blk_next = blk->next; //Save the link before freeing the block

        free(blk->events);
        free(blk);

        blk = blk_next;
    }
    reactor->evblock = NULL;

    return 0;
}