Implementing a server that handles millions of concurrent connections with an epoll-based reactor

1. In what scenarios should level trigger or edge trigger be used?

When each message is relatively small and high performance is required, we generally use edge triggering. (How do we decide whether data is small or large? If the buffer cannot be drained in a single read, it is large data; if a single read drains it, it is small data.)

When the listening fd (listenfd) faces a large number of incoming connections, we need to use level triggering in order to avoid missing connections.

2. How do we store a collection of millions of fds?

In this storage, we use a linked list structure, as shown in the figure:

Each node of the linked list is a block, and each block owns an array of event structures. Each array holds at most 1024 entries, so by chaining blocks together we can manage more than a million connections.

3. Can we watch for EPOLLIN and EPOLLOUT at the same time when a client fd (cfd) is first registered?

No. EPOLLOUT monitors the write (send) buffer, and right after the connection is created that buffer is empty, so the fd is always writable and the event would be triggered continuously.

4. Why may a read event be triggered under the edge trigger setting, but there is no data in the buffer?

Because with edge triggering the event fires as soon as a connection arrives or any other read-side event occurs, regardless of whether data is actually waiting in the buffer.

5. Why do you need to set non-blocking fd?

Because a read event may be reported when there is actually no data to read, a blocking fd would stay blocked in the read call, which hurts performance. Setting the fd to non-blocking is also simply the safer choice.

In the reactor structure, the key part is the callback function: each fd's callback is stored in its entry of the structure array, as shown in the figure below.

The specific code is explained as follows:



#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>


// Size in bytes of the per-connection I/O buffer (struct ntyevent.buffer).
#define BUFFER_LENGTH 4096
// Number of event slots per storage block; also the maximum number of
// events requested from a single epoll_wait() call.
#define MAX_EPOLL_EVENTS 1024
// Base listening port used when none is given on the command line.
#define SERVER_PORT 8888
// Number of consecutive ports main() listens on, starting at the base port.
#define PORT_COUNT 100

// Event handler signature: (fd, epoll event mask, user argument).
typedef int NCALLBACK(int ,int, void*);

struct ntyevent {
int fd;
int events;
void *arg;
int (*callback)(int fd, int events, void *arg);
\t
int status;
char buffer[BUFFER_LENGTH];
int length;
long last_active;
};

// One node of the singly linked list of event-slot arrays.
struct eventblock {
    struct eventblock *next;   // following block in the list, NULL for the last one
    struct ntyevent *events;   // array of MAX_EPOLL_EVENTS event slots owned by this block
};

// The reactor: an epoll instance plus the block list that stores per-fd state.
struct ntyreactor {
    int epfd;                  // epoll instance descriptor
    int blkcnt;                // number of blocks currently linked from evblk
    struct eventblock *evblk;  // head of the block list; block k covers fds
                               // [k * MAX_EPOLL_EVENTS, (k + 1) * MAX_EPOLL_EVENTS)
};


// Forward declarations: recv_cb and send_cb install each other as the next
// handler, and ntyreactor_idx() is used before its definition.
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd);


// Bind a handler to an event slot.
//
// Stores fd, callback and arg in *ev, clears the stored event mask and stamps
// last_active with the current time. The status, buffer and length fields are
// deliberately left untouched.
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {
    ev->last_active = time(NULL);
    ev->events = 0;

    ev->fd = fd;
    ev->arg = arg;
    ev->callback = callback;
}


// Register ev with the epoll instance, or update an existing registration.
//
// epfd   - epoll instance descriptor.
// events - epoll event mask to wait for (e.g. EPOLLIN or EPOLLOUT).
// ev     - event slot; its address is stored in the epoll user data so the
//          event loop can find the slot again when the fd becomes ready.
//
// Uses EPOLL_CTL_MOD when the slot is already registered (status == 1) and
// EPOLL_CTL_ADD otherwise. Returns 0 on success, -1 on failure.
int nty_event_add(int epfd, int events, struct ntyevent *ev) {
    if (ev == NULL) {
        return -1;
    }

    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;

    int op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }

    // Mark the slot as registered only once the kernel has accepted it;
    // otherwise a failed ADD would make every later call attempt a MOD.
    ev->status = 1;

    return 0;
}

// Remove ev's descriptor from the epoll instance.
//
// Returns -1 without touching anything when the slot is not marked as
// registered (status != 1). Otherwise clears the status flag, issues
// EPOLL_CTL_DEL (its result is not inspected) and returns 0.
int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event removal = { .events = 0, .data = { .ptr = ev } };

    if (ev->status == 1) {
        ev->status = 0;
        epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &removal);
        return 0;
    }

    return -1;
}

