The role of reactor
-
Check if io is ready
-
operation io
Working mechanism of reactor
-
Create sockfd (lientfd), bind the port number, set the callback function of sockfd (lientfd) as acceptor, add epoll, and monitor whether there is a client (customer) connection on the port. (If there is, trigger the callback function acceptor, allocate clientfd, and set the callback function of clientfd to recv)
-
Call epoll_wait() cyclically in main, and call the corresponding callback function (acceptor(), recv(), send()) according to different events of fd
for (i = 0; i < nready; + + i) { if (ev & EPOLLIN) { sockfd read event accept() new connection arrives client receives recv() } if (ev & EPOLLOUT) { connectfd write event connection established successfully // This code does not reflect client sends send() } }
Data structure diagram of reactor
reactor code
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <fcntl.h> #include <pthread.h> #include <sys/poll.h> #include <sys/epoll.h> #define BUFFER_LENGTH 1024 #define EVENT_LENGTH 1024 typedef int (*ZVCALLBACK)(int fd, int events, void *arg); typedef struct zv_connect_s { \t int fd; ZVCALLBACK cb; // callback function int count; char rbuffer[BUFFER_LENGTH]; int rc; // how much the current rbuffer occupies char wbuffer[BUFFER_LENGTH]; int wc; // how much the current wbuffer occupies \t } zv_connect_t; typedef struct zv_connblock_s { zv_connect_t *block; struct zv_connblock_s *next; } zv_connblock_t; typedef struct zv_reactor_s { int epfd; int blkcnt; zv_connblock_t *blockheader; } zv_reactor_t; //Initialized three kinds of writing and sending //zv_reactor_t* zv_init_reactor(void); //zv_reactor_t* zv_init_reactor(zv_reactor_t * reactor); int zv_init_reactor(zv_reactor_t *reactor) { if (!reactor) return -1; #if 0 reactor->blockheader = malloc(sizeof(zv_reactor_t)); if (reactor->blockheader == NULL) return -1; reactor->epfd = epoll_create(1); \t reactor->blockheader->block = calloc(1024, sizeof(zv_connect_t)); if (reactor->blockheader->block == NULL) return -1; #else reactor->blockheader = malloc(sizeof(zv_reactor_t) + EVENT_LENGTH * sizeof(zv_connect_t)); if (reactor->blockheader == NULL) return -1; \t reactor->blockheader->block = (zv_connect_t*)(reactor->blockheader + 1); reactor->blkcnt = 1; reactor->blockheader->next = NULL; \t reactor->epfd = epoll_create(1); #endif return 0; } void zv_destory_reactor(zv_reactor_t *reactor) { if (!reactor) return ; if (reactor->blockheader) free(reactor->blockheader); close(reactor->epfd); \t } int zv_connect_block(zv_reactor_t *reactor) { if (!reactor) return -1; zv_connblock_t *blk = reactor->blockheader; while (blk->next != NULL) blk = blk->next; zv_connblock_t *connblock = malloc(sizeof(zv_reactor_t) + EVENT_LENGTH * sizeof(zv_connect_t)); if (connblock == NULL) return -1; \t //blk->next = connblock; connblock->block = (zv_connect_t*)(connblock + 1); connblock->next = NULL; blk->next = connblock; reactor->blkcnt + + ; return 0; } zv_connect_t *zv_connect_idx(zv_reactor_t *reactor, int fd) { if (!reactor) return NULL; int blkidx = fd / EVENT_LENGTH; while (blkidx >= reactor->blkcnt) { zv_connect_block(reactor); } \t int i = 0; zv_connblock_t *blk = reactor->blockheader; while (i + + < blkidx) { blk = blk->next; } return & blk->block[fd % EVENT_LENGTH]; } int init_server(short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); //io struct sockaddr_in servaddr; memset( & amp;servaddr, 0, sizeof(struct sockaddr_in)); // 192.168.1.123 servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 servaddr. sin_port = htons(port); if(-1 == bind(sockfd, (struct sockaddr*) &servaddr, sizeof(struct sockaddr))) { printf(" bind failed: %s", strerror(errno)); return -1; } \t listen(sockfd, 10); printf("sockfd port: %d\\ ", port); return sockfd; } int recv_cb(int fd, int events, void * arg); int send_cb(int fd, int events, void *arg) { //printf("send_cb\\ "); zv_reactor_t *reactor = (zv_reactor_t*)arg; zv_connect_t *conn = zv_connect_idx(reactor, fd); send(fd, conn->wbuffer, conn->wc, 0); \t conn->cb = recv_cb; struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev); \t return 0; } int recv_cb(int fd, int events, void *arg) { \t zv_reactor_t *reactor = (zv_reactor_t*)arg; zv_connect_t *conn = zv_connect_idx(reactor, fd); int ret = recv(fd, conn->rbuffer + conn->rc, conn->count, 0); if (ret < 0) { printf("recv errno: %d\\ ", errno); return -1; } else if (ret == 0) { // When io is disconnected, make it unavailable \t conn->fd = -1; conn->rc = 0; conn->wc = 0; \t\t epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL); \t\t close(fd); return -1; } \t \t //printf("rbuffer: %s, rc: %d\\ ", conn->rbuffer, conn->rc); \t conn->rc += ret; \t memcpy(conn->wbuffer, conn->rbuffer, conn->rc); //printf("rc: %d\\ ", conn->rc); conn->wc = conn->rc; \t \t conn->cb = send_cb; \t struct epoll_event ev; ev.events = EPOLLOUT; ev.data.fd = fd; epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev); return 0; } int accept_cb(int fd, int events, void *arg) { //printf("accept_cb\\ "); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(fd, (struct sockaddr*) &clientaddr, &len); if (clientfd < 0) { printf("accept errno: %d\\ ", errno); return -1; } printf("clientfd: %d\\ ", clientfd); zv_reactor_t *reactor = (zv_reactor_t*)arg; zv_connect_t *conn = zv_connect_idx(reactor, clientfd);; \t conn->fd = clientfd; conn->cb = recv_cb; conn->count = BUFFER_LENGTH; \t\t struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev); return 0; } int set_listener(zv_reactor_t *reactor, int fd, ZVCALLBACK cb) { if (!reactor || !reactor->blockheader) return -1; reactor->blockheader->block[fd].fd = fd; reactor->blockheader->block[fd].cb = cb; struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev); return 0; } int main(int argc, char *argv[]) { if (argc < 2) return -1; \t zv_reactor_t reactor; zv_init_reactor( &reactor); \t int port = atoi(argv[1]); \t int i = 0; for (i = 0; i < 100; + + i) { int sockfd = init_server(port + i); set_listener( & reactor, sockfd, accept_cb); } \t struct epoll_event events[EVENT_LENGTH] = {0}; while (1) { int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1); int i = 0; for (i = 0; i < nready; + + i) { int connfd = events[i].data.fd; zv_connect_t *conn = zv_connect_idx( & amp;reactor, connfd); // equivalent to & amp;reactor.blockheader->block[connfd] \t\t if (events[i].events & EPOLLIN) { conn->cb(connfd, events[i].events, &reactor); } if (events[i].events & EPOLLOUT) { conn->cb(connfd, events[i].events, &reactor); } \t\t\t } } return 0; }
C/C ++ linux service period advanced architecture system tutorial learning “> article reference and
The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge