Encapsulating a reactor to support millions of concurrent connections

First, the reactor wraps epoll to provide I/O multiplexing.
If every accepted connfd shares one global receive buffer and one global send buffer, data from different connections gets mixed up (dirty reads). Each connfd therefore needs its own object: a structure that holds, among other things, that socket's own receive and send buffers.

/*
 * Per-connection state. One instance exists per socket fd so that
 * connections do not share a single global buffer.
 */
struct sock_item { // conn_item

    int fd; // clientfd

    char *rbuffer; // receive buffer, allocated when the connection is accepted
    int rlength;   // presumably the number of bytes held in rbuffer

    char *wbuffer; // send buffer, allocated when the connection is accepted
    int wlength;   // presumably the number of bytes held in wbuffer

    int event;     // epoll event mask for this fd

    // Per-event callbacks; declared here but not invoked by the code shown
    // in this article.
    void (*recv_cb)(int fd, char *buffer, int length);
    void (*send_cb)(int fd, char *buffer, int length);

    void (*accept_cb)(int fd, char *buffer, int length);

};

With the buffers declared inside this structure, each connection has its own storage, so the shared global buffer is no longer a problem.
Solving the buffer problem this way raises another question: how many struct sock_item objects should be allocated? The project aims for millions of concurrent connections, so should we simply allocate space for one million items up front? No. For most of the program's running time, most of that memory would be wasted. In addition, one million contiguous items is a demanding request: after the operating system has been running for a long time, memory becomes fragmented, and such a large contiguous region may not be available. If the allocation fails, the program simply returns an error, which makes the server far less robust.
The solution is to chain small contiguous regions together in a linked list. Each region holds 1024 struct sock_item entries. When a connection needs a slot beyond what is currently allocated, another region is appended to the end of the list. In other words, the storage expands dynamically.

/* One node of the block list: an array of sock_item entries plus a link. */
struct eventblock {
    struct sock_item *items; // array of ITEM_LENGTH entries (allocated in reactor_resize)
    struct eventblock *next; // next block, or NULL for the last one
};

Now that the linked list is there, all we need to do is add nodes to the linked list.

