The main purpose of this article is to record a test of one million concurrent connections, using the epoll + reactor I/O multiplexing techniques covered earlier. This first part focuses on a straightforward implementation and does not use any complex data structures; the second part will optimize it with better data structures.
Code
The server side is just a slightly modified version of the earlier epoll code.
#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>

#define BUFFER_LENGTH   256
#define MAX_CONNECTIONS 1048576 /* connection table is indexed directly by fd */
#define MAX_EVENTS      1024

/* Callback invoked with the file descriptor that became ready. */
typedef int (*RCALLBACK)(int fd);

int accept_cb(int fd); /* listening fd became readable */
int recv_cb(int fd);   /* client fd became readable    */
int send_cb(int fd);   /* client fd became writable    */

/* Per-connection state: fd, read/write buffers and event callbacks. */
struct conn_item {
    int fd;

    char rbuffer[BUFFER_LENGTH];
    int rlen;
    char wbuffer[BUFFER_LENGTH];
    int wlen;

    /* Both members share storage: the main loop always dispatches EPOLLIN
     * through recv_t.recv_callback, whether the fd is a listener or a client. */
    union {
        RCALLBACK accept_callback;
        RCALLBACK recv_callback;
    } recv_t;
    RCALLBACK send_callback;
};

int epfd = 0;
struct conn_item connlist[MAX_CONNECTIONS] = {0}; /* brute force: one slot per possible fd */
struct timeval zvoice_king;                       /* time of the previous progress report */

/* Milliseconds elapsed from tv2 to tv1. */
#define TIME_SUB_MS(tv1, tv2) \
    (((tv1).tv_sec - (tv2).tv_sec) * 1000 + ((tv1).tv_usec - (tv2).tv_usec) / 1000)

/*
 * Register (flag != 0) or modify (flag == 0) the epoll interest set of fd.
 * Returns the result of epoll_ctl(): 0 on success, -1 on error.
 */
int set_event(int fd, int event, int flag)
{
    struct epoll_event ev;

    memset(&ev, 0, sizeof(ev));
    ev.events = event;
    ev.data.fd = fd;

    return epoll_ctl(epfd, flag ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ev);
}

/* Remove fd from the epoll set, close it and reset its buffered lengths. */
static void close_conn(int fd)
{
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
    connlist[fd].rlen = 0;
    connlist[fd].wlen = 0;
}

/*
 * Accept one pending connection on listening socket fd, initialise its
 * connlist slot and register it for EPOLLIN.
 * Returns the new client fd, or -1 on failure.
 */
int accept_cb(int fd)
{
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
    if (clientfd < 0) {
        return -1;
    }
    if (clientfd >= MAX_CONNECTIONS) {
        /* No slot for this fd: refuse it rather than write past connlist. */
        close(clientfd);
        return -1;
    }

    struct conn_item *conn = &connlist[clientfd];
    conn->fd = clientfd;
    memset(conn->rbuffer, 0, BUFFER_LENGTH);
    conn->rlen = 0;
    memset(conn->wbuffer, 0, BUFFER_LENGTH);
    conn->wlen = 0;
    conn->recv_t.recv_callback = recv_cb;
    conn->send_callback = send_cb;

    if (set_event(clientfd, EPOLLIN, 1) < 0) {
        close(clientfd);
        return -1;
    }

    /* Progress report roughly once per 1000 accepted descriptors. */
    if ((clientfd % 1000) == 999) {
        struct timeval tv_cur;
        gettimeofday(&tv_cur, NULL);
        int time_used = (int)TIME_SUB_MS(tv_cur, zvoice_king);
        zvoice_king = tv_cur;

        printf("clientfd : %d, time_used: %d\n", clientfd, time_used);
    }

    return clientfd;
}

/*
 * Read available data from fd, stage it in the write buffer (echo) and
 * switch the fd to EPOLLOUT.
 * Returns the number of bytes read, 0 if the read should simply be retried,
 * or -1 if the connection was closed.
 */
int recv_cb(int fd)
{
    struct conn_item *conn = &connlist[fd];
    char *buffer = conn->rbuffer;
    int idx = conn->rlen;

    int count = (int)recv(fd, buffer + idx, BUFFER_LENGTH - idx, 0);
    if (count < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0; /* transient: wait for the next EPOLLIN */
        }
        close_conn(fd);
        return -1;
    }
    if (count == 0) {
        printf("disconnect\n");
        close_conn(fd);
        return -1;
    }

    conn->rlen += count;

    /* Echo: everything received becomes the next response. */
    memcpy(conn->wbuffer, conn->rbuffer, conn->rlen);
    conn->wlen = conn->rlen;
    conn->rlen = 0;

    set_event(fd, EPOLLOUT, 0);

    return count;
}

/*
 * Send the staged write buffer of fd. When it has been sent completely the
 * fd is switched back to EPOLLIN; after a short write the remainder stays
 * queued and the fd remains registered for EPOLLOUT.
 * Returns the number of bytes sent, 0 if the send should be retried,
 * or -1 if the connection was closed.
 */
int send_cb(int fd)
{
    struct conn_item *conn = &connlist[fd];

    /* MSG_NOSIGNAL: a peer that already closed must not raise SIGPIPE. */
    int count = (int)send(fd, conn->wbuffer, conn->wlen, MSG_NOSIGNAL);
    if (count < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }
        close_conn(fd);
        return -1;
    }

    if (count < conn->wlen) {
        memmove(conn->wbuffer, conn->wbuffer + count, conn->wlen - count);
        conn->wlen -= count;
        return count;
    }

    conn->wlen = 0;
    set_event(fd, EPOLLIN, 0);

    return count;
}

/*
 * Create a TCP socket bound to INADDR_ANY:port and start listening.
 * Returns the listening fd, or -1 on failure (the socket is closed).
 */
int init_server(unsigned short port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }

    /* Allow quick restarts while old connections linger in TIME_WAIT. */
    int reuse = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        perror("setsockopt");
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(port);

    if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr))) {
        perror("bind");
        close(sockfd);
        return -1;
    }

    if (-1 == listen(sockfd, 10)) {
        perror("listen");
        close(sockfd);
        return -1;
    }

    return sockfd;
}

/* TCP echo server listening on port_count consecutive ports, driven by epoll. */
int main(void)
{
    int port_count = 100;
    unsigned short port = 2048;
    int i = 0;

    epfd = epoll_create(1); /* the size argument is ignored but must be > 0 */
    if (epfd < 0) {
        perror("epoll_create");
        return 1;
    }

    for (i = 0; i < port_count; i++) {
        int sockfd = init_server((unsigned short)(port + i)); /* 2048, 2049, ... 2147 */
        if (sockfd < 0) {
            continue; /* init_server already reported the reason */
        }
        if (sockfd >= MAX_CONNECTIONS) {
            close(sockfd);
            continue;
        }

        connlist[sockfd].fd = sockfd;
        connlist[sockfd].recv_t.accept_callback = accept_cb;
        if (set_event(sockfd, EPOLLIN, 1) < 0) {
            perror("epoll_ctl");
            close(sockfd);
        }
    }

    gettimeofday(&zvoice_king, NULL);

    struct epoll_event events[MAX_EVENTS] = {0};

    while (1) { /* main loop */
        int nready = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (nready < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            break;
        }

        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;

            if (events[i].events & EPOLLIN) {
                connlist[connfd].recv_t.recv_callback(connfd);
            } else if (events[i].events & EPOLLOUT) {
                connlist[connfd].send_callback(connfd);
            } else {
                /* Only EPOLLERR/EPOLLHUP was reported: drop the fd so the
                 * level-triggered event does not fire forever. */
                close_conn(connfd);
            }
        }
    }

    return 1;
}
The following client program is used to open the test connections; you can also use it to measure the number of concurrent connections other servers can handle.
#define _DEFAULT_SOURCE /* exposes usleep() when built with a strict -std=c11 */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>

