Encapsulating a Reactor-pattern network library based on I/O multiplexing (epoll)

Article summary:

1. Understand the main goals and ideas of network library encapsulation.

2. Understand the Reactor pattern.

3. Code implementation.

1. Network library encapsulation

1.1 What is the purpose of the network library?

The encapsulated network library sits between the business logic in the application layer and the transport layer. Through the conventions of the library, the programmer knows where the data received from a given client is stored, and uses the API provided by the library to fetch it. After the business processing is complete, the result is passed on to its next destination (sent back to the client, or written to a database such as MySQL). The network library therefore has to deal with the following two objects and four operations:

Connection: established, disconnected

Information: receiving (reading), transmitting (writing)

By establishing a connection, receiving information, sending information, and disconnecting, we can implement complete communication with a peer. The business layer only needs to read the data from the location where the library stored it, process it, and write the result into the buffer at the agreed location; the library then sends the data back. This gives a clean separation between the network layer and the business layer, which is the goal of building a network library.

1.2 Basic construction ideas of network library

For the four operations above, socket programming provides the following I/O functions:

1. Connection establishment:

socket() creates a socket of the requested type and returns its file descriptor;
bind() binds the socket to a local address and port;
listen() marks the socket as a listening socket so that it accepts incoming connections on the bound port;

accept() takes an established connection from the queue of completed connections and returns a new file descriptor for it;

connect() is used by a client to send a connection request.

Whenever a connection request arrives on the listening port, the listening socket becomes readable.

2. Disconnection

The server actively disconnects:

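int close(int fd);

int shutdown(int sockfd, int how);

shutdown() can close just one direction of the connection (a half-close): SHUT_RD, SHUT_WR, or SHUT_RDWR for both directions.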

The server passively disconnects (that is, the client actively disconnects):

When the client actively disconnects, its TCP stack sends a FIN rather than any special character. Once the server has read all pending data, recv returns 0, which signals end of stream. At that point, just close the socket.
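As a minimal sketch of both disconnection cases, the following uses a local socketpair() instead of a real TCP connection so that it stays self-contained. The helper names drain_until_eof and half_close_demo are hypothetical and are not part of the library built later in this article.

```c
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Read from fd until the peer signals end of stream.
 * Returns the number of bytes received before EOF, or -1 on error. */
ssize_t drain_until_eof(int fd) {
    assert(fd >= 0);
    char buf[256];
    ssize_t total = 0;
    for (;;) {
        ssize_t n = recv(fd, buf, sizeof buf, 0);
        if (n > 0) {
            total += n;          /* data arrived, keep reading */
        } else if (n == 0) {
            return total;        /* recv returned 0: peer closed its write side */
        } else {
            return -1;           /* real error */
        }
    }
}

/* Half-close demo on a connected pair of local sockets.
 * Returns 0 if everything behaved as described, -1 otherwise. */
int half_close_demo(void) {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return -1;

    /* Side 0 sends its last message, then closes only its write direction. */
    int ok = send(sv[0], "bye", 3, 0) == 3;
    ok = ok && shutdown(sv[0], SHUT_WR) == 0;

    /* Side 1 sees the 3 bytes and then end of stream. */
    ok = ok && drain_until_eof(sv[1]) == 3;

    /* The opposite direction is still open after the half-close. */
    char reply[2];
    ok = ok && send(sv[1], "ok", 2, 0) == 2;
    ok = ok && recv(sv[0], reply, sizeof reply, 0) == 2;

    close(sv[0]);
    close(sv[1]);
    return ok ? 0 : -1;
}
```

Calling half_close_demo() should return 0 on Linux: the reader gets the pending bytes first and only then the 0 from recv, and the side that called shutdown(SHUT_WR) can still receive.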

3. Receiving and sending information

ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t read(int fd, void *buf, size_t count);

ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t write(int fd, const void *buf, size_t count);

When the last parameter (flags) is set to 0, recv and send behave the same as read and write.

Using these io functions, we can conceive of a simple pseudocode for the main loop of the network library:

while(1){
    for(all sockets){
        Check socket io status;
        if(socket is readable){
            Perform the specified read and store the data.
        }
        if(socket is writable){
            Perform the specified write and send the data.
        }
    }
}
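The "check status, then read" step of this loop depends on reads that never block. A minimal sketch of that step, assuming a Linux system, might look as follows. The helper names set_nonblocking and read_available are hypothetical and do not appear in the library code later in the article.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Read whatever is currently pending on a non-blocking socket.
 * Returns the number of bytes stored in buf (0 if nothing was pending),
 * or -1 on a real error. *peer_closed is set to 1 if the peer has closed. */
ssize_t read_available(int fd, char *buf, size_t cap, int *peer_closed) {
    assert(buf != NULL && cap > 0 && peer_closed != NULL);
    size_t used = 0;
    *peer_closed = 0;
    while (used < cap) {
        ssize_t n = recv(fd, buf + used, cap - used, 0);
        if (n > 0) {
            used += (size_t)n;
        } else if (n == 0) {
            *peer_closed = 1;    /* end of stream */
            break;
        } else if (errno == EINTR) {
            continue;            /* interrupted by a signal, retry */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            break;               /* nothing more to read right now */
        } else {
            return -1;
        }
    }
    return (ssize_t)used;
}
```

With this shape, a socket that has nothing to say costs one failed recv per pass of the loop, which is exactly the wasted work that grows with the number of connections.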

The above is the non-blocking I/O scenario. With blocking I/O, we would have to start a thread for each client connection. Both the full traversal in the for loop and one thread per connection are expensive: once the number of client connections grows very large, server performance degrades badly. How can we optimize it?

We noticed that the main loop has two responsibilities: 1. Detect io status 2. Perform io operations

We focus on optimizing status detection. I/O multiplexing can take over the responsibility of detecting I/O status. select and poll still scan every registered descriptor internally on each call, so we choose epoll, which scales better. With the timeout of epoll_wait set to -1, the call blocks until events occur and returns only the descriptors that are ready, so the loop no longer has to traverse every socket.
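To illustrate that epoll_wait hands back only the ready descriptors, here is a small sketch that registers several pipes and writes to just one of them. It assumes Linux; the function name only_ready_fd_is_reported and the constant NPIPES are made up for this example.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

#define NPIPES 4

/* Registers the read ends of NPIPES pipes with epoll, writes one byte to
 * pipe number `which`, then waits with timeout -1.
 * Returns 1 if epoll_wait reported exactly that one pipe, 0 if it reported
 * something else, and -1 if the setup failed. */
int only_ready_fd_is_reported(int which) {
    assert(which >= 0 && which < NPIPES);
    int pipes[NPIPES][2];
    struct epoll_event ev, out[NPIPES];
    int created = 0, nready = 0, result = -1;

    int epfd = epoll_create1(0);
    if (epfd == -1) return -1;

    while (created < NPIPES) {
        if (pipe(pipes[created]) == -1) goto done;
        created++;
        ev.events = EPOLLIN;
        ev.data.fd = pipes[created - 1][0];
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) goto done;
    }

    /* Make exactly one descriptor readable. */
    if (write(pipes[which][1], "x", 1) != 1) goto done;

    /* Timeout -1: block until at least one registered fd is ready. */
    nready = epoll_wait(epfd, out, NPIPES, -1);
    result = (nready == 1 && out[0].data.fd == pipes[which][0]);

done:
    for (int i = 0; i < created; i++) {
        close(pipes[i][0]);
        close(pipes[i][1]);
    }
    close(epfd);
    return result;
}
```

The three idle pipes never show up in the result array, so the caller does no work for them, in contrast to the for loop over all sockets.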

For a detailed introduction to I/O multiplexing, see my previous blog post: TCP/IP network programming: basic construction of a TCP server and optimizing it with I/O multiplexing (Violet Evergarden.'s blog, CSDN)

The epoll example in that post builds a simple server whose business logic just prints what it receives. In other words, epoll is used only to detect I/O status.

2. Reactor pattern

So why introduce the Reactor pattern? In the epoll example from the previous post, all client connections share one buffer, so we cannot tell which connection the data came from. The business logic is also written directly inside the event loop, which makes it hard to change. With I/O multiplexing already providing efficient status detection, the Reactor pattern turns the management of I/O into the management of events: each event is handled by a registered callback function. It also gives every connection its own data structure that holds an rbuffer and a wbuffer.

So what is Reactor? The Reactor design pattern is an event handling pattern for handling service requests delivered simultaneously to a service handler via one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handler.

When a Reactor is created, epoll_create is called to create an epoll instance. Each call to epoll_wait brings the ready events back to the Reactor, which dispatches them to their handlers. If the event is a connection request, the accept_callback callback is triggered to accept the new clientfd and register it with epoll. For other events, the callback to run is chosen according to the event type.
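The dispatch step can be sketched in a few lines. Note that this sketch is a variation and not the article's implementation: it stores a pointer to a handler record in epoll's data.ptr field, whereas the code in this article stores the fd in data.fd and looks the connection up in a table. The names handler_fn, struct handler, reactor_register, reactor_run_once and count_bytes are all hypothetical.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>

typedef void (*handler_fn)(int fd, void *ctx);

/* One record per registered descriptor: what to call when it is ready. */
struct handler {
    int fd;
    handler_fn fn;
    void *ctx;
};

/* Register h->fd for read events; epoll hands the pointer back later. */
int reactor_register(int epfd, struct handler *h) {
    assert(h != NULL && h->fn != NULL);
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.ptr = h;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, h->fd, &ev);
}

/* Wait once and dispatch every ready event to its callback.
 * Returns the number of events handled, or -1 on error. */
int reactor_run_once(int epfd, int timeout_ms) {
    struct epoll_event ready[16];
    int n = epoll_wait(epfd, ready, 16, timeout_ms);
    for (int i = 0; i < n; i++) {
        struct handler *h = ready[i].data.ptr;
        h->fn(h->fd, h->ctx);
    }
    return n;
}

/* Example callback: add the number of bytes read to an int counter. */
void count_bytes(int fd, void *ctx) {
    char buf[64];
    ssize_t n = read(fd, buf, sizeof buf);
    if (n > 0) *(int *)ctx += (int)n;
}
```

The event loop itself never mentions what a handler does. Swapping count_bytes for another function changes the business behavior without touching reactor_run_once, which is the separation the pattern is after.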

The data structure of the simple reactor implemented in this article is as follows:

Each clientfd has its own zya_connet_t node, which stores the read and write buffers for that client. Every 1024 consecutive zya_connet_t nodes form a block, and the address of the first node is stored in a block node. The blocks are kept in a linked list. The function zya_connet_t* zya_reactor_local_connet(zya_reactor_t* reactor, int fd) uses the value of fd to compute the block number and the index within the block. If there are not enough blocks, zya_reactor_add_events adds blocks before the lookup. In this way, the business layer only needs to access the zya_connet_t structure for a given fd to obtain the data, complete the business operation, and hand back the data to be sent, which separates the network layer from the business layer.
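The fd-to-slot arithmetic can be shown on its own. This is a minimal sketch that mirrors the article's 1-based block numbering; struct slot_pos, locate_fd and SLOTS_PER_BLOCK are names invented for the example (the article's constant is CONNETCT_SIZE_PER_BLOCK).

```c
#include <assert.h>

#define SLOTS_PER_BLOCK 1024

/* Position of a connection slot: which block, and which index inside it. */
struct slot_pos {
    int block;  /* 1-based block number, as in the article's lookup */
    int slot;   /* 0-based index inside that block */
};

/* Map a file descriptor to its block number and slot index. */
struct slot_pos locate_fd(int fd) {
    assert(fd >= 0);
    struct slot_pos pos;
    pos.block = fd / SLOTS_PER_BLOCK + 1;
    pos.slot = fd % SLOTS_PER_BLOCK;
    return pos;
}
```

For example, fd 5 lands in block 1 at slot 5, and fd 1024 is the first slot of block 2. Since the kernel hands out the lowest free descriptor number, the table stays dense and a new block is only needed when fd crosses a multiple of 1024.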

The following is the specific code. To make a simple HTTP server, the recv and send callbacks also accept an HTTP request and send an HTTP response, but there is no full HTTP message parsing and the response headers are fixed. The point is to learn the Reactor architecture. In fact, the structure of the Reactor is very similar to the epoll-based TCP server in the previous post.

//Reactor event-driven management implementation based on epoll

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sendfile.h>
#include <fcntl.h>


#define LISTEN_NUM 100
#define CONNETCT_SIZE_PER_BLOCK 1024
#define BUFFER_SIZE 1024
#define HTTP_WEB_ROOT "/mnt/hgfs/mylinux/linux/2-2-1-http"
#define KEY_SIZE 512
#define VALUE_SIZE 512
#define KV_TABLE_SIZE 50

typedef int (*ZYA_CALLBACK_FUNC)(int fd,int events,void* arg);

//key-value structure definition
typedef struct zya_kv_pair_s{
    char key[KEY_SIZE];
    char value[VALUE_SIZE];
}kv_pair_t;

typedef struct zya_kv_repository{
    kv_pair_t* table;
    int cur_kv_num;
}kv_repository_t;

//reactor structure definition
typedef struct zya_connect_s{
    int fd;

    ZYA_CALLBACK_FUNC callback;

    char rbuffer[BUFFER_SIZE];
    int rcount;
    char wbuffer[BUFFER_SIZE];
    int wcount;

    char resource[BUFFER_SIZE];
    int enable_sendfile;

    int count;

    kv_repository_t* kv;

}zya_connet_t;

typedef struct zya_block_s{

    zya_connet_t* connetshead;
    struct zya_block_s* nextblock;
    
}zya_block_t;

typedef struct zya_reactor_s{

    zya_block_t* blockhead;
    zya_block_t* blocktail;
    int blockcount;
    int epfd;
}zya_reactor_t;

//kv structure interface
int kv_init(kv_repository_t* kv);
int kv_destroy(kv_repository_t* kv);
int kv_push(kv_repository_t* kv,char* key,char* value);
int kv_zero(kv_repository_t* kv);
char* kv_getpair(kv_repository_t* kv,char* key);

//reactor maintenance function
int zya_reactor_init(zya_reactor_t*);
int zya_reactor_destroy(zya_reactor_t*);
zya_connet_t* zya_reactor_local_connet(zya_reactor_t* reactor,int fd);
int zya_reactor_add_block(zya_reactor_t* reactor);
int zya_reactor_add_events(zya_reactor_t* reactor,int fd,int event,ZYA_CALLBACK_FUNC func);

//Callback
int recv_callback(int fd,int events,void* arg);
int send_callback(int fd,int events,void* arg);
int accept_callback(int fd,int events,void* arg);

//Initialize socket function
int socket_listen_init(short port);

//http request and its processing
int readline(char* allbuf,int idx,char* rtnbuf);
int zya_http_request(zya_connet_t*);
int zya_http_response(zya_connet_t*);

int main(int argc,char* argv[]){

    //Process console parameters and get the specified port number
    if(argc<2){
        return -1;
    }
    short port=atoi(argv[1]);

    


    //Initialize reactor
    zya_reactor_t* myreactor=(zya_reactor_t*)malloc(sizeof(zya_reactor_t));
    int t=zya_reactor_init(myreactor);

    for(int i=0;i<1;i++){
        //Set the socket for listening
        int sockfd=socket_listen_init(port + i);
        if(sockfd<0){
            return -1;
        }

        //Add sockfd to it
        t=zya_reactor_add_events(myreactor,sockfd,EPOLLIN | EPOLLET,accept_callback);
    }

    //epoll_wait is asked for at most 1024 events, so the array only needs 1024 entries
    struct epoll_event events[1024]={0};

    while(1){

        int nready=epoll_wait(myreactor->epfd,events,1024,-1);
        if(nready<0){
            if(errno==EINTR)continue;
            perror("epoll_wait");
            break;
        }

        for(int i=0;i<nready;i++){
            int curfd=events[i].data.fd;
            zya_connet_t* connet=zya_reactor_local_connet(myreactor,curfd);
            if(connet==NULL || connet->callback==NULL)continue;

            //events is a bit mask, so test the bits with & instead of ==
            if(events[i].events & EPOLLIN){
                connet->callback(curfd,EPOLLIN,myreactor);
            }else if(events[i].events & EPOLLOUT){
                connet->callback(curfd,EPOLLOUT,myreactor);
            }
        }

    }

    zya_reactor_destroy(myreactor);
    printf("delete success!\n");
    return 0;
}

//kv group function implementation
int kv_init(kv_repository_t* kv){
    if(!kv){
        return -1;
    }

    memset(kv,0,sizeof(kv_repository_t));
    
    kv->cur_kv_num=0;
    kv->table=(kv_pair_t*)malloc(sizeof(kv_pair_t)*KV_TABLE_SIZE);

    return 0;
}

int kv_destroy(kv_repository_t* kv){

    if(!kv)return -1;
    free(kv->table);
    free(kv);
    return 0;
}

int kv_push(kv_repository_t* kv,char* key,char* value){

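    if(!kv || !key || !value)return -1;
    if(kv->cur_kv_num>=KV_TABLE_SIZE)return -2; //table is full

    kv_pair_t* pair=kv->table+(kv->cur_kv_num++);
    //bounded copies so an oversized key or value cannot overflow the pair
    snprintf(pair->key,KEY_SIZE,"%s",key);
    snprintf(pair->value,VALUE_SIZE,"%s",value);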
    return 0;
}

int kv_zero(kv_repository_t* kv){
    if(!kv)return -1;
    //sizeof(kv->table) would only be the size of a pointer; clear the whole table
    memset(kv->table,0,sizeof(kv_pair_t)*KV_TABLE_SIZE);
    kv->cur_kv_num=0;
    return 0;
}

char* kv_getpair(kv_repository_t* kv,char* key){

    for(int i=0;i<kv->cur_kv_num;i++){

        if(!strcmp(key,kv->table[i].key))return kv->table[i].value;

    }
    
    return NULL;
}

//reactor maintenance function
int zya_reactor_init(zya_reactor_t* reactor){

    if(reactor == NULL){
        printf("ERROR:reactor is NULL!\n");
        return -1;
    }

    memset(reactor,0,sizeof(zya_reactor_t));
   
    reactor->epfd=epoll_create(1);
    
    //Set the head node, only the pointer field is needed
    reactor->blockhead=(zya_block_t*)malloc(sizeof(zya_block_t));
    if(reactor->blockhead == NULL){
        printf("ERROR:malloc failed!\n");
        return -3;
    }
    reactor->blockhead->nextblock=NULL;
    reactor->blockhead->connetshead=NULL;

    reactor->blocktail=reactor->blockhead;

    reactor->blockcount=0;

    return 0;
}

int zya_reactor_destroy(zya_reactor_t* reactor){
    if(reactor==NULL){
        printf("ERROR: reactor is NULL!\n");
        return -1;
    }

    zya_block_t* delhead=reactor->blockhead;
    zya_block_t* del=delhead->nextblock;

    //Delete the remaining blocks first
    while(del){
        zya_block_t* curdel=del;
        del=del->nextblock;

        free(curdel->connetshead);
        free(curdel);
    }

    //Delete the head node again
    free(delhead);
    
    //Delete the reactor node again
    free(reactor);

    return 0;
}

zya_connet_t* zya_reactor_local_connet(zya_reactor_t* reactor,int fd){ //Find the position of fd in reactor, brought out by connet

    if(fd<0 || !reactor){
        printf("input error!");
        return NULL;
    }

    int block_index=fd/CONNETCT_SIZE_PER_BLOCK + 1; //Find the sequence number of the block it belongs to

    if(block_index>reactor->blockcount){
        printf("block too less");
        return NULL;
    }

    zya_block_t* pnt=reactor->blockhead->nextblock;
    int i=1;
    while(i<block_index && pnt){
        pnt=pnt->nextblock;
        i++;
    }
    if(pnt==NULL)return NULL; //never dereference a missing block

    return &pnt->connetshead[fd%CONNETCT_SIZE_PER_BLOCK];
    
}

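int zya_reactor_add_block(zya_reactor_t* reactor){

    if(reactor==NULL)return -1;

    zya_block_t* newblock=(zya_block_t*)malloc(sizeof(zya_block_t));
    if(newblock==NULL)return -2;

    //calloc zeroes the slots, so every unused connet starts with callback==NULL
    newblock->connetshead=(zya_connet_t*)calloc(CONNETCT_SIZE_PER_BLOCK,sizeof(zya_connet_t));
    if(newblock->connetshead==NULL){
        free(newblock);
        return -2;
    }
    newblock->nextblock=NULL;

    reactor->blocktail->nextblock=newblock;
    reactor->blocktail=newblock;

    reactor->blockcount++;
    return 0;
}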

int zya_reactor_add_events(zya_reactor_t* reactor,int fd,int event,ZYA_CALLBACK_FUNC func){

    if(reactor==NULL || fd<0 || !func){
        return -1;
    }

    
    while(fd/CONNETCT_SIZE_PER_BLOCK + 1>reactor->blockcount){
        zya_reactor_add_block(reactor);
        printf("add a block!\n");
    }

    
    zya_connet_t* connet=zya_reactor_local_connet(reactor,fd); //brought out by connet
    if(connet == NULL){
        printf("local_connet failed!\n");
        return -2;
    }
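    connet->callback=func;
    connet->fd=fd;
    connet->count=BUFFER_SIZE;
    //start every new registration with empty buffers
    connet->rcount=0;
    connet->wcount=0;
    connet->rbuffer[0]='\0';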

    struct epoll_event ev;
    ev.events=event;
    ev.data.fd=fd;

    epoll_ctl(reactor->epfd,EPOLL_CTL_ADD,fd,&ev);

    return 0;
}

//Receive socket initialization
int socket_listen_init(short port){
    if(port<=0)return -1;
    //Set socket listening

    int sockfd=socket(AF_INET,SOCK_STREAM,0);
    if(sockfd==-1){
        return -2;
    }

    struct sockaddr_in addr;
    memset(&addr,0,sizeof(addr));
    addr.sin_family=AF_INET;
    addr.sin_addr.s_addr=htonl(INADDR_ANY);
    addr.sin_port=htons(port);

    int ret=bind(sockfd,(struct sockaddr*)&addr,sizeof(struct sockaddr));
    if(ret==-1){
        perror("bind");
        return -3;
    }

    ret=listen(sockfd,LISTEN_NUM);
    if(ret==-1){
        perror("listen");
        return -4;
    }

    return sockfd;
}

//Callback function implementation
int send_callback(int fd,int events,void* arg){
    zya_reactor_t* reactor=(zya_reactor_t*)arg;
    zya_connet_t* connet=zya_reactor_local_connet(reactor,fd);

    zya_http_response(connet);

    send(fd,connet->wbuffer,connet->wcount,0);

    #if 1

        if (connet->enable_sendfile) { // sendbody
        
            int filefd = open(connet->resource, O_RDONLY);
            if (filefd == -1) {
                printf("errno: %d\n", errno);
                return -1;
            }

            struct stat stat_buf;
            fstat(filefd, &stat_buf);
            
            int ret = sendfile(fd, filefd, NULL, stat_buf.st_size); // sendbody
            if (ret == -1) {
                printf("errno: %d\n", errno);
            }

            close(filefd);
            connet->enable_sendfile=0;
        }
    #endif


    memset(connet->wbuffer,0,BUFFER_SIZE);
    memset(connet->rbuffer,0,BUFFER_SIZE);
    connet->rcount=0;
    connet->wcount=0;
    
    connet->callback=recv_callback;

    struct epoll_event ev;
    ev.events=EPOLLIN;
    ev.data.fd=fd;
    epoll_ctl(reactor->epfd,EPOLL_CTL_MOD,fd,&ev);

    return 0;
}

int recv_callback(int fd,int events,void* arg){

    zya_reactor_t* reactor =(zya_reactor_t*)arg;

    zya_connet_t* connet=zya_reactor_local_connet(reactor,fd);

    char buffer[4]={0};
    while(1){
        int len=recv(fd,buffer,4,MSG_DONTWAIT);

        if(len==0){
            memset(connet,0,sizeof(zya_connet_t));

            epoll_ctl(reactor->epfd,EPOLL_CTL_DEL,fd,NULL);
            close(fd);
            return -1;
        }

        if(len<0){
            if(errno == EAGAIN){
                break;
            }

            perror("recv");
            return -2;
        }

        

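        //recv does not NUL-terminate buffer, so copy exactly len bytes instead of using strcat
        if(connet->rcount+len>=BUFFER_SIZE)break; //rbuffer is full; stop reading
        memcpy(connet->rbuffer+connet->rcount,buffer,len);
        connet->rcount+=len;
        connet->rbuffer[connet->rcount]='\0';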

    }

    zya_http_request(connet);
    
    //strcpy(connet->wbuffer,connet->rbuffer);
    //connet->wcount=connet->rcount;

    connet->callback=send_callback;

    struct epoll_event ev;
    ev.events=EPOLLOUT;
    ev.data.fd=fd;
    epoll_ctl(reactor->epfd,EPOLL_CTL_MOD,fd,&ev);

    return 0;

}

int accept_callback(int fd,int events,void* arg){

    zya_reactor_t* reactor=(zya_reactor_t*)arg;

    struct sockaddr_in client_addr;
    memset(&client_addr,0,sizeof(struct sockaddr_in));
    socklen_t len=sizeof(struct sockaddr_in); //accept expects a socklen_t*

    int clientfd=accept(fd,(struct sockaddr*)&client_addr,&len);
    if(clientfd==-1){
        perror("accept");
        return -1;
    }
    printf("client connect:%d\n",clientfd);
    zya_reactor_add_events(reactor,clientfd,EPOLLIN | EPOLLET,recv_callback);
    return 0;
}

//http request processing function implementation
int readline(char* allbuf,int idx,char* rtnbuf){

    int len=strlen(allbuf);
    for(;idx<len;idx++){
        if(allbuf[idx] == '\r' && allbuf[idx+1] == '\n'){
            return idx+2;
        } else {
            *(rtnbuf++)=*(allbuf+idx);
        }
    }

    return -1;
}

int zya_http_response(zya_connet_t* connet){

#if 0
    //Variant 1: fixed header and fixed body
    //Content-Length must equal the body size in bytes (80 here)
    int len = sprintf(connet->wbuffer,
        "HTTP/1.1 200 OK\r\n"
        "Accept-Ranges: bytes\r\n"
        "Content-Length: 80\r\n"
        "Content-Type: text/html\r\n"
        "Date: Sat, 06 Aug 2022 13:16:46 GMT\r\n\r\n"
        "<html><head><title>0voice.zya</title></head><body><h1>zyayyds</h1></body></html>");

    connet->wcount = len;
#elif 0
    //Variant 2: header plus as much of the file as fits in wbuffer
    printf("resource:%s\n",connet->resource);

    int filefd=open(connet->resource,O_RDONLY);
    if(filefd == -1){
        perror("open");
        return -1;
    }

    //Get the file length from stat_buf.st_size
    struct stat stat_buf;
    fstat(filefd, &stat_buf);

    //Set message header
    int len = sprintf(connet->wbuffer,
        "HTTP/1.1 200 OK\r\n"
        "Accept-Ranges: bytes\r\n"
        "Content-Length: %ld\r\n"
        "Content-Type: text/html\r\n"
        "Date: Sat, 06 Aug 2022 13:16:46 GMT\r\n\r\n", (long)stat_buf.st_size);

    len+=read(filefd,connet->wbuffer+len,BUFFER_SIZE-len);

    connet->wcount=len;
    close(filefd);

#elif 1
    //Variant 3 (active): header only; send_callback sends the body with sendfile
    printf("resource:%s\n",connet->resource);

    int filefd=open(connet->resource,O_RDONLY);
    if(filefd == -1){
        perror("open");
        return -1;
    }

    //Get the file length from stat_buf.st_size
    struct stat stat_buf;
    fstat(filefd, &stat_buf);

    //Set message header
    int len = sprintf(connet->wbuffer,
        "HTTP/1.1 200 OK\r\n"
        "Accept-Ranges: bytes\r\n"
        "Content-Length: %ld\r\n"
        "Content-Type: text/html\r\n"
        "Date: Sat, 06 Aug 2022 13:16:46 GMT\r\n\r\n", (long)stat_buf.st_size);

    connet->wcount=len;

    connet->enable_sendfile=1;
    close(filefd);
#endif

    return 0;
}

int zya_http_request(zya_connet_t* connet){
    printf("http --> request:\n%s\n",connet->rbuffer);

    //Request message example:
    /*
        GET / HTTP/1.1
        Host: 192.168.213.128:8000
        Connection: keep-alive
        Cache-Control: max-age=0
        Upgrade-Insecure-Requests: 1
        User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.36
        Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*//*;q=0.8,application/signed-exchange;v=b3;q=0.7
        Accept-Encoding: gzip, deflate
        Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6
    */

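    char linebuffer[1024]={0};
    int index=readline(connet->rbuffer,0,linebuffer);
    (void)index;

    if (strstr(linebuffer, "GET")) { // resource

        //sizeof("GET ") is 5 because it counts the terminating '\0',
        //so this offset skips "GET " plus the leading '/' of the path
        int i = 0;
        while (linebuffer[sizeof("GET ") + i] != ' ' && linebuffer[sizeof("GET ") + i] != '\0') i++;
        linebuffer[sizeof("GET ") + i] = '\0';

        snprintf(connet->resource, sizeof(connet->resource), "%s/%s", HTTP_WEB_ROOT, linebuffer + sizeof("GET "));

    }

    return 0;
}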

3. Ending

The Reactor pattern is used in the network layer of many applications, such as Redis and nginx, and each application adapts it to its own needs; Redis, for example, uses a single Reactor. This article is a beginner's study notes, so mistakes are inevitable. Corrections and discussion are welcome. Thank you, mina-san.


Course learning link: https://xxetb.xet.tech/s/4czPSo