int reactor_resize(struct reactor *r) {<!-- --> // new eventblock

if (r == NULL) return -1;

struct eventblock *blk = r->evblk;

while (blk != NULL & amp; & amp; blk->next != NULL) {<!-- -->
blk = blk->next;
}

struct sock_item* item = (struct sock_item*)malloc(ITEM_LENGTH * sizeof(struct sock_item));
if (item == NULL) return -4;
memset(item, 0, ITEM_LENGTH * sizeof(struct sock_item));

printf("-------------\\
");
struct eventblock *block = malloc(sizeof(struct eventblock));
if (block == NULL) {<!-- -->
free(item);
return -5;
}
memset(block, 0, sizeof(struct eventblock));

block->items = item;
block->next = NULL;

if (blk == NULL) {<!-- -->
r->evblk = block;
} else {<!-- -->
blk->next = block;
}
r->blkcnt + + ;

return 0;
}

The next problem is finding the struct sock_item that belongs to a given connfd. Fortunately, the items in the linked list are stored in order and indexed by the value of connfd, so we only need to find the right block (connfd / ITEM_LENGTH) and then the right slot inside that block (connfd % ITEM_LENGTH).

struct sock_item* reactor_lookup(struct reactor *r, int sockfd) {<!-- -->

if (r == NULL) return NULL;
//if (r->evblk == NULL) return NULL;
if (sockfd <= 0) return NULL;

printf("reactor_lookup --> %d\\
", r->blkcnt); //64
int blkidx = sockfd/ITEM_LENGTH;
while (blkidx >= r->blkcnt) {<!-- -->
reactor_resize(r);
}

int i = 0;
struct eventblock *blk = r->evblk;
while (i + + < blkidx & amp; & amp; blk != NULL) {<!-- -->
blk = blk->next;
}

return & amp;blk->items[sockfd % ITEM_LENGTH];
}

With the groundwork done, the code can be run to see the results, but a problem shows up: new connections stop succeeding at a little over 60,000. What is going on? This involves the network five-tuple (source IP address, source port, destination IP address, destination port, protocol). Here the source IP address, destination IP address, destination port, and protocol are all fixed, so only the source port can vary. Ports range from 0 to 65535, so once the source ports are used up no further connection can be created. One solution is to add a network card to the server to give it another IP address, but that is a bit troublesome. Can this problem be solved at the code level? Of course: listen on multiple ports instead of one, so that the destination port can vary as well. Here I declare 100 listening ports.

/*
 * Return 1 if connfd equals one of the first PORT_COUNT entries of fds
 * (the listening sockets), 0 otherwise.
 */
int is_listenfd(int *fds, int connfd) {

    for (int i = 0; i < PORT_COUNT; i++) {
        if (fds[i] == connfd) {
            return 1;
        }
    }

    return 0;
}

/*
 * Create a TCP socket bound to INADDR_ANY:port, switch it to non-blocking
 * mode and start listening.
 *
 * Returns the listening fd on success, -1 if socket() fails, -2 if bind()
 * fails, -3 if fcntl() or listen() fails. The socket is closed on every
 * failure path so no descriptor is leaked.
 */
int init_server(short port) {

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) return -1;

    struct sockaddr_in servaddr = {0}; // zero the whole struct, including padding
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((unsigned short)port);

    if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
        close(listenfd);
        return -2;
    }

#if 1 // nonblock
    int flag = fcntl(listenfd, F_GETFL, 0);
    if (flag == -1 || fcntl(listenfd, F_SETFL, flag | O_NONBLOCK) == -1) {
        close(listenfd);
        return -3;
    }
#endif

    if (listen(listenfd, 10) == -1) {
        close(listenfd);
        return -3;
    }

    return listenfd;
}

Note: I set all the sockets in the code to non-blocking. Why? Because blocking mode was too slow; after switching to non-blocking it was fast. If you are interested, try it yourself and see whether you run into the same problem: just change #if 1 to #if 0.
That's it for now. Give it a try; you will need to find client test code yourself. The complete server-side code is as follows:

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>


#define BUFFER_LENGTH 128   // size in bytes of each per-connection rbuffer/wbuffer
#define EVENTS_LENGTH 128   // capacity of the events array passed to epoll_wait

#define PORT_COUNT 100      // number of listening ports opened by main
#define ITEM_LENGTH 1024    // sock_item entries per eventblock


// listenfd, clientfd
/*
 * Per-connection state. One instance exists per socket fd so that
 * connections do not share a single global buffer.
 */
struct sock_item { // conn_item

    int fd; // clientfd

    char *rbuffer; // receive buffer, allocated in main when the connection is accepted
    int rlength;   // presumably the number of bytes held in rbuffer

    char *wbuffer; // send buffer, allocated in main when the connection is accepted
    int wlength;   // presumably the number of bytes held in wbuffer

    int event;     // epoll event mask for this fd

    // Per-event callbacks; declared here but not invoked anywhere in this file.
    void (*recv_cb)(int fd, char *buffer, int length);
    void (*send_cb)(int fd, char *buffer, int length);

    void (*accept_cb)(int fd, char *buffer, int length);

};

/* One node of the block list: an array of sock_item entries plus a link. */
struct eventblock {
    struct sock_item *items; // array of ITEM_LENGTH entries (allocated in reactor_resize)
    struct eventblock *next; // next block, or NULL for the last one
};

/*
 * Top-level reactor state: the epoll instance plus the linked list of
 * sock_item blocks.
 */
struct reactor {
    int epfd;                 /* epoll file descriptor (set from epoll_create in main) */
    int blkcnt;               /* number of blocks linked so far; bumped by reactor_resize */