#define MAX_BUFFER      128
#define MAX_EPOLLSIZE   (384*1024)
#define MAX_PORT        100
#define MAX_CONNECTIONS 380000 /* stop opening new connections at this count */

/* Milliseconds elapsed from tv2 to tv1. */
#define TIME_SUB_MS(tv1, tv2) \
    (((tv1).tv_sec - (tv2).tv_sec) * 1000 + ((tv1).tv_usec - (tv2).tv_usec) / 1000)

/* While non-zero, no new connections are opened. Nothing in this file sets it
 * to a non-zero value; receiving "quit" resets it to 0. */
int isContinue = 0;

/* Put fd into non-blocking mode. Returns 0 on success, negative on failure. */
static int ntySetNonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        return flags;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) < 0) {
        return -1;
    }
    return 0;
}

/* Enable SO_REUSEADDR on fd. Returns the result of setsockopt(). */
static int ntySetReUseAddr(int fd)
{
    int reuse = 1;
    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
}

/*
 * Load generator: opens up to MAX_CONNECTIONS TCP connections to `ip`,
 * spread over MAX_PORT consecutive ports starting at `port`, and every
 * 1000 connections polls the open sockets with epoll.
 *
 * Usage: <prog> ip port
 */
int main(int argc, char **argv)
{
    if (argc <= 2) {
        printf("Usage: %s ip port\n", argv[0]);
        exit(0);
    }

    const char *ip = argv[1];

    /* Validate the base port: ports port .. port + MAX_PORT - 1 are used. */
    char *end = NULL;
    errno = 0;
    long port_arg = strtol(argv[2], &end, 10);
    if (errno != 0 || end == argv[2] || *end != '\0' ||
        port_arg < 1 || port_arg + MAX_PORT - 1 > 65535) {
        fprintf(stderr, "invalid port: %s\n", argv[2]);
        return EXIT_FAILURE;
    }
    int port = (int)port_arg;

    int connections = 0;
    char buffer[128] = {0};
    int i = 0, index = 0;

    /* ~4.7 MB of events: keep it out of the stack. */
    static struct epoll_event events[MAX_EPOLLSIZE];

    int epoll_fd = epoll_create(MAX_EPOLLSIZE);
    if (epoll_fd < 0) {
        perror("epoll_create");
        return EXIT_FAILURE;
    }

    strcpy(buffer, " Data From MulClient\n");

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
        fprintf(stderr, "invalid IPv4 address: %s\n", ip);
        return EXIT_FAILURE;
    }

    struct timeval tv_begin;
    gettimeofday(&tv_begin, NULL);

    int saved_errno = 0;

    while (1) {
        if (++index >= MAX_PORT) {
            index = 0;
        }

        struct epoll_event ev;
        int sockfd = 0;

        if (connections < MAX_CONNECTIONS && !isContinue) {
            sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if (sockfd == -1) {
                saved_errno = errno;
                perror("socket");
                goto err;
            }

            //ntySetReUseAddr(sockfd);
            addr.sin_port = htons((unsigned short)(port + index));

            if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0) {
                saved_errno = errno;
                perror("connect");
                close(sockfd);
                goto err;
            }
            ntySetNonblock(sockfd);
            ntySetReUseAddr(sockfd);

            snprintf(buffer, sizeof(buffer), "Hello Server: client --> %d\n", connections);
            /* MSG_NOSIGNAL: a reset connection must not kill the client with SIGPIPE. */
            send(sockfd, buffer, strlen(buffer), MSG_NOSIGNAL);

            memset(&ev, 0, sizeof(ev));
            ev.data.fd = sockfd;
            ev.events = EPOLLIN | EPOLLOUT;
            if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev) < 0) {
                saved_errno = errno;
                perror("epoll_ctl");
                close(sockfd);
                goto err;
            }

            connections++;
        }
        //connections ++;
        if (connections % 1000 == 999 || connections >= MAX_CONNECTIONS) {
            struct timeval tv_prev = tv_begin;

            gettimeofday(&tv_begin, NULL);
            int time_used = (int)TIME_SUB_MS(tv_begin, tv_prev);

            printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

            /* maxevents must be > 0 and must not exceed the events array. */
            int max_events = connections;
            if (max_events > MAX_EPOLLSIZE) {
                max_events = MAX_EPOLLSIZE;
            }
            if (max_events < 1) {
                max_events = 1;
            }

            int nfds = epoll_wait(epoll_fd, events, max_events, 100);
            for (i = 0; i < nfds; i++) {
                /* All I/O below targets the socket that is actually ready. */
                int clientfd = events[i].data.fd;

                if (events[i].events & EPOLLOUT) {
                    snprintf(buffer, sizeof(buffer), "data from %d\n", clientfd);
                    send(clientfd, buffer, strlen(buffer), MSG_NOSIGNAL);
                } else if (events[i].events & EPOLLIN) {
                    char rBuffer[MAX_BUFFER] = {0};
                    /* Leave room for the terminating NUL: rBuffer is printed with %s. */
                    ssize_t length = recv(clientfd, rBuffer, MAX_BUFFER - 1, 0);
                    if (length > 0) {
                        printf("RecvBuffer:%s\n", rBuffer);

                        if (!strcmp(rBuffer, "quit")) {
                            isContinue = 0;
                        }
                    } else if (length == 0) {
                        printf("Disconnect clientfd:%d\n", clientfd);
                        connections--;
                        close(clientfd);
                    } else {
                        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
                            continue;
                        }

                        printf("Error clientfd:%d, errno:%d\n", clientfd, errno);
                        close(clientfd);
                    }
                } else {
                    printf(" clientfd:%d, errno:%d\n", clientfd, errno);
                    close(clientfd);
                }
            }
        }

        usleep(1 * 1000);
    }

    return 0;

