Project: Reactor-mode server

Table of Contents

Reactor complete code connection

Prerequisite knowledge:

1. What are the problems with ordinary epoll reading and writing?

2. What is the callback function in Connection?

3. Server initialization (Connection is just a structure used)

4. Wait for the ready event: When the event is ready, call the corresponding callback method for different objects using Connection (encapsulated fd, callback method);

5. _listenSock reading: how the Accepter function obtains new connections

6. Ordinary socket reading: Recver

7. Write-event interest is enabled on demand:

8. Execution effect:

9. Advantages of Reactor:


Reactor complete code connection

Prerequisite knowledge:

Reactor refers to the reactor pattern. The name comes from "reaction": the server reacts to ready events (read, write, exception) by dispatching each one to its handler.

We implement it with epoll. select, poll, and epoll are successive generations of I/O multiplexing, and epoll fixes the following shortcomings of select and poll:

  • The programmer has to maintain the fd array by hand (select and poll).
  • Every call requires traversing all fds to find the ready ones (select and poll).
  • Many parameters are both input and output, so they must be reset before every call (select).
  • The number of fds that can be managed has an upper limit (select).
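
As a minimal sketch of how epoll avoids these problems (Linux only; the two pipes and the helper name CountReadyFds are made up for illustration and are not part of the project): the interest list is registered once with epoll_ctl and kept by the kernel, and epoll_wait fills a separate output array with only the fds that are ready.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical helper: registers the read ends of two pipes with epoll,
// makes only the first one readable, and returns the number of ready
// events that epoll_wait reports (-1 if the setup fails).
int CountReadyFds()
{
    int a[2], b[2];
    if (pipe(a) < 0) return -1;
    if (pipe(b) < 0) { close(a[0]); close(a[1]); return -1; }

    int n = -1;
    int epfd = epoll_create1(0);
    if (epfd >= 0) {
        // Register each fd once; the kernel keeps the interest list,
        // so nothing has to be rebuilt before every wait.
        epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = a[0];
        bool ok = epoll_ctl(epfd, EPOLL_CTL_ADD, a[0], &ev) == 0;
        ev.data.fd = b[0];
        ok = ok && epoll_ctl(epfd, EPOLL_CTL_ADD, b[0], &ev) == 0;

        // Only pipe a receives data.
        ok = ok && write(a[1], "x", 1) == 1;

        if (ok) {
            // The output array is separate from the interest list and
            // receives only the ready fds: no traversal of idle ones.
            epoll_event ready[8];
            n = epoll_wait(epfd, ready, 8, 0);
        }
        close(epfd);
    }
    close(a[0]); close(a[1]); close(b[0]); close(b[1]);
    return n;
}
```

Two fds are registered here, but only the one with pending data comes back from epoll_wait.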

1. What are the problems with ordinary epoll reading and writing?

  • The read goes into a fixed-size array. If a message is longer than the array, the read returns an incomplete message; if messages are short, a single read may return several of them at once, and the last one may be incomplete;
  • Such malformed data cannot be parsed or processed, so no response message can be constructed;

To sum up: nothing guarantees that a read returns exactly one complete message, so the server cannot reliably parse requests, process them, or construct responses;

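void Read(int fd)
{
    char buff[1024];
    //One read into a fixed-size array: the result may be part of a
    //message, or several messages stuck together
    ssize_t s = read(fd, buff, sizeof(buff) - 1);
    if(s > 0){
        buff[s] = 0;
        LOG2(INFO, buff, fd);
    }
}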

Solution: wrap the file descriptor in a structure that also holds a receive buffer and a send buffer; std::string is enough for both;

  1. Read into a fixed-size array, then append what was read to the receive buffer;
  2. After reading, scan the receive buffer for complete messages;
  3. Process each complete request message, construct a response message, and append it to the send buffer;
class TcpServer;   //forward declarations: both names are used before they are defined
class Connection;

using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string&, Connection*)>;

class Connection{
public:
    Connection(int fd = -1):_fd(fd), _ts(nullptr), _times(0)
    {}
    ~Connection()
    {
        if(_fd >= 0)
            close(_fd);
    }
public:
    int _fd;

    //Callbacks for read, write, and exception events
    func_t _recver;
    func_t _sender;
    func_t _exception;

    //Receive buffer: Recver appends the bytes it reads here
    std::string _outbuff;
    //Send buffer: serialized responses wait here until they are sent
    std::string _inbuff;

    //Back pointer to the TcpServer, used to enable write-event interest on demand
    TcpServer *_ts;

    //Time of the most recent activity on this connection
    time_t _times;
};

2. What is the callback function in Connection?

It is a std::function wrapper for any callable that returns void and takes a Connection*; it can hold function pointers, functors, and lambda expressions;

using func_t = std::function<void(Connection*)>;

Advantages:

  • "Reading" means different things for different sockets: for _listenSock it means accepting new connections, while for an ordinary socket it means reading a request message;
  • The read, write, and exception callbacks are set once at initialization (a callback is a function invoked through a stored function pointer or wrapper). The event loop never has to check whether an fd is _listenSock or an ordinary socket; it simply calls con->_recver;
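
The sketch below illustrates the point about the wrapper. The reduced Connection struct, its _log member, and the handler names are stand-ins invented for this example; only the func_t alias matches the article. It stores an ordinary function, a functor, and a lambda in the same func_t type and calls them uniformly.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Stand-in for the article's Connection, reduced to what the demo needs.
struct Connection {
    int _fd = -1;
    std::string _log;  // records which callback ran (illustration only)
};

using func_t = std::function<void(Connection*)>;

// 1. An ordinary function, stored through a function pointer.
void FreeHandler(Connection* con) { con->_log += "free;"; }

// 2. A functor: a class that overloads operator().
struct FunctorHandler {
    void operator()(Connection* con) const { con->_log += "functor;"; }
};

// Stores all three kinds of callable as func_t, invokes them the same
// way, and returns the order in which they ran.
std::string RunAllHandlers()
{
    Connection con;
    func_t handlers[] = {
        FreeHandler,
        FunctorHandler{},
        [](Connection* c) { c->_log += "lambda;"; },  // 3. A lambda.
    };
    for (auto& h : handlers)
        if (h)          // an empty std::function converts to false
            h(&con);
    return con._log;
}
```

The caller never needs to know which kind of callable it holds, which is what lets the event loop call con->_recver without caring whether the fd is the listening socket or an ordinary one.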

3. Server initialization (Connection is just a structure used)

  1. Create the socket, bind it, and listen;
  2. Build the epoll model; the epoll calls are wrapped as well, with _epfd held inside the Epoll class;
  3. Initialize the Connection structure for _listenSock. Its read callback, Accepter, is a member function and therefore takes an extra this pointer; use std::bind to fix this and reduce the parameter count before passing it to the wrapper;
  4. Allocate the ready-event array that epoll_wait fills;
class TcpServer{
    const static int gport = 8080;
    const static int gnum = 128;
public:
    TcpServer(int port = gport, int num = gnum):_evts(nullptr), _evts_num(num), _port(port)
    {
        //Create the socket, bind it, and listen
        _listenSock = Sock::Socket();
        Sock::Bind(_listenSock,_port);
        Sock::Listen(_listenSock);

        //Build the epoll model
        _epoll.CreateEpoll();

        //Register _listenSock with epoll and with _connections; it only needs a read callback
        AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);

        //Array that epoll_wait fills with ready events
        _evts = new epoll_event[_evts_num];
    }
    ~TcpServer()
    {
        //Each Connection closes its own fd in its destructor, so close
        //_listenSock here only if no Connection owns it
        if(_listenSock >= 0 && _connections.count(_listenSock) == 0)
            close(_listenSock);
        delete[] _evts;
        //Delete first, then clear: erasing inside a range-for invalidates the iteration
        for(auto &pr : _connections)
            delete pr.second;
        _connections.clear();
    }
private:
    int _listenSock;

    //epoll wrapper
    Epoll _epoll;
    //Ready-event array
    epoll_event* _evts;
    int _evts_num;

    //Connection objects, indexed by fd
    std::unordered_map<int, Connection*> _connections;
    int _port;

    //Callback for business processing
    cals_t _cb;
};
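
To make the std::bind step concrete, here is a small sketch. MiniServer and its members are hypothetical and exist only for this example; the reduced Connection is a stand-in. It shows why a member function does not fit func_t directly and how binding this fixes that; a capturing lambda is shown as an equivalent alternative.

```cpp
#include <cassert>
#include <functional>

// Stand-in for the article's Connection.
struct Connection { int _fd = -1; };

using func_t = std::function<void(Connection*)>;

// Hypothetical miniature server used only to demonstrate the binding.
class MiniServer {
public:
    // A member function effectively takes two arguments: the hidden
    // `this` and the Connection*. func_t expects exactly one.
    void Accepter(Connection* con) { _last_fd = con->_fd; ++_calls; }

    // std::bind fixes `this` now and leaves the Connection* (the
    // placeholder _1) to be supplied when the callback is invoked.
    func_t MakeBoundCallback()
    {
        return std::bind(&MiniServer::Accepter, this, std::placeholders::_1);
    }

    // A lambda that captures `this` has the same effect.
    func_t MakeLambdaCallback()
    {
        return [this](Connection* con) { Accepter(con); };
    }

    int _last_fd = -1;
    int _calls = 0;
};
```

Either callback stores a pointer to the server, so the server object has to outlive every Connection that holds one.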

4. Wait for the ready event: When the event is ready, call the corresponding callback method for different objects using Connection (encapsulated fd, callback method);

    void LoopOnce()
    {
        int n = _epoll.WaitEpoll(_evts, _evts_num);
        if(n > 0)
        {
            //At least one event is ready
            for(int i = 0; i < n; i++)
            {
                int fd = _evts[i].data.fd;
                uint32_t events = _evts[i].events; //events reported for this fd

                //Hang-up or error: route both into the read/write path. The read or write then fails and calls the exception callback
                if(events & EPOLLHUP){
                    LOG2(INFO,"Connection closed",fd);
                    events |= (EPOLLIN | EPOLLOUT);
                }
                if(events & EPOLLERR){
                    LOG2(INFO,"Error",fd);
                    events |= (EPOLLIN | EPOLLOUT);
                }

                if(_connections.count(fd) && (events & EPOLLIN))
                {
                    if(IsConnectionExits(fd) && _connections[fd]->_recver != nullptr){
                        _connections[fd]->_recver(_connections[fd]);
                    }
                }
                //The read callback may have removed the connection, so check again before writing
                if(_connections.count(fd) && (events & EPOLLOUT))
                {
                    if(IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)
                        _connections[fd]->_sender(_connections[fd]);
                }
            }
        }
        else if(n == 0)
        {
            LOG(INFO, "timeout");
        }
        else
        {
            LOG(FATAL,"epoll wait fail");
            exit(4);
        }
    }
    void Dispatcher(cals_t cb)
    {
        _cb = cb;
        while(true)
        {
            //Remove inactive connections
            DeleteInactivity();
            LoopOnce();
        }
    }
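
DeleteInactivity is called in the loop above but not shown. The sketch below is one possible shape for its core decision, not the author's implementation: FindInactive, its parameters, and the timeout are assumptions made for illustration. It takes each fd's last-activity time (what Connection::_times records) and returns the fds that have been idle for too long, skipping the listening socket.

```cpp
#include <algorithm>
#include <cassert>
#include <ctime>
#include <unordered_map>
#include <vector>

// Hypothetical helper: returns, in ascending order, the fds whose last
// activity is more than `timeout` seconds before `now`. The listening
// socket is skipped because it is idle whenever no client connects.
std::vector<int> FindInactive(const std::unordered_map<int, std::time_t>& last_active,
                              std::time_t now, std::time_t timeout, int listen_fd)
{
    std::vector<int> expired;
    for (const auto& pr : last_active) {
        if (pr.first == listen_fd)
            continue;
        if (now - pr.second > timeout)
            expired.push_back(pr.first);
    }
    // Sorted only to make the result deterministic.
    std::sort(expired.begin(), expired.end());
    return expired;
}
```

Collecting the expired fds first and removing them afterwards keeps the removal from modifying the map while it is being iterated.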

5. _listenSock reading: how the Accepter function obtains new connections

  • Accept a new connection. If the new fd is valid, set its read, write, and exception callbacks: read receives request messages, write sends response messages, and exception handles errors;
  • All sockets use ET (edge-triggered) mode, which notifies more efficiently but requires non-blocking reads and writes, so EPOLLET must be set;
    void Accepter(Connection *con)
    {
        //ET mode notifies only once, so keep accepting until the backlog is empty
        while(true)
        {
            con->_times = time(nullptr);
            struct sockaddr_in tmp;
            socklen_t tlen = sizeof(tmp);
            int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);
            if(new_sock < 0)
            {
                //No pending connections left: this event has been fully handled
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if(errno == EINTR)//Interrupted by a signal (rare): retry
                    continue;
                else
                {
                    std::cout << "accept fail, errno: " << errno << " " << strerror(errno) << std::endl;
                    break;
                }
            }
            else//Add the new socket to the epoll model and to _connections
            {
                if(AddConnection(new_sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                                std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                                std::bind(&TcpServer::Exception, this, std::placeholders::_1)
                                ))
                    LOG2(INFO, "add connection success",new_sock);
                else
                    LOG2(RISK, "add connection fail", new_sock);
            }
        }
    }
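
The loops in ET mode only terminate cleanly if the fd is non-blocking: a blocking accept or recv would hang once nothing is left. The article does not show where the flag is set, so the following is a sketch under that assumption; SetNonBlock and the pipe-based demo are hypothetical names for illustration.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper: adds O_NONBLOCK to the fd's status flags.
bool SetNonBlock(int fd)
{
    int fl = fcntl(fd, F_GETFL);
    if (fl < 0)
        return false;
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK) == 0;
}

// Demo: reading an empty non-blocking pipe fails immediately instead of
// blocking. Returns the errno that read reported, or 0 if the setup
// failed or the read did not fail.
int ReadEmptyNonBlockingPipe()
{
    int p[2];
    if (pipe(p) < 0)
        return 0;
    int err = 0;
    if (SetNonBlock(p[0])) {
        char c;
        ssize_t n = read(p[0], &c, 1);
        if (n < 0)
            err = errno;   // save before close() can overwrite it
    }
    close(p[0]);
    close(p[1]);
    return err;
}
```

This EAGAIN/EWOULDBLOCK result is the signal that Accepter and Recver use to leave their loops.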

6. Ordinary socket reading: Recver

  1. Keep reading until an error occurs or there is nothing left to read; append the result of every read to the receive buffer;
  2. After reading, process the receive buffer: take out the complete messages one by one and run the business logic on each request message;
    void Recver(Connection *con)
    {
        con->_times = time(nullptr);
        const int buff_size = 1024;
        char buff[buff_size];
        while(true)
        {
            ssize_t s = recv(con->_fd, buff, buff_size, 0);
            if(s > 0)
            {
                //Append exactly s bytes, so data containing '\0' is not cut short
                con->_outbuff.append(buff, s);
            }
            else if(s == 0)
            {
                LOG2(INFO, "Peer closed the connection", con->_fd);
                con->_exception(con);
                return;
            }
            else
            {
                //Nothing left to read for now
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    LOG2(INFO, "Processing completed", con->_fd);
                    break;
                }
                else if(errno == EINTR)
                    continue;
                else
                {
                    LOG2(ERROR, "recv fail, fd: ", con->_fd);
                    con->_exception(con);
                    return;
                }
            }
        }
        std::cout << "fd: " << con->_fd << " outbuff: " << con->_outbuff << std::endl;
        //Extract the complete messages from _outbuff
        std::vector<std::string> out;
        //SplitMessage is defined in protocol.hpp
        SplitMessage(out, con->_outbuff);
        for(auto &s : out)
            _cb(s, con);//Business callback, supplied by the main function

    }
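
SplitMessage lives in protocol.hpp and is not shown here. The version below is a sketch of what such a function could look like, not the author's code: it keeps the two parameters used in the call above, adds an assumed sep parameter defaulting to the x separator mentioned in section 8, and the sample payloads in the usage note are invented.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical SplitMessage: moves every complete message (terminated by
// `sep`) from `buffer` into `out`. An incomplete tail stays in `buffer`
// so the next read can complete it.
void SplitMessage(std::vector<std::string>& out, std::string& buffer,
                  const std::string& sep = "x")
{
    if (sep.empty())
        return;                       // nothing sensible to split on
    std::size_t start = 0;
    while (true) {
        std::size_t pos = buffer.find(sep, start);
        if (pos == std::string::npos)
            break;                    // no further complete message
        out.push_back(buffer.substr(start, pos - start));
        start = pos + sep.size();
    }
    buffer.erase(0, start);           // drop only what was consumed
}
```

With a buffer holding "1+1x2*3x4-", two messages come out and "4-" stays behind; once a later read appends "2x", the third message "4-2" becomes available. This is exactly the partial-message case that a single fixed-size read cannot handle.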

7. Write-event interest is enabled on demand:

  • A socket is writable whenever its send buffer has room, which is almost always. If write interest stayed enabled while there was nothing to send, epoll would report write-ready events that serve no purpose, so it is enabled only when needed;
  • Once a request has been processed and its response constructed, there is definitely data to send, so write interest is turned on at that point;
  • This is why Connection holds a TcpServer pointer: through it, the business callback can turn on write interest for the connection;
//Business processing
void CalArguments(std::string &str, Connection *con)
{
    //Deserialize the request message
    Request req;
    if(!req.Deserialize(str)){
        LOG2(ERROR, "deserialize fail", con->_fd);
        return;
    }

    //Process the data
    Response res;
    calculator(req, res);

    //Serialize the response message
    std::string s = res.Serialize();

    //Append it to the send buffer
    con->_inbuff += s;

    //There is now a response to send, so enable write-event interest
    con->_ts->EnableReadWrite(con->_fd, true, true);
}
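
EnableReadWrite itself is not shown in the article. The sketch below assumes it boils down to an epoll_ctl call with EPOLL_CTL_MOD; the free-function form with an explicit epfd, the ReadyCounts struct, and the socketpair demo are all invented for illustration (Linux only).

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical free-function version of EnableReadWrite: replaces the
// fd's event mask, always keeping edge-triggered mode.
bool EnableReadWrite(int epfd, int fd, bool readable, bool writable)
{
    std::uint32_t events = EPOLLET;
    if (readable) events |= EPOLLIN;
    if (writable) events |= EPOLLOUT;
    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == 0;
}

struct ReadyCounts { int read_only; int read_write; };

// Demo: an idle socket reports nothing while only read interest is set,
// and is reported as ready as soon as write interest is enabled.
ReadyCounts WriteInterestDemo()
{
    ReadyCounts r{-1, -1};
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return r;
    int epfd = epoll_create1(0);
    if (epfd >= 0) {
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = sv[0];
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) == 0) {
            epoll_event ready[4];
            // No data has arrived, so nothing is ready.
            r.read_only = epoll_wait(epfd, ready, 4, 0);
            // The send buffer is empty, so the socket is writable at once.
            if (EnableReadWrite(epfd, sv[0], true, true))
                r.read_write = epoll_wait(epfd, ready, 4, 0);
        }
        close(epfd);
    }
    close(sv[0]);
    close(sv[1]);
    return r;
}
```

A Sender would presumably do the reverse once the send buffer is drained, calling the same function with writable set to false.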

8. Execution effect:

  • The protocol I wrote performs addition, subtraction, multiplication, and division on any two numbers;
  • Each request and each response is terminated by x as a separator;

9. Advantages of Reactor:

Compared with the multi-process/multi-thread model:

  • It is a single-process server that still handles requests concurrently, which avoids the cost of creating, destroying, and scheduling processes or threads;
  • It waits on a whole batch of fds at once, which lowers the average waiting time per fd; in the thread-per-connection model, each thread waits on a single fd;
  • It is highly reusable: only the business logic needs to be replaced;