Table of Contents
Reactor complete code connection
Prerequisite knowledge:
1. What are the problems with ordinary epoll reading and writing?
2. What is the callback function in Connection?
3. Server initialization (Connection is just a structure used)
4. Wait for the ready event: When the event is ready, call the corresponding callback method for different objects using Connection (encapsulated fd, callback method);
5. _listenSock reading: how the Accepter function obtains new connections
6. Ordinary socket reading: Recver
7. Interest in write events is enabled on demand:
8. Execution effect:
9. Advantages of Reactor:
Reactor complete code connection
Prerequisite knowledge:
Reactor refers to the reactor pattern: the server reacts to ready events (read, write, exception) by dispatching each one to its handler;
We implement it with epoll. select, poll, and epoll mark successive stages in the development of I/O multiplexing; epoll fixes the shortcomings of select and poll:
- The programmer has to maintain the array of fds personally (select and poll);
- Every call involves traversing all the fds, both in the kernel and in user code (select and poll);
- Many parameters are input/output parameters and must be reset before every call (select);
- There is an upper limit on the number of fds that can be managed (select).
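To make the contrast concrete, here is a minimal sketch of the raw epoll calls. The helper names `WatchForRead` and `WaitReady` are made up for illustration and are not part of the article's code; the sketch assumes Linux.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>

// Registers fd for read events once. The kernel keeps the registration,
// so nothing has to be rebuilt before each wait (unlike select's fd_set).
bool WatchForRead(int epfd, int fd) {
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0;
}

// Waits up to timeout_ms and returns how many fds are ready.
// Only ready fds are written to `ready`, so the caller never scans idle ones.
int WaitReady(int epfd, epoll_event *ready, int capacity, int timeout_ms) {
    assert(capacity > 0);
    return epoll_wait(epfd, ready, capacity, timeout_ms);
}
```

The epoll instance itself would come from `epoll_create1(0)`; the registration survives across calls to `WaitReady`, which is what removes the reset and traversal costs listed above.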
1. What are the problems with ordinary epoll reading and writing?
- Data is read into a fixed-size array. If a message is long, a single read returns only part of it; if messages are short, one read may return several of them, and the last one may be incomplete;
- Such broken messages cannot be parsed or processed, so no response message can be constructed;
To sum up: nothing guarantees that a complete message has been read, so the server cannot parse the request, process it, or construct a response;
void Read(int fd)
{
    char buff[1024];
    // One read into a fixed-size array: may return half a message or several at once
    ssize_t s = read(fd, buff, sizeof(buff) - 1);
    if (s > 0) {
        buff[s] = 0;
        LOG2(INFO, buff, fd);
    }
}
Solution: wrap the file descriptor in a structure that also holds a receive buffer and a send buffer; std::string is enough for both;
- Read into a fixed-size array, then append what was read to the receive buffer;
- After reading, scan the receive buffer for complete messages;
- Process each complete request message, construct a response message, and append it to the send buffer;
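The accumulate-then-split idea can be sketched as follows. `ExtractMessages` and its `sep` parameter are hypothetical names chosen for this example (the article's own `SplitMessage` lives in protocol.hpp and is not shown), and the sketch assumes messages are delimited by a separator string.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: moves every complete message out of `buffer` into `out`.
// A message is complete once it is followed by `sep`; a trailing partial
// message stays in `buffer` until more bytes arrive.
void ExtractMessages(std::string &buffer, const std::string &sep,
                     std::vector<std::string> &out) {
    if (sep.empty()) return;  // an empty separator would never make progress
    std::size_t pos;
    while ((pos = buffer.find(sep)) != std::string::npos) {
        out.push_back(buffer.substr(0, pos));
        buffer.erase(0, pos + sep.size());
    }
}
```

Because the leftover bytes remain in the buffer, a message that arrives in two reads is reassembled on the second call, and several messages arriving in one read are all returned at once.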
class Connection;
class TcpServer;
using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string&, Connection*)>;

class Connection {
public:
    Connection(int fd = -1) : _fd(fd), _ts(nullptr), _times(0) {}
    ~Connection() { if (_fd >= 0) close(_fd); }
public:
    int _fd;
    // Read, write, and exception callbacks
    func_t _recver;
    func_t _sender;
    func_t _exception;
    // Receive buffer (Recver appends incoming data here)
    std::string _outbuff;
    // Send buffer (responses waiting to be sent)
    std::string _inbuff;
    // Back pointer to TcpServer, used to enable write-event interest on demand
    TcpServer *_ts;
    // Time of the connection's most recent activity
    time_t _times;
};
2. What is the callback function in Connection?
func_t is a std::function wrapper whose return type is void and whose parameter is Connection*; it can hold function pointers, functors, and lambda expressions;
using func_t = std::function<void(Connection*)>;
Advantages:
- For _listenSock, "reading" means accepting a new connection; for an ordinary socket, it means reading a request message;
- The read, write, and exception callbacks are set at initialization (a callback is a function invoked through a stored function object or pointer). The event loop never has to check whether an fd is _listenSock or an ordinary socket; it simply calls con->_recver in both cases;
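A small sketch of that uniform dispatch, using a stripped-down stand-in for Connection. `Conn`, `AcceptHandler`, `ReadHandler`, and `OnReadable` are illustrative names and not the article's code.

```cpp
#include <functional>
#include <string>

// Stand-in for the article's Connection; only what the example needs.
struct Conn {
    int fd = -1;
    std::string log;                     // records which handler ran
    std::function<void(Conn *)> recver;  // same shape as func_t
};

// A plain function, as a listening socket's read callback might be.
void AcceptHandler(Conn *c) { c->log += "accept;"; }

// A functor, as an ordinary socket's read callback might be.
struct ReadHandler {
    void operator()(Conn *c) const { c->log += "read;"; }
};

// The dispatcher does not know or care which kind of socket this is.
void OnReadable(Conn *c) {
    if (c->recver) c->recver(c);
}
```

A lambda can be stored the same way, for example `conn.recver = [](Conn *c) { c->log += "lambda;"; };`, and `OnReadable` calls it without any change.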
3. Server initialization (Connection is just a structure used)
- Create the socket, bind it, and start listening;
- Build the epoll model; the epoll functions are wrapped as well, with _epfd held inside the Epoll class;
- Initialize the Connection structure for _listenSock. Its read callback, Accepter, is a member function and therefore takes an extra this pointer, so std::bind is used to fix that argument and produce a callable with the parameter list the wrapper expects;
- Allocate the ready-event array that epoll_wait fills in;
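The std::bind step can be sketched in isolation. `Server`, `Conn`, `MakeAccepter`, and `MakeAccepterLambda` are hypothetical names for this example; only the bind expression itself mirrors the article's code.

```cpp
#include <functional>

struct Conn { int fd = -1; };
using func_t = std::function<void(Conn *)>;

class Server {
public:
    // Member function: conceptually it takes (Server *this, Conn *c).
    void Accepter(Conn *c) { last_fd_ = c->fd; ++calls_; }

    // Fixes `this` as the first argument, leaving one Conn* parameter,
    // which is exactly the shape func_t expects.
    func_t MakeAccepter() {
        return std::bind(&Server::Accepter, this, std::placeholders::_1);
    }

    // Equivalent lambda form, shown for comparison.
    func_t MakeAccepterLambda() {
        return [this](Conn *c) { Accepter(c); };
    }

    int last_fd() const { return last_fd_; }
    int calls() const { return calls_; }

private:
    int last_fd_ = -1;
    int calls_ = 0;
};
```

Both forms capture the raw `this` pointer, so the server object has to outlive every callable it hands out.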
class TcpServer {
    const static int gport = 8080;
    const static int gnum = 128;
public:
    TcpServer(int port = gport, int num = gnum) : _evts_num(num), _port(port)
    {
        // Create the socket, bind, and listen
        _listenSock = Sock::Socket();
        Sock::Bind(_listenSock, _port);
        Sock::Listen(_listenSock);
        // Build the epoll model
        _epoll.CreateEpoll();
        // Add _listenSock to the epoll model and to _connections
        AddConnection(_listenSock,
                      std::bind(&TcpServer::Accepter, this, std::placeholders::_1),
                      nullptr, nullptr);
        // Ready-event array filled in by epoll_wait
        _evts = new epoll_event[_evts_num];
    }
    ~TcpServer()
    {
        // Each Connection closes its own fd in its destructor. _listenSock is
        // managed in _connections too, so it is closed here as well.
        for (auto &pr : _connections)
            delete pr.second;
        _connections.clear();  // erase after the loop, never while iterating
        delete[] _evts;
    }
private:
    int _listenSock;
    // epoll wrapper
    Epoll _epoll;
    // Ready-event array
    epoll_event *_evts;
    int _evts_num;
    // Connection objects, keyed by fd
    std::unordered_map<int, Connection*> _connections;
    int _port;
    // Callback for business processing
    cals_t _cb;
};
4. Wait for the ready event: When the event is ready, call the corresponding callback method for different objects using Connection (encapsulated fd, callback method);
void LoopOnce()
{
    int n = _epoll.WaitEpoll(_evts, _evts_num);
    // At least one event is ready
    if (n > 0) {
        for (int i = 0; i < n; i++) {
            int fd = _evts[i].data.fd;
            int events = _evts[i].events;  // events of the i-th ready entry
            // On hang-up or error, fall through to the read/write handlers;
            // when the read or write fails there, the exception callback is invoked
            if (events & EPOLLHUP) {
                LOG2(INFO, "Connection closed", fd);
                events |= (EPOLLIN | EPOLLOUT);
            }
            if (events & EPOLLERR) {
                LOG2(INFO, "Error", fd);
                events |= (EPOLLIN | EPOLLOUT);
            }
            if (_connections.count(fd) && (events & EPOLLIN)) {
                if (IsConnectionExits(fd) && _connections[fd]->_recver != nullptr)
                    _connections[fd]->_recver(_connections[fd]);
            }
            // Check again: the read handler may have removed the connection
            if (_connections.count(fd) && (events & EPOLLOUT)) {
                if (IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)
                    _connections[fd]->_sender(_connections[fd]);
            }
        }
    } else if (n == 0) {
        LOG(INFO, "timeout");
    } else {
        LOG(FATAL, "epoll wait fail");
        exit(4);
    }
}

void Dispatcher(cals_t cb)
{
    _cb = cb;
    while (true) {
        // Remove inactive connections
        DeleteInactivity();
        LoopOnce();
    }
}
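DeleteInactivity is called above but not shown. One possible shape for its core, assuming it compares each connection's `_times` against a timeout, is sketched below. `FindInactive` and the map of last-activity times are hypothetical and stand in for walking `_connections`.

```cpp
#include <ctime>
#include <unordered_map>
#include <vector>

// Hypothetical: returns the fds whose last activity is older than timeout_sec.
// The stale fds are collected first so that the caller can erase them
// afterwards without modifying the map while iterating over it.
std::vector<int> FindInactive(
        const std::unordered_map<int, std::time_t> &last_active,
        std::time_t now, std::time_t timeout_sec) {
    std::vector<int> stale;
    for (const auto &pr : last_active) {
        if (now - pr.second > timeout_sec) stale.push_back(pr.first);
    }
    return stale;
}
```

A real implementation would presumably skip the listening socket, since it can legitimately stay idle for long periods.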
5. _listenSock reading: how the Accepter function obtains new connections
- Accept a new connection; if the new fd is valid, set its read, write, and exception callbacks. Read: read the request message; write: send the response message; exception: handle the error;
- All sockets use ET (edge-triggered) mode, which notifies more efficiently but only works with non-blocking reads and writes, so EPOLLET is set and every fd must be non-blocking;
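What ET mode asks of each fd can be sketched with plain system calls. `SetNonBlock`, `AddEdgeTriggered`, and `DrainAll` are illustrative names (the article's AddConnection presumably does something similar internally, but that code is not shown); the sketch assumes Linux.

```cpp
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>

// Adds O_NONBLOCK to fd's status flags; returns false on failure.
bool SetNonBlock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags < 0) return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Makes fd non-blocking and registers it edge-triggered for reads.
bool AddEdgeTriggered(int epfd, int fd) {
    if (!SetNonBlock(fd)) return false;
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0;
}

// Reads until the kernel buffer is empty (EAGAIN), as ET mode requires.
// Returns the total number of bytes read, or -1 on a real error.
long DrainAll(int fd) {
    char buf[256];
    long total = 0;
    while (true) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) { total += n; continue; }
        if (n == 0) return total;                  // peer closed
        if (errno == EINTR) continue;              // interrupted, retry
        if (errno == EAGAIN || errno == EWOULDBLOCK) return total;
        return -1;
    }
}
```

With ET, a readiness change is reported once, so the handler has to keep reading until EAGAIN; on a blocking fd that final read would hang, which is why the two settings go together.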
void Accepter(Connection *con)
{
    while (true) {
        con->_times = time(nullptr);
        struct sockaddr_in tmp;
        socklen_t tlen = sizeof(tmp);
        int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);
        if (new_sock < 0) {
            // No more pending connections: this event has been fully handled
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else if (errno == EINTR)  // interrupted by a signal (rare), retry
                continue;
            else {
                std::cout << "accept fail, errno: " << errno << " "
                          << strerror(errno) << std::endl;
                break;
            }
        } else {
            // Add the new fd to the epoll model and to _connections
            if (AddConnection(new_sock,
                              std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                              std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                              std::bind(&TcpServer::Exception, this, std::placeholders::_1)))
                LOG2(INFO, "add connection success", new_sock);
            else
                LOG2(RISK, "add connection fail", new_sock);
        }
    }
}
6. Ordinary socket reading: Recver
- Keep reading until an error occurs or there is nothing left to read; the result of every read is appended to the receive buffer;
- After reading, process the receive buffer: take out the complete messages one by one and run the business logic on each request message;
void Recver(Connection *con)
{
    con->_times = time(nullptr);
    const int buff_size = 1024;
    char buff[buff_size];
    while (true) {
        ssize_t s = recv(con->_fd, buff, buff_size - 1, 0);
        if (s > 0) {
            // Append exactly s bytes to the receive buffer
            con->_outbuff.append(buff, s);
        } else if (s == 0) {
            LOG2(INFO, "Peer closed the connection", con->_fd);
            con->_exception(con);
            return;
        } else {
            // Nothing left to read for now
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                LOG2(INFO, "Processing completed", con->_fd);
                break;
            } else if (errno == EINTR)
                continue;
            else {
                LOG2(ERROR, "recv fail, fd: ", con->_fd);
                con->_exception(con);
                return;
            }
        }
    }
    std::cout << "fd: " << con->_fd << " outbuff: " << con->_outbuff << std::endl;
    // Extract the complete messages from _outbuff
    std::vector<std::string> out;
    // SplitMessage is defined in protocol.hpp
    SplitMessage(out, con->_outbuff);
    for (auto &s : out)
        _cb(s, con);  // business-logic callback, supplied by the main function
}
7. Interest in write events is enabled on demand:
- A socket is writable whenever its send buffer has room, so if interest in write events stays enabled while there is nothing to send, the write event can be reported as ready over and over; interest must therefore be enabled only when needed;
- Once the business logic has handled a request and constructed the response, there is definitely data to send, so interest in write events is enabled at that point;
- This is also why Connection holds a TcpServer pointer: it is used here to enable interest in write events;
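The article does not show EnableReadWrite or Sender, so the following is only a sketch of how on-demand write interest could work, assuming Linux and ET mode. `InterestMask` and `FlushPending` are hypothetical helpers, and this `EnableReadWrite` is a free function taking the epoll fd explicitly rather than the article's member function.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <cerrno>
#include <cstdint>
#include <string>

// Builds the epoll event mask for the requested interests (always ET).
std::uint32_t InterestMask(bool readable, bool writable) {
    std::uint32_t ev = EPOLLET;
    if (readable) ev |= EPOLLIN;
    if (writable) ev |= EPOLLOUT;
    return ev;
}

// Replaces fd's interest set in the epoll instance.
bool EnableReadWrite(int epfd, int fd, bool readable, bool writable) {
    epoll_event ev{};
    ev.events = InterestMask(readable, writable);
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == 0;
}

// Sends as much of `pending` as the socket accepts right now.
// Returns true if data is left and write interest is still needed,
// false if everything was sent or an error occurred (`failed` tells which).
bool FlushPending(int fd, std::string &pending, bool &failed) {
    failed = false;
    while (!pending.empty()) {
        ssize_t n = send(fd, pending.data(), pending.size(), MSG_NOSIGNAL);
        if (n > 0) { pending.erase(0, static_cast<std::size_t>(n)); continue; }
        if (n < 0 && errno == EINTR) continue;
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return true;
        failed = true;
        return false;
    }
    return false;
}
```

A write handler built on this would call `FlushPending` on the send buffer and then `EnableReadWrite(epfd, fd, true, still_pending)`, so write interest is switched off again as soon as the buffer is empty.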
// Business processing
void CalArguments(std::string &str, Connection *con)
{
    // Deserialize the request message
    Request req;
    if (!req.Deserialize(str)) {
        LOG2(ERROR, "deserialize fail", con->_fd);
        return;
    }
    // Process the data
    Response res;
    calculator(req, res);
    // Serialize the response message
    std::string s = res.Serialize();
    // Append it to the send buffer
    con->_inbuff += s;
    // There is now a response to send, so enable interest in write events
    con->_ts->EnableReadWrite(con->_fd, true, true);
}
8. Execution effect:
- The protocol I wrote performs addition, subtraction, multiplication, and division on any two numbers;
- Each request or response uses x as a separator;
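The article's `calculator`, `Request`, and `Response` are not shown. As a rough idea of what the business logic for such a protocol might look like, here is a sketch in which `Calculate` and the `CalcStatus` codes are invented for the example.

```cpp
// Hypothetical status codes carried back in the response.
enum CalcStatus { kOk = 0, kDivByZero = 1, kBadOperator = 2 };

// Applies op to x and y. On success, stores the value in `result`;
// otherwise `result` is left untouched and the status says why.
CalcStatus Calculate(long long x, char op, long long y, long long &result) {
    switch (op) {
    case '+': result = x + y; return kOk;
    case '-': result = x - y; return kOk;
    case '*': result = x * y; return kOk;
    case '/':
        if (y == 0) return kDivByZero;
        result = x / y;  // integer division, truncates toward zero
        return kOk;
    default:
        return kBadOperator;
    }
}
```

Returning a status alongside the value lets the server always send a response, even for a malformed or impossible request.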
9. Advantages of Reactor:
Compared with a process-per-connection or thread-per-connection server:
- It is a single-process server that still handles requests concurrently, which avoids the cost of creating, destroying, and scheduling processes or threads;
- It waits on a whole batch of fds at once, which reduces the waiting time per fd; in the other models, each thread waits on a single fd;
- It is highly reusable: only the business logic needs to be replaced;