Practice test of millions of concurrent connections 01

The main purpose is to record a test of one million concurrent connections using the multi-channel IO multiplexing network programming skills of epoll + reactor learned earlier. This article is mainly about implementation and does not use complex data structures. The second part will use some data structure optimization.

Code

Just use a slightly modified epoll on the server side.

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>

#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>


#defineBUFFER_LENGTH 256

typedef int (*RCALLBACK)(int fd);

// listenfd
// EPOLLIN -->
int accept_cb(int fd);
// clientfd
//
int recv_cb(int fd);
int send_cb(int fd);

// conn, fd, buffer, callback
struct conn_item {<!-- -->
int fd;
\t
char rbuffer[BUFFER_LENGTH];
int rlen;
char wbuffer[BUFFER_LENGTH];
int wlen;

union {<!-- -->
RCALLBACK accept_callback;
RCALLBACK recv_callback;
} recv_t;
RCALLBACK send_callback;
};
// libevent -->


int epfd = 0;
struct conn_item connlist[1048576] = {<!-- -->0}; //Directly violently change the array to 100w
struct timeval zvoice_king;
//
// 1000000

#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)


int set_event(int fd, int event, int flag) {<!-- -->

if (flag) {<!-- --> // 1 add, 0 mod
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, & amp;ev);
} else {<!-- -->
\t
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, & amp;ev);
}

\t

}

int accept_cb(int fd) {<!-- -->

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
\t
int clientfd = accept(fd, (struct sockaddr*) & amp;clientaddr, & amp;len);
if (clientfd < 0) {<!-- -->
return -1;
}
set_event(clientfd, EPOLLIN, 1);

connlist[clientfd].fd = clientfd;
memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);
connlist[clientfd].rlen = 0;
memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);
connlist[clientfd].wlen = 0;
\t
connlist[clientfd].recv_t.recv_callback = recv_cb;
connlist[clientfd].send_callback = send_cb;

if ((clientfd % 1000) == 999) {<!-- -->
struct timeval tv_cur;
gettimeofday( & amp;tv_cur, NULL);
int time_used = TIME_SUB_MS(tv_cur, zvoice_king);//Calculate the time

memcpy( & amp;zvoice_king, & amp;tv_cur, sizeof(struct timeval));
\t\t
printf("clientfd : %d, time_used: %d\
", clientfd, time_used);
}

return clientfd;
}

int recv_cb(int fd) {<!-- --> // fd --> EPOLLIN

char *buffer = connlist[fd].rbuffer;
int idx = connlist[fd].rlen;
\t
int count = recv(fd, buffer + idx, BUFFER_LENGTH-idx, 0);
if (count == 0) {<!-- -->
printf("disconnect\
");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
\t\t
return -1;
}
connlist[fd].rlen + = count;
memcpy(connlist[fd].wbuffer, connlist[fd].rbuffer, connlist[fd].rlen);
connlist[fd].wlen = connlist[fd].rlen;
connlist[fd].rlen -= connlist[fd].rlen;
set_event(fd, EPOLLOUT, 0);

\t
return count;
}

int send_cb(int fd) {<!-- -->

char *buffer = connlist[fd].wbuffer;
int idx = connlist[fd].wlen;

int count = send(fd, buffer, idx, 0);

set_event(fd, EPOLLIN, 0);

return count;
}


int init_server(unsigned short port) {<!-- -->

int sockfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in serveraddr;
memset( & amp;serveraddr, 0, sizeof(struct sockaddr_in));

serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);

if (-1 == bind(sockfd, (struct sockaddr*) & amp;serveraddr, sizeof(struct sockaddr))) {<!-- -->
perror("bind");
return -1;
}

listen(sockfd, 10);

return sockfd;
}

// tcp
int main() {<!-- -->

int port_count = 100;
unsigned short port = 2048;
int i = 0;

\t
epfd = epoll_create(1); // int size

for (i = 0;i < port_count;i + + ) {<!-- -->
int sockfd = init_server(port + i); // 2048, 2049, 2050, 2051 ... 2057
connlist[sockfd].fd = sockfd;
connlist[sockfd].recv_t.accept_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
}

gettimeofday( & amp;zvoice_king, NULL);

struct epoll_event events[1024] = {<!-- -->0};
\t
while (1) {<!-- --> // mainloop();

int nready = epoll_wait(epfd, events, 1024, -1); //

int i = 0;
for (i = 0;i < nready;i + + ) {<!-- -->

int connfd = events[i].data.fd;
if (events[i].events & amp; EPOLLIN) {<!-- --> //
int count = connlist[connfd].recv_t.recv_callback(connfd);
} else if (events[i].events & amp; EPOLLOUT) {<!-- -->
int count = connlist[connfd].send_callback(connfd);
}

}

}
}

Use this to test the connection code, and you can use it to test the number of concurrency on other servers.

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>


#defineMAX_BUFFER 128
#define MAX_EPOLLSIZE (384*1024)
#defineMAX_PORT 100

#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;

static int ntySetNonblock(int fd) {<!-- -->
int flags;

flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return -1;
return 0;
}

static int ntySetReUseAddr(int fd) {<!-- -->
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) & amp;reuse, sizeof(reuse));
}



int main(int argc, char **argv) {<!-- -->
if (argc <= 2) {<!-- -->
printf("Usage: %s ip port\
", argv[0]);
exit(0);
}

const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {<!-- -->0};
int i = 0, index = 0;

struct epoll_event events[MAX_EPOLLSIZE];
\t
int epoll_fd = epoll_create(MAX_EPOLLSIZE);
\t
strcpy(buffer, " Data From MulClient\
");
\t\t
struct sockaddr_in addr;
memset( & amp;addr, 0, sizeof(struct sockaddr_in));
\t
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);

struct timeval tv_begin;
gettimeofday( & amp;tv_begin, NULL);

while (1) {<!-- -->
if ( + + index >= MAX_PORT) index = 0;
\t\t
struct epoll_event ev;
int sockfd = 0;

if (connections < 380000 & amp; & amp; !isContinue) {<!-- -->
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {<!-- -->
perror("socket");
goto err;
}

//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port + index);

if (connect(sockfd, (struct sockaddr*) & amp;addr, sizeof(struct sockaddr_in)) < 0) {<!-- -->
perror("connect");
goto err;
}
ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);

sprintf(buffer, "Hello Server: client --> %d\
", connections);
send(sockfd, buffer, strlen(buffer), 0);

ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, & amp;ev);
\t\t
connections + + ;
}
//connections + + ;
if (connections % 1000 == 999 || connections >= 380000) {<!-- -->
struct timeval tv_cur;
memcpy( & amp;tv_cur, & amp;tv_begin, sizeof(struct timeval));
\t\t\t
gettimeofday( & amp;tv_begin, NULL);

int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\
", connections, sockfd, time_used);

int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i + + ) {<!-- -->
int clientfd = events[i].data.fd;

if (events[i].events & amp; EPOLLOUT) {<!-- -->
sprintf(buffer, "data from %d\
", clientfd);
send(sockfd, buffer, strlen(buffer), 0);
} else if (events[i].events & amp; EPOLLIN) {<!-- -->
char rBuffer[MAX_BUFFER] = {<!-- -->0};
ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {<!-- -->
printf("RecvBuffer:%s\
", rBuffer);

if (!strcmp(rBuffer, "quit")) {<!-- -->
isContinue = 0;
}
\t\t\t\t\t\t
} else if (length == 0) {<!-- -->
printf("Disconnect clientfd:%d\
", clientfd);
connections --;
close(clientfd);
} else {<!-- -->
if (errno == EINTR) continue;

printf("Error clientfd:%d, errno:%d\
", clientfd, errno);
close(clientfd);
}
} else {<!-- -->
printf(" clientfd:%d, errno:%d\
", clientfd, errno);
close(clientfd);
}
}
}

usleep(1 * 1000);
}

return 0;

err:
printf("error : %s\
", strerror(errno));
return 0;
\t
}

Virtual machine

We prepare three virtual machines for experiments. The configuration is as follows:

  • server: 8 core 8G
  • client: 2 core 4G
  • client: 2 core 4G
  • client: 2 core 4G

I am using ubuntu here, other distributions should be similar.

Parameter settings

Modify the number of open file handles

First we need to set the number of file handles that can be opened:

ulimit -n 1048576

Loading IP tracking module

modprobe ip_conntrack

Modify system settings

cd /etc
ls | grep sysctl.conf

If not found, create one

touch sysctl.conf

if found

sudo vim sysctl.conf

Add the following at the end:

server:

net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 1772864 1572864 1572864
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
fs.file-max = 1048576

client:

net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 262144 524288 786432
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024

Then make them effective

sudo sysctl -p

Start testing

First, compile these two codes. Then ifconfig looks at the IP address.

The server runs the first piece of code, and the client runs the client code (the parameters are IP and port number).

Problems encountered

When using htop to monitor system resources. We will find that mem suddenly drops while running. It’s just a problem of not enough memory. Here you need to modify the allocation of the tcp protocol stack, which is the parameter of sysctl.conf. What I provide are the parameters I configured. Increasing tcp.mem can increase the memory allocation of the tcp protocol stack. Changing tcp_rmem tcp_wmem to a smaller value can modify the memory allocation for each tcp connection. If you still can’t run 1 million. To modify the client parameters. The final solution is to modify the large virtual memory allocation of the virtual machine.

The problem of data structure will be solved next time
Practice test of millions of concurrent connections 02