Reactor principle and implementation

1. reactor model

What is a reactor?
reactor refers to a high-performance processing mechanism under the select, poll, and epoll network models in multiple I/O multiplexing.
reactor Interpretation of “reactor”, is an event-driven mechanism.
The calling mechanism of ordinary functions:
The program calls a function, the function executes, the program waits, the function returns the result and control to the program, and the program continues processing.
The calling mechanism of reactor:
The application does not actively call an API to complete the processing. On the contrary, the reactor reverses the event processing process. The application needs to provide the corresponding interface and register it with the reactor.
If the corresponding time occurs, reactor will actively call the interface registered by the application, and these interfaces are also called “callback functions”.

2. reactor application scenario

The reactor mode is a common mode for processing concurrent I/O. It is used for synchronous I/O. The central idea is to register all I/O events to be processed to a central I/O multiplexer, and at the same time the main The thread/process is blocked on the multiplexer; once an I/O event arrives or is ready (file descriptor or socket can be read and written), the multiplexer returns and the corresponding I/O registered in advance Events are dispatched to corresponding handlers.

The reactor model has three important components:
? Multiplexer: Provided by the operating system, generally select, poll, epoll and other system calls on linux.
? Event dispatcher: assign the ready event returned by the multiplexer to the corresponding processing function.
? Event handler: a handler function responsible for handling a specific event.

The specific process is as follows:

  1. Register read-ready events and corresponding event handlers;
  2. The event separator waits for events;
  3. When the event arrives, the separator is activated, and the separator calls the handler corresponding to the event;
  4. The event handler completes the actual read operation, processes the read data, registers for new events, and returns control.

3. Specific code implementation

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>

#define SERVER_PORT 6668
#define MAX_EPOLL_EVENTS 1024
#define BUFFER_LENGTH 4096

typedef int (*NCALLBACK)(int, int , void *);

typedef struct ntyEvent{<!-- -->
    int fd;
    int events;
    void *arg;

    int (*callback)(int fd, int events, void *arg);
    int status; //status is 0, indicating that the node is not on the red-black tree
    int length;
    char buffer[BUFFER_LENGTH];
} ntyEvent_t;

typedef struct ntyReactor{<!-- -->
    int epfd;
    struct ntyEvent *events;
}ntyReactor_t;

int recv_cb_hdl(int fd, int events, void *arg);
int send_cb_hdl(int fd, int events, void *arg);
int accept_cb_hdl(int listen_fd, int events, void *arg);


int socket_init(int port){<!-- -->
    int ret = 0;
    int reuse;
    struct sockaddr_in serv;
    socklen_t address_len = sizeof(struct sockaddr_in);
    
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if(listen_fd < 0){<!-- -->
        printf("socket():%s\\
", strerror(errno));
        return -1;
    }

    ret = fcntl(listen_fd, F_SETFL, O_NONBLOCK);
    if(ret < 0){<!-- -->
        printf("fcntl() error: %s\\
", strerror(errno));
        return -1;
    }

    reuse = 1;
    ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    if(ret < 0){<!-- -->
        printf("setsockopt() error: %s\\
", strerror(errno));
        return -1;
    }

    memset( &serv, 0, sizeof(struct sockaddr_in));
    serv.sin_family = AF_INET;
    serv.sin_port = htons(port);
    serv.sin_addr.s_addr = htonl(INADDR_ANY);

    ret = bind(listen_fd, (struct sockaddr*) & amp;serv, address_len);
    if(ret < 0){<!-- -->
        printf("bind() error: %s\\
", strerror(errno));
        return -1;
    }

    ret = listen(listen_fd, 128);
    if(ret < 0){<!-- -->
        printf("listen() error: %s\\
", strerror(errno));
        return -1;
    }
    
    return listen_fd;
}

int ntyReactor_init(ntyReactor_t *reactor){<!-- -->
    if(reactor == NULL ) return -1;

    reactor->epfd = epoll_create(1);
    if(reactor->epfd < 0){<!-- -->
        printf("ntyReactor_init() error:%s\\
", strerror(errno));
        return -1;
    }

    reactor->events = (ntyEvent_t *)malloc(sizeof(ntyEvent_t) * MAX_EPOLL_EVENTS);
    if(NULL == reactor->events){<!-- -->
        printf("malloc() error!\\
");
        close(reactor->epfd);
        return -1;
    }
    memset(reactor->events, 0, sizeof(ntyEvent_t)*MAX_EPOLL_EVENTS);

    return 0;
}

int ntyEventSet(ntyEvent_t *ev, int fd, NCALLBACK callback, void * arg){<!-- -->
    if(ev == NULL) return -1;
    ev->fd = fd;
    ev->events = 0; //set to 0 by default
    ev->arg = arg;
    ev->callback = callback;
    return 0;
}

int ntyEventAdd(int epfd, int events, ntyEvent_t *ev){<!-- -->
    struct epoll_event ep_ev = {<!-- -->0, {<!-- -->0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;

    int op;
    int ret = 0;
    if(ev->status == 1){<!-- -->
        op = EPOLL_CTL_MOD;
    }else{<!-- -->
op = EPOLL_CTL_ADD;
ev->status = 1;
    }

    ret = epoll_ctl(epfd, op, ev->fd, &ep_ev);
    if(ret < 0){<!-- -->
        printf("epoll_ctl():%s\\
", strerror(errno));
        return -1;
    }
    
    return 0;
}

int ntyReactorAddListenFd(ntyReactor_t *reactor, int listen_fd, NCALLBACK acceptor){<!-- -->
    if(reactor == NULL) return -1;
    if(reactor->events == NULL) return -1;
    int ret = 0;

    ret = ntyEventSet( &reactor->events[listen_fd], listen_fd, acceptor, (void *)reactor);
    if(ret < 0){<!-- -->
        printf("ntyEventSet() error!\\
");
        return -1;
    }

    ret = ntyEventAdd(reactor->epfd, EPOLLIN, &reactor->events[listen_fd]);
    if(ret < 0){<!-- -->
        printf("nty_event_add() error!\\
");
        return -1;
    }
    
    return 0;
}

int ntyEventDel(int epfd, ntyEvent_t *ev){<!-- -->
    struct epoll_event ep_ev = {<!-- -->0, {<!-- -->0}};

    //For a node added to the red-black tree, the value of status must be 1. If status != 1, it means that it is not on the tree and does not need to be cleared
    if (ev->status != 1) {<!-- -->
return -1;
}

    ep_ev.data.ptr = ev;
ev->status = 0; //reset status
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    return 0;
}

// lowercase to uppercase function
void buffer_hdl(char *buffer, int length){<!-- -->
    int i = 0;
    for(i = 0; i < length; i ++ ){<!-- -->
        buffer[i] = toupper(buffer[i]);
    }
}

int send_cb_hdl(int fd, int events, void *arg){<!-- -->
    ntyReactor_t *reactor = (ntyReactor_t *)arg;
    ntyEvent_t *ev = &reactor->events[fd];

    //Simulate business processing
    buffer_hdl(ev->buffer, ev->length);
    
int len = send(fd, ev->buffer, ev->length, 0);
if (len > 0) {<!-- -->
printf("sendbuff = [fd = %d]:%s\\
", fd, ev->buffer);
ntyEventSet(ev, fd, recv_cb_hdl, reactor);
ntyEventAdd(reactor->epfd, EPOLLIN, ev);
}else if(len == 0){<!-- -->
        ntyEventDel(reactor->epfd, ev);
        close(ev->fd);
printf("---send_cb_hdl--client[%d] disconnected!\\
", fd);
    } else {<!-- -->
        ntyEventDel(reactor->epfd, ev);
close(ev->fd);
printf("--send_cb_hdl--send[fd=%d] error %s\\
", fd, strerror(errno));
}

    return 0;
}


int recv_cb_hdl(int fd, int events, void *arg){<!-- -->
    ntyReactor_t *reactor = (ntyReactor_t *)arg;
    ntyEvent_t *ev = &reactor->events[fd];

    int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
    if(len > 0){<!-- -->
        ev->length = len;
ev->buffer[len] = '\0';
    
printf("recvbuff = [fd = %d]:%s\\
", fd, ev->buffer);
        
ntyEventSet(ev, fd, send_cb_hdl, reactor);
ntyEventAdd(reactor->epfd, EPOLLOUT, ev);
    }else if(len < 0){<!-- -->
        ntyEventDel(reactor->epfd, ev);
close(ev->fd);
printf("--recv_cb_hdl--recv[fd=%d] error[%d]:%s\\
", fd, errno, strerror(errno));
    }else{<!-- -->
        ntyEventDel(reactor->epfd, ev);
close(ev->fd);
printf("--recv_cb_hdl--client[%d] disconnected!\\
", fd);
    }

    return 0;
}

int accept_cb_hdl(int listen_fd, int events, void *arg){<!-- -->
    ntyReactor_t *reactor = (ntyReactor_t *)arg;
    if (reactor == NULL) return -1;

    struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd;
    int i = 0;

    client_fd = accept(listen_fd, (struct sockaddr*) & amp;client_addr, & amp;len);
    if(client_fd < 0)
    {<!-- -->
if (errno != EAGAIN & amp; & amp; errno != EINTR) {<!-- -->
            printf("accept: %s\\
", strerror(errno));
            return -1;
}
}

    do{<!-- -->
        // Standard input (0), standard output (1), standard error (2), listen_fd (3), reactor->epfd (4) occupy the first few file descriptors
        for (i = reactor->epfd + 1; i < MAX_EPOLL_EVENTS; i ++ ) {<!-- -->
if (reactor->events[i].status == 0) {<!-- -->
break;
}
}

        //Used to determine whether the monitored connection exceeds the upper limit, if the i from the above break is equal to the maximum number of monitored, no new connection event processing can be added, and subsequent optimization can be dynamically expanded
if (i == MAX_EPOLL_EVENTS) {<!-- -->
printf("%s: max connect limit[%d]\\
", __func__, MAX_EPOLL_EVENTS);
break;
}

        int flag = 0;
if ((flag = fcntl(client_fd, F_SETFL, O_NONBLOCK)) < 0) {<!-- -->
printf("%s: fcntl nonblocking failed, %d\\
", __func__, MAX_EPOLL_EVENTS);
break;
}
        
        ntyEventSet( &reactor->events[client_fd], client_fd, recv_cb_hdl, reactor);
        ntyEventAdd(reactor->epfd, EPOLLIN, &reactor->events[client_fd]);
    }while(0);

    printf("new connect [%s:%d], clientfd[%d], cur_unused_index = [%d]\\
",\
            inet_ntoa(client_addr. sin_addr), ntohs(client_addr. sin_port), client_fd, i);

    return 0;
}

int ntyReactorLoopRun(ntyReactor_t *reactor){<!-- -->
    if(reactor == NULL) return -1;
    if(reactor->epfd < 0) return -1;
    if(reactor->events == NULL) return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS];
    int nready = 0;
    int i = 0;
    
    while(1){<!-- -->
        nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, -1);
        if(nready < 0){<!-- -->
            if(errno == EINTR){<!-- -->
                //Interrupted by the signal, re-loop
                continue;
            }
            printf("epoll_wait error(%s), exit---\\
", strerror(errno));
            return -1;
        }

        for(i = 0; i < nready; i ++ ){<!-- -->
            ntyEvent_t *ev = (ntyEvent_t*)events[i].data.ptr;
            if((ev->events & amp; EPOLLIN) & amp; & amp; events[i].events & amp; EPOLLIN){<!-- -->
                ev->callback(ev->fd, events[i].events, ev->arg);
            }

            if((ev->events & amp; EPOLLOUT) & amp; & amp; events[i].events & amp; EPOLLOUT){<!-- -->
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
        }
    }

    return 0;
}

int ntyReactorDestory(ntyReactor_t *reactor){<!-- -->
    if(NULL == reactor) return -1;
    
    close(reactor->epfd);
    if(reactor->events != NULL)
free(reactor->events);
    free(reactor);
    return 0;
}

int main(int argc, char *argv[]){<!-- -->
    int ret = 0;
    int serv_port = SERVER_PORT;
    
    if(argc == 2){<!-- -->
        serv_port = atoi(argv[1]);
    }

    //1. Initialize socket()
    int listen_fd = socket_init(serv_port);
    if(listen_fd < 0){<!-- -->
        printf("init_socket() error!\\
");
        return -1;
    }

    printf("socket_init: listen_fd = [%d]\\
", listen_fd);
    //2, initial reactor
    ntyReactor_t *reactor = (ntyReactor_t *)malloc(sizeof(ntyReactor_t));
    if(NULL == reactor){<!-- -->
        printf("malloc() error!\\
");
    }
    ret = ntyReactor_init(reactor);
    if(ret < 0){<!-- -->
        printf("ntyReactor_init() error!\\
");
    }
    printf("ntyReactor_init:reactor->epfd = [%d]\\
",reactor->epfd);

    //3. The listen_fd event processing callback function is registered to the reactor listener
ret = ntyReactorAddListenFd(reactor, listen_fd, accept_cb_hdl);
    if(ret < 0){<!-- -->
        printf("ntyreactor_addlistener() error\\
");
        return -1;
    }
    
    //4. The epoll_wait() function under the recator model waits for events to occur
    ret = ntyReactorLoopRun(reactor);
    if(ret < 0){<!-- -->
        printf("ntyreactor_run() error!\\
");
    }
    
    //5, reactor destruction
    ret = ntyReactorDestory(reactor);
    if(ret < 0){<!-- -->
        printf("ntyReactorDestory() error!\\
");
    }
close(listen_fd);
    return 0;
}
Compile, execute and debug:
[root@localhost socket-reactor-mode]# ./test 8888
socket_init:listen_fd=[3]
ntyReactor_init:reactor->epfd = [4]
new connect [127.0.0.1:36798], clientfd[5], cur_unused_index = [5]
recvbuff = [fd = 5]: hello linux!
sendbuff = [fd = 5]:HELLO LINUX!
new connect [127.0.0.1:36800], clientfd[6], cur_unused_index = [6]
recvbuff = [fd = 6]: hello China!
sendbuff = [fd = 6]:HELLO CHINA!

new connect [127.0.0.1:36802], clientfd[7], cur_unused_index = [7]
recvbuff = [fd = 7]:abcdefghigkmon
sendbuff = [fd = 7]:ABCDEFGHIGKMON

new connect [172.16.10.223:58039], clientfd[8], cur_unused_index = [8]
recvbuff = [fd = 8]: http://www.cmsoft.cn QQ: 10865600
sendbuff = [fd = 8]: HTTP://WWW.CMSOFT.CN QQ: 10865600
--recv_cb_hdl--client[6] disconnected!
new connect [127.0.0.1:36804], clientfd[6], cur_unused_index = [6]
recvbuff = [fd = 6]: hello china!
sendbuff = [fd = 6]:HELLO CHINA!

4. Advantages of reactor mode

The reactor mode is one of the necessary technologies for writing high-performance web servers, and has the following advantages:
? The response is fast and does not need to be blocked by a single synchronization time, although Reactor itself is still synchronous;
? Programming is relatively simple, which can avoid complex multi-threading and synchronization problems to the greatest extent, and avoid multi-threading/processing
Program switching overhead;
? Scalability, it is convenient to make full use of CPU resources by increasing the number of Reactor instances;
? Reusability, the reactor framework itself has nothing to do with specific event processing logic, and has high reusability;

5. Summary

The development efficiency of the reactor model is higher than that of directly using IO multiplexing. It is usually single-threaded. The design goal is to hope that a single thread can use all the resources of a CPU, but it also has the added advantage that each event can be processed many times. Mutually exclusive access to shared resources is not considered. However, the shortcomings are also obvious. The current hardware development no longer follows Moore’s Law. The frequency of the CPU is limited by the material and no longer has a big increase. Instead, the increase in the number of cores improves the ability. When the program needs to use When multi-core resources are used, the reactor model will be tragic, why? If the program business is very simple, such as simply accessing some services that provide concurrent access, you can directly open multiple reactors, each corresponding to a CPU core, and the requests running on these reactors are not related to each other, which is completely possible Take advantage of multi-core.