Server network model (3) reactor

Article directory

    • Background
    • Reactor
      • Single reactor single thread/process
      • Single reactor multi-thread
      • Multi-reactor multi-thread
    • Example

Background

From the previous articles we know that a server can use I/O multiplexing, such as epoll, to handle a large number of concurrent connections. In practice, however, driving I/O multiplexing directly in a procedural style is inconvenient. The Reactor pattern addresses this by encapsulating the network I/O layer with an object-oriented design.

Reactor

The name Reactor is not very intuitive. It simply means that the component reacts to events in a specific way. The pattern is also known as Dispatcher, which is arguably the more fitting name: it uses epoll to listen for events and, when one arrives, dispatches it according to its type.

The Reactor is responsible for listening for events and dispatching them. There are generally two kinds of events: connection establishment, and data reads and writes.

  • Connection establishment is generally handed to an Acceptor, which accepts the new connection and then registers the new socket with the Reactor for monitoring.
  • Data reads and writes are generally handed to a Handler; the usual flow is read -> process business logic -> write.
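The listen-then-dispatch idea can be sketched in a few lines. This is only a minimal illustration, assuming Linux and epoll; MiniReactor, watch and pollOnce are hypothetical names invented for this sketch and are not part of the example later in the article.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>
#include <functional>
#include <map>
#include <utility>

// Hypothetical minimal dispatcher: maps each fd to a callback and
// invokes that callback when epoll reports the fd as readable.
class MiniReactor {
public:
    MiniReactor() : epfd_(epoll_create1(0)) { assert(epfd_ >= 0); }
    ~MiniReactor() { close(epfd_); }
    MiniReactor(const MiniReactor&) = delete;
    MiniReactor& operator=(const MiniReactor&) = delete;

    // Register interest in read events on fd.
    bool watch(int fd, std::function<void(int)> on_readable) {
        struct epoll_event ev = {};
        ev.events = EPOLLIN;
        ev.data.fd = fd;
        if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) < 0) return false;
        handlers_[fd] = std::move(on_readable);
        return true;
    }

    // Wait up to timeout_ms, dispatch every ready event,
    // and return how many callbacks were invoked.
    int pollOnce(int timeout_ms) {
        struct epoll_event events[16];
        int n = epoll_wait(epfd_, events, 16, timeout_ms);
        int dispatched = 0;
        for (int i = 0; i < n; ++i) {
            auto it = handlers_.find(events[i].data.fd);
            if (it != handlers_.end()) {
                it->second(events[i].data.fd);
                ++dispatched;
            }
        }
        return dispatched;
    }

private:
    int epfd_;
    std::map<int, std::function<void(int)>> handlers_;
};
```

In this sketch an Acceptor would simply be the callback registered for the listening fd, and a Handler the callback registered for each client fd.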

The Reactor pattern can be organized in several ways. Depending on the number of reactors and on how the handlers are run, it can be divided into:

  • Single reactor single thread/process
  • Single reactor multi-thread/process
  • Multi-reactor multi-thread/process
    In theory there is also a multi-reactor single-process variant, but compared with single reactor single process it only adds complexity without improving processing efficiency.

Single reactor single thread/process

This model is simple to implement, and there is no need to worry about communication or contention between threads. It suits scenarios where business processing is fast.
Disadvantages:

  • Event monitoring, connection establishment, and business processing all run in one thread. If business processing takes a long time, everything else is blocked.
  • It cannot take advantage of multi-core CPUs.

Single reactor multi-thread

Listening for events and establishing connections work the same way as in the single-threaded model; the difference lies in the handler. The handler is only responsible for reading and writing data, while business processing is done by separate worker threads. One way to implement this: after reading the data, the handler pushes it onto the worker threads' message queue. The worker threads continuously take data from the queue and process it. When processing finishes, the result is returned to the handler, which sends it out with send.

  • Because multiple threads are used, multi-core CPU resources can be fully utilized. In exchange, shared resources used for inter-thread communication must be protected by locks, for example the buffer in which the handler stores the computed results.
  • In single-reactor mode one reactor listens for all events, which may become a performance bottleneck.
  • A single reactor multi-process implementation is considerably more troublesome, mainly because inter-process communication is much harder to implement.
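The message queue between the handler and the worker threads might look roughly like the sketch below. It assumes C++11 threads; WorkerPool and post are hypothetical names, and the tasks stand in for "process the data that the handler just read".

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical worker pool: the reactor thread posts tasks,
// worker threads take them from a locked queue and run them.
class WorkerPool {
public:
    explicit WorkerPool(int n) {
        assert(n > 0);
        for (int i = 0; i < n; ++i)
            workers_.emplace_back([this] { loop(); });
    }

    // Finishes the tasks that are still queued, then joins the workers.
    ~WorkerPool() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();
    }

    // Called by the handler after it has read a request.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mu_);
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run the business logic outside the lock
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};
```

Any state a task shares with the handler, such as a result buffer, still needs its own lock, which is the locking cost mentioned in the first bullet.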

Multi-reactor multi-thread

In multi-reactor mode, a master reactor and several sub-reactors are generally used. The master reactor only watches listenfd: it listens for connection events and hands them to the acceptor. Once a connection is established, a sub-reactor takes over monitoring its read and write events. Each sub-reactor runs in the same thread as its handlers; when a read or write event occurs, it is passed to the corresponding handler, which carries out the business logic.
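How the master reactor hands connections to the sub-reactors is not specified above; round-robin is one common choice. The sketch below assumes Linux and epoll and leaves out the per-sub-reactor threads to stay short. SubReactor, MainReactor, adopt and handOff are hypothetical names.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sub-reactor: owns one epoll instance. In a real server it
// would also own a thread that loops on epoll_wait for its connections.
class SubReactor {
public:
    SubReactor() : epfd_(epoll_create1(0)), connections_(0) { assert(epfd_ >= 0); }
    ~SubReactor() { close(epfd_); }
    SubReactor(const SubReactor&) = delete;
    SubReactor& operator=(const SubReactor&) = delete;

    // Start monitoring read events of an accepted connection.
    bool adopt(int clientfd) {
        struct epoll_event ev = {};
        ev.events = EPOLLIN;
        ev.data.fd = clientfd;
        if (epoll_ctl(epfd_, EPOLL_CTL_ADD, clientfd, &ev) < 0) return false;
        ++connections_;
        return true;
    }

    int connections() const { return connections_; }

private:
    int epfd_;
    int connections_;  // not synchronized: this sketch is single-threaded
};

// Hypothetical master reactor: only decides which sub-reactor gets
// each newly accepted fd, here by simple round-robin.
class MainReactor {
public:
    explicit MainReactor(std::size_t sub_count) : subs_(sub_count), next_(0) {
        assert(sub_count > 0);
    }

    bool handOff(int clientfd) {
        SubReactor& target = subs_[next_];
        next_ = (next_ + 1) % subs_.size();
        return target.adopt(clientfd);
    }

    const SubReactor& sub(std::size_t i) const { return subs_[i]; }

private:
    std::vector<SubReactor> subs_;
    std::size_t next_;
};
```

With real threads, the connection counter would need to be atomic or protected by a lock.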

Example

EventSocket manages the data of one connection, which corresponds to one client, and implements the basic read and write functions.

class EventSocket
{
    public:
        EventSocket():
            fd_(0),
            status_(0),
            length_(0)
        {}
        ~EventSocket()
        {}

        int getFd() {return fd_;}
        void setFd(int fd) {fd_ = fd;}
        int getStatus() {return status_;}
        void setStatus(int status) {status_ = status;}


        bool readData();
        bool writeData();
    private:
        int fd_;
        int status_; // 0 none, 1 listening, 2 common
        int length_;
        char buffer_[BUFF_LENGTH + 1]; // byte buffer; +1 leaves room for the trailing '\0'
};

EventSocketGroup manages all connections and is responsible for creating and destroying them. For convenience it stores them directly in a std::map; a better approach would be to manage the connection objects with a pool.

class EventSocketGroup
{
    public:
        EventSocketGroup():
            reactor_(NULL)
        {}

        ~EventSocketGroup()
        {}
        void setReactor(EpollReactor* reactor) {reactor_ = reactor;}

        EventSocket* getSocket(int socket_fd);

        bool onFDRead(EventSocket* socket_ptr);
        bool onFDWrite(EventSocket* socket_ptr);
        void closeSocket(EventSocket* socket_ptr);
    private:
        std::map<int, EventSocket> socket_map_;
        EpollReactor* reactor_;
};
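One possible reading of the pooling remark is to preallocate a fixed number of connection objects and recycle them instead of inserting into and erasing from a map. The sketch below is only an illustration of that idea; Conn, ConnPool, acquire and release are hypothetical names and do not appear in the example.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for EventSocket.
struct Conn {
    int fd = -1;
    bool in_use = false;
};

// Hypothetical fixed-capacity pool: slots are allocated once and reused.
class ConnPool {
public:
    explicit ConnPool(std::size_t capacity) : slots_(capacity) {
        // Push indexes in reverse so that slot 0 is handed out first.
        for (std::size_t i = 0; i < capacity; ++i)
            free_.push_back(capacity - 1 - i);
    }

    // Returns nullptr when every slot is taken.
    Conn* acquire(int fd) {
        if (free_.empty()) return nullptr;
        std::size_t idx = free_.back();
        free_.pop_back();
        slots_[idx].fd = fd;
        slots_[idx].in_use = true;
        return &slots_[idx];
    }

    // Returns a slot to the pool; conn must come from acquire().
    void release(Conn* conn) {
        assert(conn >= slots_.data() && conn < slots_.data() + slots_.size());
        assert(conn->in_use);
        conn->fd = -1;
        conn->in_use = false;
        free_.push_back(static_cast<std::size_t>(conn - slots_.data()));
    }

    std::size_t available() const { return free_.size(); }

private:
    std::vector<Conn> slots_;        // never resized, so pointers stay valid
    std::vector<std::size_t> free_;  // indexes of unused slots
};
```

A pool like this bounds memory use and avoids per-connection allocation, at the price of a hard limit on simultaneous connections.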

EpollHandler defines the callback interface for epoll events.
EpollReactor manages the epoll object and wraps adding, deleting, and modifying epoll events. Its execute function implements the main epoll_wait logic and invokes the callbacks registered in EpollHandler when an event occurs.

class EpollHandler
{
    public:
        typedef std::function<int()> AcceptCallback;
        typedef std::function<bool(EventSocket* socket_ptr)> ReadCallBack;
        typedef std::function<bool(EventSocket* socket_ptr)> WriteCallBack;
        typedef std::function<void(EventSocket* socket_ptr)> CloseCallBack;

        void setAcceptCB(const AcceptCallback& cb) {accept_cb_ = cb;}
        void setReadCB(const ReadCallBack& cb) {read_cb_ = cb;}
        void setWriteCB(const WriteCallBack& cb) {write_cb_ = cb;}
        void setCloseCB(const CloseCallBack& cb) {close_cb_ = cb;}

    public:
        AcceptCallback accept_cb_;
        ReadCallBack read_cb_;
        WriteCallBack write_cb_;
        CloseCallBack close_cb_;
};

class EpollReactor
{
    public:
        EpollReactor(): epfd_(0)
        {}
        ~EpollReactor()
        {}
        bool init();
        bool addEvent(int events, EventSocket* socket_ptr);
        bool modEvent(int events, EventSocket* socket_ptr);
        void delEvent(EventSocket* socket_ptr);
        void execute();

        EpollHandler& getHandler() {return handler_;}
    private:
        int epfd_;
        struct epoll_event events_list_[MAX_EPOLL_EVENTS + 1];
        EpollHandler handler_;
};
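The callback slots above are plain std::function objects, so a member function of another class can be plugged in with std::bind. A reduced sketch of that wiring follows; Conn, Callbacks, fireRead and ConnGroup are hypothetical stand-ins for EventSocket, EpollHandler and EventSocketGroup.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for EventSocket.
struct Conn {
    int fd;
};

// Hypothetical reduced version of a handler with a single callback slot.
class Callbacks {
public:
    typedef std::function<bool(Conn*)> ReadCallback;

    void setReadCB(const ReadCallback& cb) { read_cb_ = cb; }

    // Returns false when no callback is registered or the callback fails.
    bool fireRead(Conn* conn) const {
        return read_cb_ ? read_cb_(conn) : false;
    }

private:
    ReadCallback read_cb_;
};

// Hypothetical owner of the business logic.
class ConnGroup {
public:
    ConnGroup() : reads_(0) {}

    bool onRead(Conn* conn) {
        assert(conn != nullptr);
        ++reads_;
        return conn->fd >= 0;  // treat a negative fd as a failed read
    }

    int reads() const { return reads_; }

private:
    int reads_;
};
```

Registration then looks like callbacks.setReadCB(std::bind(&ConnGroup::onRead, &group, std::placeholders::_1)); the bound object must outlive the callback, because only its address is stored.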

EpollAcceptor maintains the listening socket and, when a new connection arrives, registers it with the Reactor for monitoring.

class EpollAcceptor
{
    public:
        EpollAcceptor():
            listenfd_(0),
            reactor_(NULL),
            group_(NULL)
        {}
        ~EpollAcceptor() {}
        void setReactor(EpollReactor* reactor) {reactor_ = reactor;}
        void setSocketGroup(EventSocketGroup* group) {group_ = group;}
        bool startListen(int port);
        int onFDAccept();

    private:
        int listenfd_;
        EpollReactor* reactor_;
        EventSocketGroup* group_;
};

TcpServer is responsible for initializing all modules and managing the overall flow.
init initializes each module and registers the callback functions for the various events.
start starts listening on the port for connections.
run calls epoll_wait in a loop to listen for and process events.
stop is responsible for releasing resources. To keep the focus on how the reactor is built, the example does not implement a real stop procedure; stop is left as a stub.

class TcpServer
{
    public:
        TcpServer():
            port_(0)
        {}
        ~TcpServer()
        {}
    public:
        bool init(int port);
        bool start();
        void run();
        void stop();

    private:
        int port_;
        EpollReactor reactor_;
        EpollAcceptor acceptor_;
        EventSocketGroup group_;
};
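Since stop is only a stub in this example, here is one way a run loop could be made stoppable, using an atomic flag. This is a sketch under that assumption and not the article's implementation; StoppableLoop is a hypothetical name, and step stands in for one call to the reactor's execute.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical run loop that can be asked to stop from a callback
// or from another thread.
class StoppableLoop {
public:
    StoppableLoop() : running_(false), iterations_(0) {}

    // Calls step() repeatedly until stop() has been requested.
    void run(const std::function<void()>& step) {
        assert(static_cast<bool>(step));
        running_.store(true);
        while (running_.load()) {
            step();
            ++iterations_;
        }
    }

    // Takes effect after the current step() returns.
    void stop() { running_.store(false); }

    long iterations() const { return iterations_; }

private:
    std::atomic<bool> running_;
    long iterations_;
};
```

Because execute waits in epoll_wait with a timeout, a loop of this shape would notice the stop request after at most one timeout period, and the server could then close its sockets.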

Complete example

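// Headers required by the code below
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <functional>
#include <map>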


#define BUFF_LENGTH 1024
#define MAX_EPOLL_EVENTS 1024

using namespace std;

class EventSocketGroup;

class EventSocket
{
    public:
        EventSocket():
            fd_(0),
            status_(0),
            length_(0)
        {}
        ~EventSocket()
        {}

        int getFd() {return fd_;}
        void setFd(int fd) {fd_ = fd;}
        int getStatus() {return status_;}
        void setStatus(int status) {status_ = status;}


        bool readData();
        bool writeData();
    private:
        int fd_;
        int status_; // 0 none, 1 listening, 2 common
        int length_;
        char buffer_[BUFF_LENGTH + 1]; // byte buffer; +1 leaves room for the trailing '\0'
};


bool EventSocket::readData()
{
    int len = recv(fd_, buffer_, BUFF_LENGTH, 0);
    if (len > 0)
    {
        length_ = len;
        buffer_[len] = '\0';
        printf("recv [fd=%d] %d:%s\n", fd_, len, buffer_);
    }
    else if (len == 0)
    {
        printf("disconnected [%d]\n", fd_);
        return false;
    }
    else
    {
        if (errno == EAGAIN)
        {
            // finished reading
            return true;
        }
        printf("read [fd=%d] error. %s\n", fd_, strerror(errno));
        return false;
    }
    return true;
}

bool EventSocket::writeData()
{
    printf("write fd:%d \n", fd_);

    int len = send(fd_, buffer_, length_, 0);
    if (len > 0)
    {
        printf("send [fd=%d] len%d:%s\n", fd_, len, buffer_);
    }
    else
    {
        printf("send [fd=%d] error. %s\n", fd_, strerror(errno));
        return false;
    }
    return true;
}



class EpollHandler
{
    public:
        typedef std::function<int()> AcceptCallback;
        typedef std::function<bool(EventSocket* socket_ptr)> ReadCallBack;
        typedef std::function<bool(EventSocket* socket_ptr)> WriteCallBack;
        typedef std::function<void(EventSocket* socket_ptr)> CloseCallBack;

        void setAcceptCB(const AcceptCallback& cb) {accept_cb_ = cb;}
        void setReadCB(const ReadCallBack& cb) {read_cb_ = cb;}
        void setWriteCB(const WriteCallBack& cb) {write_cb_ = cb;}
        void setCloseCB(const CloseCallBack& cb) {close_cb_ = cb;}

    public:
        AcceptCallback accept_cb_;
        ReadCallBack read_cb_;
        WriteCallBack write_cb_;
        CloseCallBack close_cb_;
};

class EpollReactor
{
    public:
        EpollReactor(): epfd_(0)
        {}
        ~EpollReactor()
        {}
        bool init();
        bool addEvent(int events, EventSocket* socket_ptr);
        bool modEvent(int events, EventSocket* socket_ptr);
        void delEvent(EventSocket* socket_ptr);
        void execute();

        EpollHandler& getHandler() {return handler_;}
    private:
        int epfd_;
        struct epoll_event events_list_[MAX_EPOLL_EVENTS + 1];
        EpollHandler handler_;
};

bool EpollReactor::init()
{
    epfd_ = epoll_create(1);
    if (epfd_ <= 0)
    {
        printf("create epfd error: %s \n", strerror(errno));
        return false;
    }
    printf("init succ: %d \n", epfd_);
    return true;
}

void EpollReactor::delEvent(EventSocket* socket_ptr)
{
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = (void*)socket_ptr;
    socket_ptr->setStatus(0);
    epoll_ctl(epfd_, EPOLL_CTL_DEL, socket_ptr->getFd(), &ep_ev);
}


bool EpollReactor::addEvent(int events, EventSocket* socket_ptr)
{
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = (void*)socket_ptr;
    ep_ev.events = events;
    if (epoll_ctl(epfd_, EPOLL_CTL_ADD, socket_ptr->getFd(), &ep_ev) < 0)
    {
        printf("event add failed. fd=%d, events[%d] \n", socket_ptr->getFd(), events);
        return false;
    }
    return true;
}

bool EpollReactor::modEvent(int events, EventSocket* socket_ptr)
{
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = (void*)socket_ptr;
    ep_ev.events = events;
    if (epoll_ctl(epfd_, EPOLL_CTL_MOD, socket_ptr->getFd(), &ep_ev) < 0)
    {
        printf("event mod failed. fd=%d, events[%d] \n", socket_ptr->getFd(), events);
        return false;
    }
    return true;
}

void EpollReactor::execute()
{
    int nready = epoll_wait(epfd_, events_list_, MAX_EPOLL_EVENTS, 1000);
    if (nready < 0)
    {
        return;
    }
    for (int i = 0; i < nready; ++i)
    {
        EventSocket* socket_ptr = static_cast<EventSocket*>(events_list_[i].data.ptr);
        if (socket_ptr->getStatus() == 1)
        {
            // acceptor: the listening socket has a pending connection
            if (handler_.accept_cb_() > 0)
            {
                printf("new clientfd \n");
            }
            else
            {
                printf("accept new clientfd failed.\n");
            }
        }
        else
        {
            // handler: a client socket is readable and/or writable
            if (events_list_[i].events & EPOLLIN)
            {
                if (false == handler_.read_cb_(socket_ptr))
                {
                    handler_.close_cb_(socket_ptr);
                    // socket_ptr is invalid after close, so skip the write check
                    continue;
                }
            }
            if (events_list_[i].events & EPOLLOUT)
            {
                if (false == handler_.write_cb_(socket_ptr))
                {
                    handler_.close_cb_(socket_ptr);
                }
            }
        }
    }
}


class EventSocketGroup
{
    public:
        EventSocketGroup():
            reactor_(NULL)
        {}

        ~EventSocketGroup()
        {}
        void setReactor(EpollReactor* reactor) {reactor_ = reactor;}

        EventSocket* getSocket(int socket_fd);

        bool onFDRead(EventSocket* socket_ptr);
        bool onFDWrite(EventSocket* socket_ptr);
        void closeSocket(EventSocket* socket_ptr);
    private:
        std::map<int, EventSocket> socket_map_;
        EpollReactor* reactor_;
};

EventSocket* EventSocketGroup::getSocket(int fd)
{
    auto iter = socket_map_.find(fd);
    if (iter == socket_map_.end())
    {
        socket_map_[fd] = EventSocket();
        socket_map_[fd].setFd(fd);
        return &(socket_map_[fd]);
    }
    return &(iter->second);
}

void EventSocketGroup::closeSocket(EventSocket* socket_ptr)
{
    reactor_->delEvent(socket_ptr);
    close(socket_ptr->getFd());
    socket_map_.erase(socket_ptr->getFd());
}


bool EventSocketGroup::onFDRead(EventSocket* socket_ptr)
{
    if (socket_ptr->readData())
    {
        // read succeeded: now wait for the socket to become writable
        if (false == reactor_->modEvent(EPOLLOUT, socket_ptr))
        {
            return false;
        }
    }
    else
    {
        return false;
    }
    return true;
}

bool EventSocketGroup::onFDWrite(EventSocket* socket_ptr)
{
    if (socket_ptr->writeData())
    {
        // write succeeded: go back to waiting for readable data
        if (false == reactor_->modEvent(EPOLLIN, socket_ptr))
        {
            return false;
        }
    }
    else
    {
        return false;
    }
    return true;
}




class EpollAcceptor
{
    public:
        EpollAcceptor():
            listenfd_(0),
            reactor_(NULL),
            group_(NULL)
        {}
        ~EpollAcceptor() {}
        void setReactor(EpollReactor* reactor) {reactor_ = reactor;}
        void setSocketGroup(EventSocketGroup* group) {group_ = group;}
        bool startListen(int port);
        int onFDAccept();

    private:
        int listenfd_;
        EpollReactor* reactor_;
        EventSocketGroup* group_;
};

bool EpollAcceptor::startListen(int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        printf("socket error: %s \n", strerror(errno));
        return false;
    }
    fcntl(listenfd, F_SETFL, O_NONBLOCK);

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);

    // ::bind avoids conflict with std::bind
    if (-1 == ::bind(listenfd, (struct sockaddr*)&server_addr, sizeof(server_addr)))
    {
        printf("bind error %d \n", port);
        close(listenfd);
        return false;
    }

    if (listen(listenfd, 10) < 0)
    {
        printf("listen error %d \n", port);
        close(listenfd);
        return false;
    }

    printf("listen server port %d \n", port);
    listenfd_ = listenfd;

    EventSocket* socket_ptr = group_->getSocket(listenfd_);
    if (NULL == socket_ptr)
    {
        printf("get listen socket failed \n");
        return false;
    }
    socket_ptr->setStatus(1); // 1 = listening socket
    return reactor_->addEvent(EPOLLIN, socket_ptr);
}

int EpollAcceptor::onFDAccept()
{
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);

    int clientfd;
    if ((clientfd = accept(listenfd_, (struct sockaddr*)&client_addr, &len)) == -1)
    {
        printf("accept failed: %s \n", strerror(errno));
        return -1;
    }
    // set to non-blocking
    if (fcntl(clientfd, F_SETFL, O_NONBLOCK) < 0)
    {
        printf("fcntl nonblock failed: %s \n", strerror(errno));
        close(clientfd); // do not leak the accepted fd
        return -1;
    }

    EventSocket* socket_ptr = group_->getSocket(clientfd);
    if (NULL == socket_ptr)
    {
        printf("accept error \n");
        close(clientfd);
        return -1;
    }
    socket_ptr->setStatus(2); // 2 = common (client) socket
    reactor_->addEvent(EPOLLIN, socket_ptr);

    printf("new connect: [%s:%d], pos[%d] \n",
           inet_ntoa(client_addr.sin_addr),
           ntohs(client_addr.sin_port),
           clientfd);

    return clientfd;
}



class TcpServer
{
    public:
        TcpServer():
            port_(0)
        {}
        ~TcpServer()
        {}
    public:
        bool init(int port);
        bool start();
        void run();
        void stop();

    private:
        int port_;
        EpollReactor reactor_;
        EpollAcceptor acceptor_;
        EventSocketGroup group_;
};

bool TcpServer::init(int port)
{
    port_ = port;

    if (false == reactor_.init())
    {
        printf("reactor init failed.\n");
        return false;
    }

    // Initialize socket_group
    group_.setReactor(&reactor_);

    // Initialize the acceptor
    acceptor_.setReactor(&reactor_);
    acceptor_.setSocketGroup(&group_);

    // set reactor callback
    reactor_.getHandler().setAcceptCB(std::bind(&EpollAcceptor::onFDAccept, &acceptor_));
    reactor_.getHandler().setReadCB(std::bind(&EventSocketGroup::onFDRead, &group_, std::placeholders::_1));
    reactor_.getHandler().setWriteCB(std::bind(&EventSocketGroup::onFDWrite, &group_, std::placeholders::_1));
    reactor_.getHandler().setCloseCB(std::bind(&EventSocketGroup::closeSocket, &group_, std::placeholders::_1));

    printf("init succ.\n");
    return true;
}

bool TcpServer::start()
{
    if (false == acceptor_.startListen(port_))
    {
        printf("acceptor start listen failed.\n");
        return false;
    }
    printf("start succ.\n");
    return true;
}

void TcpServer::run()
{
    printf("Tcp server run.\n");

    while (true)
    {
        reactor_.execute();
    }
}


void TcpServer::stop()
{
    printf("Tcp server stop.\n");
}


int main()
{
    int port = 9999;

    TcpServer server;
    if (false == server.init(port))
    {
        printf("server init failed.\n");
        return 1;
    }

    if (false == server.start())
    {
        printf("server start failed.\n");
        return 1;
    }
    server.run();
    server.stop();
    return 0;
}
