#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sendfile.h>

#define BUFFER_LENGTH 1024
#define EVENT_LENGTH 512

/*
 * Single-threaded epoll reactor that echoes back whatever a client sends.
 *
 * Every file descriptor owns one connect_t slot. Slots live in a singly
 * linked list of blocks, EVENT_LENGTH slots per block, and are addressed by
 * the descriptor number itself (block = fd / EVENT_LENGTH,
 * slot = fd % EVENT_LENGTH).
 */

/* Event handler invoked by the main loop; arg is the owning reactor_t. */
typedef int (*callback)(int fd, int events, void *arg);

/* Per-descriptor state. */
typedef struct connect_s {
    int fd;                         /* descriptor owning this slot, -1 once closed */
    callback cb;                    /* handler for the next event, NULL when unused */

    char rbuffer[BUFFER_LENGTH];    /* last received payload, NUL-terminated */
    int rc;                         /* number of payload bytes in rbuffer */
    int count;                      /* maximum bytes requested per recv() */
    char wbuffer[BUFFER_LENGTH];    /* payload being echoed, NUL-terminated */
    int wc;                         /* number of payload bytes in wbuffer */
} connect_t;

/*
 * One block holds EVENT_LENGTH slots. A descriptor fd lives in block number
 * fd / EVENT_LENGTH, which is reached by following `next` that many times
 * from the list head.
 */
typedef struct connblock_s {
    connect_t *block;               /* slot array, stored right after this header */
    struct connblock_s *next;
} connblock_t;

typedef struct reactor_s {
    int epfd;                       /* epoll instance, -1 when not open */
    int blkcnt;                     /* number of blocks in the list */

    connblock_t *blockheader;       /* head of the block list */
} reactor_t;

int send_cb(int fd, int event, void *arg);

/*
 * Allocate one block header together with its EVENT_LENGTH slots in a single
 * zeroed allocation, so every slot starts with cb == NULL.
 * Returns NULL when out of memory.
 */
static connblock_t *alloc_connblock(void)
{
    connblock_t *blk = calloc(1, sizeof(*blk) + EVENT_LENGTH * sizeof(connect_t));
    if (blk == NULL)
        return NULL;

    blk->block = (connect_t *)(blk + 1);    /* slots start right behind the header */
    blk->next = NULL;
    return blk;
}

/*
 * Prepare `reactor`: allocate the first slot block and create the epoll
 * instance. Returns 0 on success, -1 on failure. On failure the reactor is
 * left in a state that destroy_reactor() accepts.
 */
int init_reactor(reactor_t *reactor)
{
    if (!reactor) {
        printf("err!reactor null\n");
        return -1;
    }

    reactor->epfd = -1;
    reactor->blkcnt = 0;
    reactor->blockheader = alloc_connblock();
    if (reactor->blockheader == NULL) {
        printf("malloc err!\n");
        return -1;
    }
    reactor->blkcnt = 1;

    reactor->epfd = epoll_create(1);
    if (reactor->epfd < 0) {
        printf("epoll_create failed: %s\n", strerror(errno));
        free(reactor->blockheader);
        reactor->blockheader = NULL;
        reactor->blkcnt = 0;
        return -1;
    }

    return 0;
}

/*
 * Free every slot block and close the epoll descriptor. Descriptors stored
 * in the slots are not closed here. A NULL reactor is ignored.
 */
void destroy_reactor(reactor_t *reactor)
{
    if (!reactor)
        return;

    connblock_t *blk = reactor->blockheader;
    while (blk != NULL) {
        connblock_t *next = blk->next;
        free(blk);
        blk = next;
    }
    reactor->blockheader = NULL;
    reactor->blkcnt = 0;

    if (reactor->epfd >= 0) {
        close(reactor->epfd);
        reactor->epfd = -1;
    }
}

/*
 * Append one more slot block to the end of the reactor's block list.
 * Returns 0 on success, -1 on a NULL/uninitialised reactor or when out of
 * memory.
 */
int connect_block(reactor_t *reactor)
{
    if (!reactor) {
        printf("err!reactor null\n");
        return -1;
    }
    if (!reactor->blockheader)
        return -1;

    /* Walk to the tail so that block order matches the fd / EVENT_LENGTH index. */
    connblock_t *blk = reactor->blockheader;
    while (blk->next != NULL)
        blk = blk->next;

    connblock_t *connblock = alloc_connblock();
    if (connblock == NULL) {
        printf("malloc err!\n");
        return -1;
    }

    blk->next = connblock;
    reactor->blkcnt++;

    return 0;
}

/*
 * Return the slot belonging to `fd`, growing the block list when the
 * descriptor number lies beyond the blocks allocated so far.
 * Returns NULL for a NULL/uninitialised reactor, a negative fd, or when a
 * needed block cannot be allocated.
 */
connect_t *connect_idx(reactor_t *reactor, int fd)
{
    if (!reactor || !reactor->blockheader || fd < 0)
        return NULL;

    int blkidx = fd / EVENT_LENGTH;

    while (blkidx >= reactor->blkcnt) {
        /* Bail out instead of retrying forever when allocation fails. */
        if (connect_block(reactor) != 0)
            return NULL;
    }

    connblock_t *blk = reactor->blockheader;
    for (int i = 0; i < blkidx; i++)
        blk = blk->next;

    return &blk->block[fd % EVENT_LENGTH];
}

/*
 * Create a TCP socket bound to INADDR_ANY:`port` and start listening with a
 * backlog of 10. Returns the listening descriptor, or -1 on failure (the
 * socket is closed again in that case).
 */
int init_server(short port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        printf("socket failed: %s\n", strerror(errno));
        return -1;
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    /* Reinterpret as unsigned so ports above 32767 keep their value. */
    servaddr.sin_port = htons((unsigned short)port);

    if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) {
        printf("bind failed: %s", strerror(errno));
        close(sockfd);
        return -1;
    }

    if (-1 == listen(sockfd, 10)) {
        printf("listen failed: %s\n", strerror(errno));
        close(sockfd);
        return -1;
    }

    printf("listen port: %d\n", (unsigned short)port);

    return sockfd;
}

/* Remove `fd` from epoll, close it and mark its slot as unused. */
static void close_connection(reactor_t *reactor, connect_t *conn, int fd)
{
    conn->fd = -1;
    conn->rc = 0;
    conn->wc = 0;
    conn->cb = NULL;    /* the main loop skips slots without a handler */

    epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
}

/* Change the event mask `fd` is registered for. Returns epoll_ctl()'s result. */
static int rearm(reactor_t *reactor, int fd, unsigned int events)
{
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
}

/*
 * Read handler: receive one chunk into the slot's rbuffer, then hand the
 * descriptor over to send_cb by switching it to EPOLLOUT.
 *
 * Returns 0 when data was received, -1 otherwise. The connection is closed
 * on end-of-stream and on non-transient errors; on EINTR/EAGAIN it is left
 * untouched so the read is retried on the next event.
 */
int recv_cb(int fd, int event, void *arg)
{
    (void)event;
    printf("recv_cb\n");

    reactor_t *reactor = (reactor_t *)arg;
    connect_t *conn = connect_idx(reactor, fd);
    if (!conn)
        return -1;

    /* Always keep one byte free for the terminator needed by the %s below. */
    size_t cap = sizeof(conn->rbuffer) - 1;
    if (conn->count > 0 && (size_t)conn->count < cap)
        cap = (size_t)conn->count;

    ssize_t ret = recv(fd, conn->rbuffer, cap, 0);
    if (ret < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            return -1;
        printf("recv failed: %s\n", strerror(errno));
        close_connection(reactor, conn, fd);
        return -1;
    }
    if (ret == 0) {
        /* Peer closed the connection. */
        close_connection(reactor, conn, fd);
        return -1;
    }

    conn->rc = (int)ret;
    conn->rbuffer[conn->rc] = '\0';
    printf("rbuffer: %s, ret: %d\n", conn->rbuffer, conn->rc);

    conn->cb = send_cb;
    if (rearm(reactor, fd, EPOLLOUT) != 0) {
        printf("epoll_ctl failed: %s\n", strerror(errno));
        close_connection(reactor, conn, fd);
        return -1;
    }

    return 0;
}

/*
 * Write handler: echo the bytes held in rbuffer back to the peer, then hand
 * the descriptor back to recv_cb by switching it to EPOLLIN.
 *
 * Returns 0 on progress, -1 otherwise. A short write keeps the unsent tail
 * queued and stays in write mode; a non-transient send error closes the
 * connection.
 */
int send_cb(int fd, int event, void *arg)
{
    (void)event;
    printf("send_cb\n");

    reactor_t *reactor = (reactor_t *)arg;
    connect_t *conn = connect_idx(reactor, fd);
    if (!conn)
        return -1;

    /* Clamp defensively so the copy and the terminator stay inside wbuffer. */
    int len = conn->rc;
    if (len < 0)
        len = 0;
    if (len > BUFFER_LENGTH - 1)
        len = BUFFER_LENGTH - 1;

    /* echo */
    memcpy(conn->wbuffer, conn->rbuffer, (size_t)len);
    conn->wbuffer[len] = '\0';
    conn->wc = len;

    /* MSG_NOSIGNAL: a vanished peer must yield EPIPE, not a fatal SIGPIPE. */
    ssize_t sent = send(fd, conn->wbuffer, (size_t)len, MSG_NOSIGNAL);
    if (sent < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            return -1;  /* still registered for EPOLLOUT, retried later */
        printf("send failed: %s\n", strerror(errno));
        close_connection(reactor, conn, fd);
        return -1;
    }

    printf("wbuffer: %s, ret: %d\n", conn->wbuffer, conn->wc);

    if (sent < len) {
        /* Short write: keep the unsent tail and wait for the next EPOLLOUT. */
        memmove(conn->rbuffer, conn->rbuffer + sent, (size_t)(len - sent));
        conn->rc = len - (int)sent;
        return 0;
    }

    conn->cb = recv_cb;
    if (rearm(reactor, fd, EPOLLIN) != 0) {
        printf("epoll_ctl failed: %s\n", strerror(errno));
        close_connection(reactor, conn, fd);
        return -1;
    }

    return 0;
}

/*
 * Listener handler: accept one pending client, initialise its slot and
 * register it for EPOLLIN with recv_cb as handler.
 * Returns 0 on success, -1 on failure (the client socket is closed again).
 */
int accept_cb(int fd, int events, void *arg)
{
    (void)events;

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
    if (clientfd < 0) {
        printf("accept errno: %d\n", errno);
        return -1;
    }

    printf("clientfd: %d\n", clientfd);

    reactor_t *reactor = (reactor_t *)arg;
    connect_t *conn = connect_idx(reactor, clientfd);
    if (!conn) {
        close(clientfd);
        return -1;
    }

    conn->fd = clientfd;
    conn->cb = recv_cb;
    conn->count = BUFFER_LENGTH;
    conn->rc = 0;
    conn->wc = 0;

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = clientfd;
    if (epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev) != 0) {
        printf("epoll_ctl failed: %s\n", strerror(errno));
        conn->fd = -1;
        conn->cb = NULL;
        close(clientfd);
        return -1;
    }

    return 0;
}

/*
 * Register listening descriptor `fd` with the reactor; `cb` is invoked for
 * every event reported on it. Returns 0 on success, -1 on failure.
 */
int set_listener(reactor_t *reactor, int fd, callback cb)
{
    if (!reactor || !reactor->blockheader)
        return -1;

    /* Go through connect_idx so an fd >= EVENT_LENGTH cannot overrun block 0. */
    connect_t *conn = connect_idx(reactor, fd);
    if (!conn)
        return -1;

    conn->fd = fd;
    conn->cb = cb;

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = fd;

    if (epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
        conn->fd = -1;
        conn->cb = NULL;
        return -1;
    }

    return 0;
}

/*
 * Usage: <program> <port>
 * Starts one listener on <port> and runs the event loop until epoll_wait()
 * fails with something other than EINTR.
 */
int main(int argc, char *argv[])
{
    if (argc < 2)
        return -1;

    errno = 0;
    char *end = NULL;
    long port = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE || port < 0 || port > 65535) {
        printf("invalid port: %s\n", argv[1]);
        return -1;
    }

    reactor_t reactor;
    if (init_reactor(&reactor) != 0)
        return -1;

    for (int i = 0; i < 1; i++) {
        int sockfd = init_server((short)(port + i));
        if (sockfd < 0) {
            destroy_reactor(&reactor);
            return -1;
        }
        if (set_listener(&reactor, sockfd, accept_cb) != 0) {
            printf("set_listener failed\n");
            close(sockfd);
            destroy_reactor(&reactor);
            return -1;
        }
    }

    struct epoll_event events[EVENT_LENGTH] = {0};

    while (1) { /* mainloop, event driven */
        int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1);
        if (nready < 0) {
            if (errno == EINTR)
                continue;
            printf("epoll_wait failed: %s\n", strerror(errno));
            break;
        }

        for (int i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            connect_t *conn = connect_idx(&reactor, connfd);
            if (!conn || !conn->cb)
                continue;

            /*
             * A descriptor is only ever registered for EPOLLIN or EPOLLOUT,
             * never both, so one dispatch per event is enough. EPOLLERR and
             * EPOLLHUP are routed to the handler as well: its recv()/send()
             * then fails and the connection is torn down instead of the
             * event being reported over and over.
             */
            if (events[i].events & (EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP))
                conn->cb(connfd, (int)events[i].events, &reactor);
        }
    }

    destroy_reactor(&reactor);
    return -1;
}