    struct eventblock *evblk; /* head of the block list */
};

int reactor_resize(struct reactor *r) {<!-- --> // new eventblock

if (r == NULL) return -1;

struct eventblock *blk = r->evblk;

while (blk != NULL & amp; & amp; blk->next != NULL) {<!-- -->
blk = blk->next;
}

struct sock_item* item = (struct sock_item*)malloc(ITEM_LENGTH * sizeof(struct sock_item));
if (item == NULL) return -4;
memset(item, 0, ITEM_LENGTH * sizeof(struct sock_item));

printf("-------------\\
");
struct eventblock *block = malloc(sizeof(struct eventblock));
if (block == NULL) {<!-- -->
free(item);
return -5;
}
memset(block, 0, sizeof(struct eventblock));

block->items = item;
block->next = NULL;

if (blk == NULL) {<!-- -->
r->evblk = block;
} else {<!-- -->
blk->next = block;
}
r->blkcnt + + ;

return 0;
}


struct sock_item* reactor_lookup(struct reactor *r, int sockfd) {<!-- -->

if (r == NULL) return NULL;
//if (r->evblk == NULL) return NULL;
if (sockfd <= 0) return NULL;

printf("reactor_lookup --> %d\\
", r->blkcnt); //64
int blkidx = sockfd/ITEM_LENGTH;
while (blkidx >= r->blkcnt) {<!-- -->
reactor_resize(r);
}

int i = 0;
struct eventblock *blk = r->evblk;
while (i + + < blkidx & amp; & amp; blk != NULL) {<!-- -->
blk = blk->next;
}

return & amp;blk->items[sockfd % ITEM_LENGTH];
}

// thread --> fd
/*
 * Thread entry point: echo loop for a single client.
 *
 * arg points to an int holding the client fd. Each received chunk is
 * printed and sent back. The fd is closed and the loop ends when the peer
 * closes the connection or when recv()/send() fails. Always returns NULL.
 */
void *routine(void *arg) {

    int clientfd = *(int *)arg;

    while (1) {

        unsigned char buffer[BUFFER_LENGTH] = {0};
        int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
        if (ret <= 0) {
            // 0 means the peer closed; a negative value is an error and must
            // not be handed to send() as a length.
            close(clientfd);
            break;
        }
        // The buffer may be completely full (no terminating NUL), so bound
        // the %s conversion by the number of bytes actually received.
        printf("buffer : %.*s, ret: %d\n", ret, (const char *)buffer, ret);

        ret = send(clientfd, buffer, ret, 0);
        if (ret == -1) {
            close(clientfd);
            break;
        }

    }

    return NULL;
}


/*
 * Create a TCP socket bound to INADDR_ANY:port, switch it to non-blocking
 * mode and start listening.
 *
 * Returns the listening fd on success, -1 if socket() fails, -2 if bind()
 * fails, -3 if fcntl() or listen() fails. The socket is closed on every
 * failure path so no descriptor is leaked.
 */
int init_server(short port) {

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) return -1;

    struct sockaddr_in servaddr = {0}; // zero the whole struct, including padding
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((unsigned short)port);

    if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
        close(listenfd);
        return -2;
    }

#if 1 // nonblock
    int flag = fcntl(listenfd, F_GETFL, 0);
    if (flag == -1 || fcntl(listenfd, F_SETFL, flag | O_NONBLOCK) == -1) {
        close(listenfd);
        return -3;
    }
#endif

    if (listen(listenfd, 10) == -1) {
        close(listenfd);
        return -3;
    }

    return listenfd;
}


/*
 * Return 1 if connfd equals one of the first PORT_COUNT entries of fds
 * (the listening sockets), 0 otherwise.
 */
int is_listenfd(int *fds, int connfd) {

    for (int i = 0; i < PORT_COUNT; i++) {
        if (fds[i] == connfd) {
            return 1;
        }
    }

    return 0;
}