err:
    /* Report the errno of the failing call, not whatever perror() left behind. */
    printf("error : %s\n", strerror(saved_errno));
    return EXIT_FAILURE;
}
Virtual machine
We prepare four virtual machines for the experiment (one server and three clients). The configuration is as follows:
- server: 8 core 8G
- client: 2 core 4G
- client: 2 core 4G
- client: 2 core 4G
I am using ubuntu here, other distributions should be similar.
Parameter settings
Modify the number of open file handles
First we need to set the number of file handles that can be opened:
ulimit -n 1048576
Loading IP tracking module
modprobe ip_conntrack
Modify system settings
cd /etc
ls | grep sysctl.conf
If not found, create one
touch sysctl.conf
If it is found, open it for editing:
sudo vim sysctl.conf
Add the following at the end:
server:
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 1772864 1572864 1572864
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
fs.file-max = 1048576
client:
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 262144 524288 786432
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
Then make them effective
sudo sysctl -p
Start testing
First, compile both programs. Then use ifconfig to look up the IP address.
The server runs the first piece of code, and the client runs the client code (the parameters are IP and port number).
Problems encountered
When using htop to monitor system resources, we find that the available memory suddenly drops while the test is running. This is simply a matter of not having enough memory. To address it, adjust the memory allocation of the TCP protocol stack through the parameters in sysctl.conf. The values given above are the ones I configured. Increasing tcp_mem raises the total memory available to the TCP protocol stack, while changing tcp_rmem and tcp_wmem to smaller values reduces the memory allocated to each TCP connection. If you still cannot reach one million connections, modify the client parameters as well. The last resort is to increase the memory allocated to the virtual machine.
The data structure problem will be addressed in the next part:
Practice test of millions of concurrent connections 02