Linux reactor server model (including C language code)

The role of reactor

  1. Check if io is ready

  2. operation io

Working mechanism of reactor

  1. Create sockfd (lientfd), bind the port number, set the callback function of sockfd (lientfd) as acceptor, add epoll, and monitor whether there is a client (customer) connection on the port. (If there is, trigger the callback function acceptor, allocate clientfd, and set the callback function of clientfd to recv)

  2. Call epoll_wait() cyclically in main, and call the corresponding callback function (acceptor(), recv(), send()) according to different events of fd

 for (i = 0; i < nready; + + i) {
        if (ev & EPOLLIN) {
            sockfd read event accept() new connection arrives
            client receives recv()
        }
        if (ev & EPOLLOUT) {
            connectfd write event connection established successfully // This code does not reflect
            client sends send()
        }
    }

Data structure diagram of reactor

reactor code

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <fcntl.h>
#include <pthread.h>

#include <sys/poll.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH 1024
#define EVENT_LENGTH 1024

typedef int (*ZVCALLBACK)(int fd, int events, void *arg);


typedef struct zv_connect_s {
\t
int fd;

ZVCALLBACK cb; // callback function

int count;

char rbuffer[BUFFER_LENGTH];
int rc; // how much the current rbuffer occupies

char wbuffer[BUFFER_LENGTH];
int wc; // how much the current wbuffer occupies
\t
} zv_connect_t;

typedef struct zv_connblock_s {

zv_connect_t *block;
struct zv_connblock_s *next;

} zv_connblock_t;

typedef struct zv_reactor_s {

int epfd;
int blkcnt;

zv_connblock_t *blockheader;

} zv_reactor_t;


//Initialized three kinds of writing and sending
//zv_reactor_t* zv_init_reactor(void);
//zv_reactor_t* zv_init_reactor(zv_reactor_t * reactor);
int zv_init_reactor(zv_reactor_t *reactor) {

if (!reactor) return -1;
#if 0
reactor->blockheader = malloc(sizeof(zv_reactor_t));
if (reactor->blockheader == NULL) return -1;

reactor->epfd = epoll_create(1);
\t
reactor->blockheader->block = calloc(1024, sizeof(zv_connect_t));
if (reactor->blockheader->block == NULL) return -1;

#else

reactor->blockheader = malloc(sizeof(zv_reactor_t) + EVENT_LENGTH * sizeof(zv_connect_t));
if (reactor->blockheader == NULL) return -1;
\t
reactor->blockheader->block = (zv_connect_t*)(reactor->blockheader + 1);
reactor->blkcnt = 1;
reactor->blockheader->next = NULL;
\t
reactor->epfd = epoll_create(1);

#endif
return 0;
}

void zv_destory_reactor(zv_reactor_t *reactor) {
if (!reactor) return ;

if (reactor->blockheader) free(reactor->blockheader);

close(reactor->epfd);
\t
}

int zv_connect_block(zv_reactor_t *reactor) {

if (!reactor) return -1;

zv_connblock_t *blk = reactor->blockheader;

while (blk->next != NULL) blk = blk->next;

zv_connblock_t *connblock = malloc(sizeof(zv_reactor_t) + EVENT_LENGTH * sizeof(zv_connect_t));
if (connblock == NULL) return -1;

\t
//blk->next = connblock;
connblock->block = (zv_connect_t*)(connblock + 1);
connblock->next = NULL;

blk->next = connblock;
reactor->blkcnt + + ;

return 0;
}

zv_connect_t *zv_connect_idx(zv_reactor_t *reactor, int fd) {

if (!reactor) return NULL;
 
int blkidx = fd / EVENT_LENGTH;

while (blkidx >= reactor->blkcnt) {
zv_connect_block(reactor);
}
\t
int i = 0;
zv_connblock_t *blk = reactor->blockheader;
while (i + + < blkidx) {
blk = blk->next;
}

return & blk->block[fd % EVENT_LENGTH];

}


int init_server(short port) {

int sockfd = socket(AF_INET, SOCK_STREAM, 0); //io

struct sockaddr_in servaddr;
memset( & amp;servaddr, 0, sizeof(struct sockaddr_in)); // 192.168.1.123
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
servaddr. sin_port = htons(port);

if(-1 == bind(sockfd, (struct sockaddr*) &servaddr, sizeof(struct sockaddr))) {
printf(" bind failed: %s", strerror(errno));
return -1;
}
\t
listen(sockfd, 10);

printf("sockfd port: %d\\
", port);

return sockfd;
}

int recv_cb(int fd, int events, void * arg);

int send_cb(int fd, int events, void *arg) {

//printf("send_cb\\
");

zv_reactor_t *reactor = (zv_reactor_t*)arg;
zv_connect_t *conn = zv_connect_idx(reactor, fd);

send(fd, conn->wbuffer, conn->wc, 0);
\t
conn->cb = recv_cb;

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
\t

return 0;

}


int recv_cb(int fd, int events, void *arg) {
\t
zv_reactor_t *reactor = (zv_reactor_t*)arg;
zv_connect_t *conn = zv_connect_idx(reactor, fd);

int ret = recv(fd, conn->rbuffer + conn->rc, conn->count, 0);
if (ret < 0) {
printf("recv errno: %d\\
", errno);
return -1;
}
else if (ret == 0) { // When io is disconnected, make it unavailable
\t
conn->fd = -1;
conn->rc = 0;
conn->wc = 0;
\t\t
epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL);
\t\t
close(fd);
return -1;
}
\t
\t
//printf("rbuffer: %s, rc: %d\\
", conn->rbuffer, conn->rc);
\t
conn->rc += ret;
\t
memcpy(conn->wbuffer, conn->rbuffer, conn->rc);
//printf("rc: %d\\
", conn->rc);
conn->wc = conn->rc;
\t
\t

conn->cb = send_cb;
\t
struct epoll_event ev;
ev.events = EPOLLOUT;
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);

return 0;
}


int accept_cb(int fd, int events, void *arg) {

//printf("accept_cb\\
");

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);

int clientfd = accept(fd, (struct sockaddr*) &clientaddr, &len);
if (clientfd < 0) {

printf("accept errno: %d\\
", errno);
return -1;
}

printf("clientfd: %d\\
", clientfd);

zv_reactor_t *reactor = (zv_reactor_t*)arg;
zv_connect_t *conn = zv_connect_idx(reactor, clientfd);;
\t
conn->fd = clientfd;
conn->cb = recv_cb;
conn->count = BUFFER_LENGTH;
\t\t
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);

return 0;
}



int set_listener(zv_reactor_t *reactor, int fd, ZVCALLBACK cb) {

if (!reactor || !reactor->blockheader) return -1;

reactor->blockheader->block[fd].fd = fd;
reactor->blockheader->block[fd].cb = cb;

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;

epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev);

return 0;
}

int main(int argc, char *argv[]) {

if (argc < 2) return -1;
\t
zv_reactor_t reactor;
zv_init_reactor( &reactor);
\t
int port = atoi(argv[1]);
\t
int i = 0;
for (i = 0; i < 100; + + i) {
int sockfd = init_server(port + i);
set_listener( & reactor, sockfd, accept_cb);
}
\t
struct epoll_event events[EVENT_LENGTH] = {0};

while (1) {

int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1);

int i = 0;
for (i = 0; i < nready; + + i) {

int connfd = events[i].data.fd;
zv_connect_t *conn = zv_connect_idx( & amp;reactor, connfd); // equivalent to & amp;reactor.blockheader->block[connfd]
\t\t
if (events[i].events & EPOLLIN) {

conn->cb(connfd, events[i].events, &reactor);
}
if (events[i].events & EPOLLOUT) {

conn->cb(connfd, events[i].events, &reactor);
}
\t\t\t
}

}

return 0;

}

C/C ++ linux service period advanced architecture system tutorial learning “> article reference and C/C ++ linux service period advanced architecture system tutorial learning

The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge