// Linux C/C++ server development – network programming – reactor model

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <fcntl.h>
#include <pthread.h>

#include <sys/poll.h>
#include <sys/epoll.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sendfile.h>

// Size in bytes of each per-connection receive and send buffer.
#define BUFFER_LENGTH 1024
// Number of connection slots per block; also the capacity of the epoll_wait() event array in main().
#define EVENT_LENGTH 512

// Handler stored in a connection slot. The main loop calls it with the ready
// descriptor, the event mask reported by epoll, and a pointer to the reactor.
typedef int (*callback)(int fd, int events, void *arg);

typedef struct connect_s
{<!-- -->
int fd;
callback cb;
\t
char rbuffer[BUFFER_LENGTH];
int rc;
int count;
char wbuffer[BUFFER_LENGTH];
int wc;
\t
} connect_t;

// Each block stores 1024, if fd exceeds 1024, n=fd/1024
// Execute next several times when n is equal, and you can find the corresponding place
typedef struct connblock_s
{<!-- -->
connect_t *block;
struct connblock_s *next;
} connblock_t;


typedef struct reactor_s
{<!-- -->
int epfd;
int blkcnt;
\t
connblock_t *blockheader;
} reactor_t;

/*
 * Initialise a reactor: allocate the first block of connection slots and
 * create the epoll instance.
 *
 * Returns 0 on success. Returns -1 if reactor is NULL or a resource could not
 * be obtained; in that case nothing is left allocated and, when reactor is
 * non-NULL, blockheader is NULL and epfd is -1.
 */
int init_reactor(reactor_t *reactor)
{
    if (!reactor)
    {
        printf("err!reactor null\n");
        return -1;
    }

    reactor->blockheader = NULL;
    reactor->blkcnt = 0;
    reactor->epfd = -1;

    /*
     * The list node and its slot array share one allocation: the array starts
     * right after the node. calloc zeroes the slots so every cb starts NULL.
     */
    connblock_t *head = calloc(1, sizeof(*head) + EVENT_LENGTH * sizeof(connect_t));
    if (head == NULL)
    {
        printf("malloc err!\n");
        return -1;
    }
    head->block = (connect_t *)(head + 1);
    head->next = NULL;

    int epfd = epoll_create(1);
    if (epfd < 0)
    {
        printf("epoll_create err: %s\n", strerror(errno));
        free(head);
        return -1;
    }

    reactor->blockheader = head;
    reactor->blkcnt = 1;
    reactor->epfd = epfd;

    return 0;
}


/*
 * Release everything owned by the reactor: every block in the list and the
 * epoll descriptor. Safe to call with NULL. The reactor structure itself is
 * not freed; its fields are reset so a second call is harmless.
 */
void destroy_reactor(reactor_t *reactor)
{
    if (!reactor)
    {
        return;
    }

    /* Each node was allocated together with its slot array, so one free per node suffices. */
    connblock_t *blk = reactor->blockheader;
    while (blk != NULL)
    {
        connblock_t *next = blk->next;
        free(blk);
        blk = next;
    }
    reactor->blockheader = NULL;
    reactor->blkcnt = 0;

    if (reactor->epfd >= 0)
    {
        close(reactor->epfd);
        reactor->epfd = -1;
    }
}

/*
 * Append one more block of EVENT_LENGTH zeroed connection slots to the end of
 * the reactor's block list and bump blkcnt.
 *
 * Returns 0 on success, -1 if reactor is NULL or the allocation fails (the
 * list is left unchanged in that case).
 */
int connect_block(reactor_t *reactor)
{
    if (!reactor)
    {
        printf("err!reactor null\n");
        return -1;
    }

    /* Node and slot array share one allocation; the array follows the node. */
    connblock_t *connblock = calloc(1, sizeof(*connblock) + EVENT_LENGTH * sizeof(connect_t));
    if (connblock == NULL)
    {
        printf("malloc err!\n");
        return -1;
    }
    connblock->block = (connect_t *)(connblock + 1);
    connblock->next = NULL;

    /* Walk to the link that terminates the list; this also covers an empty list. */
    connblock_t **tail = &reactor->blockheader;
    while (*tail != NULL)
    {
        tail = &(*tail)->next;
    }
    *tail = connblock;
    reactor->blkcnt++;

    return 0;
}

// Find the first block
/*
 * Return the connection slot for descriptor fd, growing the block list as
 * needed so that block number fd / EVENT_LENGTH exists.
 *
 * Returns NULL if reactor is NULL, fd is negative, or a required block could
 * not be allocated.
 */
connect_t *connect_idx(reactor_t *reactor, int fd)
{
    if (!reactor || fd < 0)
    {
        return NULL;
    }

    int blkidx = fd / EVENT_LENGTH;

    while (blkidx >= reactor->blkcnt)
    {
        /* Stop on failure: blkcnt would never advance and the loop would spin. */
        if (connect_block(reactor) != 0)
        {
            return NULL;
        }
    }

    connblock_t *blk = reactor->blockheader;
    for (int i = 0; i < blkidx && blk != NULL; i++)
    {
        blk = blk->next;
    }
    if (blk == NULL)
    {
        return NULL;
    }

    return &blk->block[fd % EVENT_LENGTH];
}

/*
 * Create a TCP socket bound to INADDR_ANY:port and put it into listening
 * state (backlog 10).
 *
 * port is reinterpreted as an unsigned 16-bit value, so ports above 32767
 * passed through the short parameter still reach htons() unchanged.
 *
 * Returns the listening descriptor, or -1 on failure. No descriptor is left
 * open on failure.
 */
int init_server(short port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        printf("socket failed: %s\n", strerror(errno));
        return -1;
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((unsigned short)port);

    if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)))
    {
        printf("bind failed: %s", strerror(errno));
        close(sockfd);
        return -1;
    }

    if (-1 == listen(sockfd, 10))
    {
        printf("listen failed: %s\n", strerror(errno));
        close(sockfd);
        return -1;
    }

    printf("listen port: %d\n", (unsigned short)port);

    return sockfd;
}

int send_cb(int fd, int event, void *arg);

int recv_cb(int fd, int event, void *arg)
{<!-- -->
printf("recv_cb\\
");
reactor_t *reactor = (reactor_t*)arg;
connect_t *conn = connect_idx(reactor, fd);
\t
//int ret = recv(fd, conn->rbuffer + conn->rc, conn->count, 0);
int ret = recv(fd, conn->rbuffer, conn->count, 0);
if (ret < 0)
{<!-- -->
return -1;
}
else if (ret == 0)
{<!-- -->
conn->fd = -1;
conn->rc = 0;
conn->wc = 0;
\t\t
epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return -1;
}
else
{<!-- -->
conn->rc = ret; // + =
printf("rbuffer: %s, ret: %d\\
", conn->rbuffer, conn->rc);
\t\t
conn->cb = send_cb;
\t\t
struct epoll_event ev;
ev.events = EPOLLOUT;
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
}
}

int send_cb(int fd, int event, void *arg)
{<!-- -->
printf("send_cb\\
");
reactor_t *reactor = (reactor_t*)arg;
connect_t *conn = connect_idx(reactor, fd);
\t// echo
memcpy(conn->wbuffer, conn->rbuffer, conn->rc);
conn->wc = conn->rc;
send(fd, conn->wbuffer, conn->wc, 0);
printf("wbuffer: %s, ret: %d\\
", conn->wbuffer, conn->wc);
\t
conn->cb = recv_cb;
\t
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);

}

int accept_cb(int fd, int events, void *arg)
{<!-- -->
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
\t
int clientfd = accept(fd, (struct sockaddr*) &clientaddr, &len);
if (clientfd < 0)
{<!-- -->
printf("accept errno: %d\\
", errno);
return -1;
}
\t
printf("clientfd: %d\\
", clientfd);
\t
reactor_t *reactor = (reactor_t*)arg;
connect_t *conn = connect_idx(reactor, clientfd);

conn->fd = clientfd;
conn->cb = recv_cb;
conn->count = BUFFER_LENGTH;
\t
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);

}

/*
 * Register a listening descriptor with the reactor: store cb in the slot
 * belonging to fd and add fd to the epoll set for EPOLLIN.
 *
 * Returns 0 on success, -1 if an argument is invalid, the slot cannot be
 * obtained, or epoll_ctl() fails.
 */
int set_listener(reactor_t *reactor, int fd, callback cb)
{
    if (!reactor || !reactor->blockheader || fd < 0)
    {
        return -1;
    }

    /* Go through connect_idx so descriptors >= EVENT_LENGTH land in the right block. */
    connect_t *conn = connect_idx(reactor, fd);
    if (conn == NULL)
    {
        return -1;
    }
    conn->fd = fd;
    conn->cb = cb;

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = fd;

    if (epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev) < 0)
    {
        printf("epoll_ctl add err: %s\n", strerror(errno));
        conn->fd = -1;
        conn->cb = NULL;
        return -1;
    }

    return 0;
}


int main(int argc, char *argv[])
{<!-- -->
if (argc < 2) return -1;
int port = atoi(argv[1]);
\t
reactor_t reactor;
init_reactor( &reactor);
\t
for (int i = 0;i < 1;i ++ )
{<!-- -->
int sockfd = init_server(port + i);
set_listener( & reactor, sockfd, accept_cb);
}
\t
struct epoll_event events[EVENT_LENGTH] = {<!-- -->0};
while (1) //mainloop, event driver
{<!-- -->
int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1);
for (int i = 0;i < nready;i ++ )
{<!-- -->
int connfd = events[i].data.fd;
connect_t *conn = connect_idx( & reactor, connfd);
\t\t\t
if (events[i].events & EPOLLIN)
{<!-- -->
conn->cb(connfd, events[i].events, &reactor);
}
\t\t\t
if (events[i].events & EPOLLOUT)
{<!-- -->
conn->cb(connfd, events[i].events, &reactor);
}
}
}
}