// Read handler for a client connection.
//
// fd     - client socket that became readable.
// events - epoll event mask that fired (unused).
// arg    - the owning struct ntyreactor.
//
// On data: stores it NUL-terminated in the slot's buffer, logs it, and
// switches the fd to send_cb / EPOLLOUT so the data is echoed back.
// On orderly shutdown (0) or a hard error: unregisters and closes the fd.
// On EAGAIN / EWOULDBLOCK / EINTR: leaves the fd registered for EPOLLIN.
// Returns the recv() result, or -1 when the slot cannot be located or
// re-registered.
int recv_cb(int fd, int events, void *arg) {
    (void)events;

    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL) {
        return -1;
    }

    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    if (ev == NULL) {
        return -1;
    }

    // Read at most BUFFER_LENGTH - 1 bytes so the terminating '\0' written
    // below always stays inside the buffer.
    int len = (int)recv(fd, ev->buffer, BUFFER_LENGTH - 1, 0);

    // Capture errno now: the epoll_ctl()/close() calls below may overwrite it.
    int saved_errno = errno;

    if (len < 0 && (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK ||
                    saved_errno == EINTR)) {
        // Nothing to read right now on a non-blocking socket; this is not a
        // failure, so keep waiting for EPOLLIN.
        return len;
    }

    nty_event_del(reactor->epfd, ev);

    if (len > 0) {
        ev->length = len;
        ev->buffer[len] = '\0';

        printf("C[%d]:%s\n", fd, ev->buffer);

        nty_event_set(ev, fd, send_cb, reactor);
        if (nty_event_add(reactor->epfd, EPOLLOUT, ev) < 0) {
            // The fd could not be re-armed, so nothing would ever service it.
            ev->status = 0;
            close(fd);
            return -1;
        }
    } else if (len == 0) {
        // Peer closed the connection.
        close(ev->fd);
    } else {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, saved_errno, strerror(saved_errno));
    }

    return len;
}


// Write handler for a client connection: echoes the buffered data back.
//
// fd     - client socket that became writable.
// events - epoll event mask that fired (unused).
// arg    - the owning struct ntyreactor.
//
// When the whole buffer has been sent the fd is switched back to
// recv_cb / EPOLLIN. After a partial send the unsent tail is kept and the fd
// stays registered for EPOLLOUT. EAGAIN / EWOULDBLOCK / EINTR leave the
// registration unchanged. Any other failure unregisters and closes the fd.
// Returns the send() result, or -1 when the slot cannot be located or
// re-registered.
int send_cb(int fd, int events, void *arg) {
    (void)events;

    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL) {
        return -1;
    }

    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    if (ev == NULL) {
        return -1;
    }

    // MSG_NOSIGNAL: a peer that already went away must produce EPIPE here
    // instead of a SIGPIPE that would terminate the whole server.
    int len = (int)send(fd, ev->buffer, ev->length, MSG_NOSIGNAL);

    // Capture errno now: the epoll_ctl()/close() calls below may overwrite it.
    int saved_errno = errno;

    if (len > 0) {
        printf("send[fd=%d], [%d]%.*s\n", fd, len, len, ev->buffer);

        if (len < ev->length) {
            // Partial write: keep the unsent tail at the front of the buffer
            // and stay registered for EPOLLOUT to finish later.
            ev->length -= len;
            memmove(ev->buffer, ev->buffer + len, (size_t)ev->length);
            ev->buffer[ev->length] = '\0';
            return len;
        }

        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        if (nty_event_add(reactor->epfd, EPOLLIN, ev) < 0) {
            // The fd could not be re-armed, so nothing would ever service it.
            ev->status = 0;
            close(fd);
            return -1;
        }
    } else if (len < 0 && (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK ||
                           saved_errno == EINTR)) {
        // Socket buffer is full or the call was interrupted; retry on the
        // next EPOLLOUT notification.
    } else {
        // Unregister before closing so EPOLL_CTL_DEL still sees a valid fd.
        nty_event_del(reactor->epfd, ev);
        close(ev->fd);
        printf("send[fd=%d] error %s\n", fd, strerror(saved_errno));
    }

    return len;
}

int accept_cb(int fd, int events, void *arg) {

struct ntyreactor *reactor = (struct ntyreactor*)arg;
if (reactor == NULL) return -1;

struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);

int clientfd;

if ((clientfd = accept(fd, (struct sockaddr*) &client_addr, &len)) == -1) {
if (errno != EAGAIN & amp; & amp; errno != EINTR) {
\t\t
}
printf("accept: %s\
", strerror(errno));
return -1;
}

\t

int flag = 0;
if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %d\
", __func__, MAX_EPOLL_EVENTS);
return -1;
}