// socket -->
// bash --> execve("./server", "");
//
// 0, 1, 2
// stdin, stdout, stderr
int main() {

// block

\t
/// **********
struct reactor *r = (struct reactor*)calloc(1, sizeof(struct reactor));
if (r == NULL) {
return -3;
}
//memset();

r->epfd = epoll_create(1);
struct epoll_event ev, events[EVENTS_LENGTH];

 \t
int sockfds[PORT_COUNT] = {0};
int i = 0;
for (i = 0;i < PORT_COUNT;i + + ) {
sockfds[i] = init_server(9999 + i);

ev.events = EPOLLIN;
ev.data.fd = sockfds[i]; //

epoll_ctl(r->epfd, EPOLL_CTL_ADD, sockfds[i], & amp;ev);
}
/// ************** //

//
\t

while (1) { // 7 * 24

int nready = epoll_wait(r->epfd, events, EVENTS_LENGTH, -1); // -1, ms
//printf("------- %d\\
", nready);
\t\t
int i = 0;
for (i = 0;i < nready;i + + ) {
int clientfd = events[i].data.fd;
\t\t\t
if (is_listenfd(sockfds, clientfd)) { // accept

struct sockaddr_in client;
socklen_t len = sizeof(client);
int connfd = accept(clientfd, (struct sockaddr*) & amp;client, & amp;len);
if (connfd == -1) break;
\t\t\t\t
if (connfd % 1000 == 999) {
\t\t\t\t
printf("accept: %d\\
", connfd);

}

#if 1 // nonblock
int flag = fcntl(connfd, F_GETFL, 0);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
#endif
\t\t\t\t
ev.events = EPOLLIN;
ev.data.fd = connfd;
epoll_ctl(r->epfd, EPOLL_CTL_ADD, connfd, & amp;ev);
#if 0
r->items[connfd].fd = connfd;
\t\t\t
r->items[connfd].rbuffer = calloc(1, BUFFER_LENGTH);
r->items[connfd].rlength = 0;
\t\t\t\t
r->items[connfd].wbuffer = calloc(1, BUFFER_LENGTH);
r->items[connfd].wlength = 0;

r->items[connfd].event = EPOLLIN;
#else

struct sock_item *item = reactor_lookup(r, connfd);
item->fd = connfd;
item->rbuffer = calloc(1, BUFFER_LENGTH);
item->rlength = 0;

item->wbuffer = calloc(1, BUFFER_LENGTH);
item->wlength = 0;
#endif
} else if (events[i].events & amp; EPOLLIN) { //clientfd

//char rbuffer[BUFFER_LENGTH] = {0};

struct sock_item *item = reactor_lookup(r, clientfd);

char *rbuffer = item->rbuffer;
char *wbuffer = item->wbuffer;
\t\t\t\t
int n = recv(clientfd, rbuffer, BUFFER_LENGTH, 0);
if (n > 0) {
//rbuffer[n] = '\0';

printf("recv: %s, n: %d\\
", rbuffer, n);

memcpy(wbuffer, rbuffer, BUFFER_LENGTH);

ev.events = EPOLLOUT;
ev.data.fd = clientfd;

epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, & amp;ev);
\t\t\t\t\t
} else if (n == 0) {

free(rbuffer);
free(wbuffer);
\t\t\t\t\t
item->fd = 0;

close(clientfd);
}
\t\t\t\t
} else if (events[i].events & amp; EPOLLOUT) {

\t\t\t\t
struct sock_item *item = reactor_lookup(r, clientfd);
char *wbuffer = item->wbuffer;
\t\t\t\t
int sent = send(clientfd, wbuffer, BUFFER_LENGTH, 0); //
printf("sent: %d\\
", sent);

ev.events = EPOLLIN;
ev.data.fd = clientfd;

epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, & amp;ev);
}
}
}
}