struct ntyevent *event = ntyreactor_idx(reactor, clientfd); //Get the array space to store the fd
\t
nty_event_set(event, clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, event);

\t
printf("new connect [%s:%d], pos[%d]\
",
inet_ntoa(client_addr. sin_addr), ntohs(client_addr. sin_port), clientfd);

return 0;

}

// Create a non-blocking TCP socket listening on INADDR_ANY:port.
//
// port - port number in host byte order (0 lets the kernel choose one).
//
// Returns the listening descriptor, or -1 if any step (socket, fcntl, bind,
// listen) fails; the descriptor is closed before returning -1.
int init_sock(short port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        printf("socket failed : %s\n", strerror(errno));
        return -1;
    }

    // Add O_NONBLOCK to the existing flags instead of replacing them.
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("fcntl failed : %s\n", strerror(errno));
        close(fd);
        return -1;
    }

    // Best effort: lets the server restart while old connections on the
    // port are still in TIME_WAIT. Failure here is not fatal.
    int reuse = 1;
    (void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons((unsigned short)port);

    // Without this check a failed bind goes unnoticed and listen() silently
    // binds the socket to an arbitrary ephemeral port.
    if (bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        printf("bind failed : %s\n", strerror(errno));
        close(fd);
        return -1;
    }

    if (listen(fd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
        close(fd);
        return -1;
    }

    return fd;
}

// Grow the reactor's storage by one block.
//
// Allocates a zeroed array of MAX_EPOLL_EVENTS event slots plus the list
// node that owns it, and appends the node to the tail of the block list.
//
// Returns 0 on success, -1 if reactor is NULL or has no initial block, and
// -2 if memory allocation fails (nothing is leaked in that case).
int ntyreactor_alloc(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;

    // Walk to the last block; the new one is linked after it.
    struct eventblock *tail = reactor->evblk;
    while (tail->next != NULL) {
        tail = tail->next;
    }

    struct ntyevent *evs = calloc(MAX_EPOLL_EVENTS, sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }

    struct eventblock *block = calloc(1, sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        free(evs); // do not leak the slot array allocated above
        return -2;
    }

    block->events = evs;
    block->next = NULL;

    tail->next = block;
    reactor->blkcnt++;

    return 0;
}


// Return the event slot that belongs to descriptor sockfd.
//
// Block k of the list stores descriptors
// [k * MAX_EPOLL_EVENTS, (k + 1) * MAX_EPOLL_EVENTS); missing blocks are
// allocated on demand through ntyreactor_alloc().
//
// Returns a pointer to the slot, or NULL when reactor is NULL or has no
// initial block, sockfd is negative, or a needed block cannot be allocated.
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
    if (reactor == NULL || reactor->evblk == NULL || sockfd < 0) {
        return NULL;
    }

    int blkidx = sockfd / MAX_EPOLL_EVENTS;

    // Grow the list until it covers blkidx; stop instead of spinning forever
    // when an allocation fails.
    while (blkidx >= reactor->blkcnt) {
        if (ntyreactor_alloc(reactor) != 0) {
            return NULL;
        }
    }

    struct eventblock *blk = reactor->evblk;
    for (int i = 0; i < blkidx && blk != NULL; i++) {
        blk = blk->next;
    }
    if (blk == NULL) {
        return NULL;
    }

    return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}

// Initialise a reactor: create the epoll instance and the first storage block.
//
// reactor - caller-provided structure; it is zeroed first.
//
// Returns 0 on success, -1 if reactor is NULL, -2 if epoll_create() or a
// memory allocation fails. On failure every resource acquired so far is
// released and reactor->epfd is set to -1.
int ntyreactor_init(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));

    // epoll_create() reports failure with -1; 0 is a valid descriptor.
    reactor->epfd = epoll_create(1);
    if (reactor->epfd < 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }

    struct ntyevent *evs = calloc(MAX_EPOLL_EVENTS, sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("%s ntyevents failed\n", __func__);
        goto fail_close;
    }

    struct eventblock *block = calloc(1, sizeof(struct eventblock));
    if (block == NULL) {
        printf("%s eventblock failed\n", __func__);
        goto fail_free;
    }

    block->events = evs;
    block->next = NULL;

    reactor->evblk = block;
    reactor->blkcnt = 1;

    return 0;

fail_free:
    free(evs);
fail_close:
    close(reactor->epfd);
    reactor->epfd = -1;
    return -2;
}
// Release everything owned by the reactor: the epoll descriptor and every
// storage block together with its slot array. The reactor structure itself
// is not freed; it belongs to the caller.
//
// Returns 0 on success, -1 if reactor is NULL. The fields are reset, so a
// second call on the same reactor frees nothing twice.
int ntyreactor_destory(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;

    if (reactor->epfd >= 0) {
        close(reactor->epfd);
    }
    reactor->epfd = -1;

    struct eventblock *blk = reactor->evblk;
    while (blk != NULL) {
        // Save the successor before the node is freed.
        struct eventblock *blk_next = blk->next;

        free(blk->events);
        free(blk);

        blk = blk_next;
    }

    // Do not leave dangling pointers behind.
    reactor->evblk = NULL;
    reactor->blkcnt = 0;

    return 0;
}


// Register a listening socket with the reactor.
//
// reactor  - initialised reactor.
// sockfd   - listening descriptor; must be non-negative.
// acceptor - handler invoked when sockfd becomes readable.
//
// Returns 0 on success, -1 on invalid arguments, when no slot is available
// for sockfd, or when the epoll registration fails.
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
    if (sockfd < 0 || acceptor == NULL) return -1;

    struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
    if (event == NULL) return -1;

    nty_event_set(event, sockfd, acceptor, reactor);

    // Propagate registration failures instead of reporting success.
    if (nty_event_add(reactor->epfd, EPOLLIN, event) < 0) {
        return -1;
    }

    return 0;
}



// Run the event loop: wait on the epoll instance and dispatch each ready
// descriptor to the handler stored in its event slot.
//
// Returns -1 if the reactor is not initialised or epoll_wait() fails with
// anything other than EINTR; otherwise it loops forever.
int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->evblk == NULL) return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS];

    while (1) {
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            if (errno == EINTR) {
                continue; // interrupted by a signal: just wait again
            }
            // Any other error is persistent; retrying would spin forever.
            printf("epoll_wait error, exit\n");
            return -1;
        }

        for (int i = 0; i < nready; i++) {
            // data.ptr was set to the slot's address when the fd was added.
            struct ntyevent *ev = (struct ntyevent *)events[i].data.ptr;
            if (ev == NULL || ev->callback == NULL) {
                continue;
            }

            unsigned int fired = events[i].events;
            // Error/hang-up conditions are always reported by epoll. Route
            // them to the installed handler so its recv()/send() observes
            // the failure and closes the descriptor.
            unsigned int broken = fired & (EPOLLERR | EPOLLHUP);

            if ((ev->events & EPOLLIN) && ((fired & EPOLLIN) || broken)) {
                ev->callback(ev->fd, (int)fired, ev->arg);
            } else if ((ev->events & EPOLLOUT) && ((fired & EPOLLOUT) || broken)) {
                ev->callback(ev->fd, (int)fired, ev->arg);
            }
        }
    }
}

// Entry point: listen on PORT_COUNT consecutive ports and run the reactor.
//
// A TCP connection is identified by the tuple
// <remoteip, remoteport, localip, localport>. Presumably the server listens
// on many local ports so that a few client machines can open far more
// connections than one local port allows (original note: "3, 6w, 1, 100").
//
// Usage: prog [base_port]   (default SERVER_PORT)
// Returns 0 after the event loop ends, 1 on a start-up failure.
int main(int argc, char *argv[]) {

    unsigned short port = SERVER_PORT;
    if (argc == 2) {
        char *end = NULL;
        errno = 0;
        long val = strtol(argv[1], &end, 10);
        // The whole range [val, val + PORT_COUNT - 1] must be valid ports.
        if (errno != 0 || end == argv[1] || *end != '\0' ||
            val < 1 || val > 65535 - (PORT_COUNT - 1)) {
            printf("invalid port: %s\n", argv[1]);
            return 1;
        }
        port = (unsigned short)val;
    }

    struct ntyreactor *reactor = (struct ntyreactor *)malloc(sizeof(struct ntyreactor));
    if (reactor == NULL) {
        printf("reactor allocation failed\n");
        return 1;
    }
    if (ntyreactor_init(reactor) != 0) {
        free(reactor);
        return 1;
    }

    int sockfds[PORT_COUNT];
    int nlisten = 0;
    for (int i = 0; i < PORT_COUNT; i++) {
        int fd = init_sock((short)(port + i));
        if (fd < 0) {
            printf("listen on port %d failed\n", port + i);
            continue;
        }
        if (ntyreactor_addlistener(reactor, fd, accept_cb) != 0) {
            close(fd);
            continue;
        }
        sockfds[nlisten++] = fd;
    }

    // Only enter the loop when at least one listener is active.
    if (nlisten > 0) {
        ntyreactor_run(reactor);
    } else {
        printf("no listening socket available\n");
    }

    ntyreactor_destory(reactor);

    for (int i = 0; i < nlisten; i++) {
        close(sockfds[i]);
    }

    free(reactor);

    return nlisten > 0 ? 0 : 1;
}