Article directory
-
- Background
- Reactor
-
- Single reactor single thread/process
- Single reactor multithreading
- Multiple reactors and multiple threads
- example
Generate background
Through the previous study, we know that the server can use IO multiplexing technology such as epoll to achieve high concurrency when processing a large number of connections. However, during use, it is found that it is not very convenient to use IO multiplexing technology through process-oriented programming. Therefore, a mode based on object-oriented thinking to encapsulate the network IO layer, the Reactor mode, is produced.
Reactor
Reactor is translated as a reactor, which is a bit difficult to understand. In fact, it only has a specific response to events. The Reactor mode also has a name Dispatcher, which is relatively more appropriate, the allocator. Use epoll to listen to events, and distribute according to the type after receiving the event.
Reactor is responsible for event monitoring and distribution. There are generally two types of events, one is to establish a connection, and the other is to read and write data.
- To establish a connection, we generally hand it over to an Acceptor to handle the reward connection and then exchange the Reactor to listen to the new socekt
- Data reading and writing are generally handed over to a Handler, and the general process is read->processing business->write
There are many ways to organize the Reactor model, and the number of reactors and handlers can be divided into
- Single reactor single thread/process
- Single reactor multithread/process
- Multi-reactor multi-thread/process
Theoretically, there are multiple reactors and single processes, but compared with single reactors and single processes, the processing efficiency has not improved except for the complexity
Single reactor single thread/process
The implementation is simple, and there is no need to consider the communication competition between threads, which is suitable for scenarios with fast business processing.
shortcoming:
- Event monitoring, connection establishment, and business processing are all in one thread. If the business processing takes a long time, other functions will be blocked.
- Unable to take advantage of multi-core cpu performance.
Single reactor multi-thread
The process of listening to events and establishing a connection is the same as that of a single thread, but there are differences in the processing of the handler. The handler is only responsible for reading and writing data, while business processing is handled by other worker threads. One implementation method is that the handler sends the data to the message queue of the worker thread after reading the data. The worker thread will continuously read the data in the message queue and then process it. After the processing is completed, the result is returned to the handler, and then sent through send.
- Because of the use of multi-threading, the resources of multi-core cpu can be fully utilized. At the same time, inter-thread communication needs to be locked on shared resources, such as the cache where the handler saves the calculation results.
- In single-reactor mode, a reactor listens to all events, and there may be a performance bottleneck.
- As for the implementation of single reactor and multiple processes, it is a lot of trouble, mainly because the implementation of inter-process communication is much more difficult.
Multi-reactor multi-thread
In multi-reactor mode, master-slave reactors are generally set. The master reactor only pays attention to listenfd, listens to the event of establishing a connection, and then hands it over to accpeter for processing. After the connection is established, the sub-reactor is responsible for the monitoring of read and write events. The sub-reactor and the corresponding handler are in the same thread. When new read and write events occur, they are handed over to the corresponding handler to complete the corresponding business.
Example
EventSocket manages a connected data corresponding to a client, and implements basic read and write functions.
class EventSocket
{<!-- -->
public:
EventSocket():
fd_(0),
status_(0),
length_(0)
{<!-- -->}
~EventSocket()
{<!-- -->}
int getFd() {<!-- -->return fd_;}
void setFd(int fd) {<!-- -->fd_ = fd;}
int getStatus() {<!-- -->return status_;}
void setStatus(int status) {<!-- -->status_ = status;}
bool readData();
bool writeData();
private:
int fd_;
int status_; // 0 none 1 listening 2 common
int length_;
int buffer_[BUFF_LENGTH];
};
EventSocketGroup is used to manage all connections and is responsible for connection creation and destruction. Here, for the convenience of directly using map management, a better approach can be to use pooled counting.
class EventSocketGroup
{<!-- -->
public:
EventSocketGroup():
reactor_(NULL)
{<!-- -->}
~EventSocketGroup()
{<!-- -->}
void setReactor(EpollReactor* reactor) {<!-- -->reactor_ = reactor;}
EventSocket* getSocket(int socket_fd);
bool onFDRead(EventSocket* socket_ptr);
bool onFDWrite(EventSocket* socket_ptr);
void closeSocket(EventSocket* socket_ptr);
private:
std::map<int, EventSocket> socket_map_;
EpollReactor* reactor_;
};
EpollHandler specifies the callback interface for epoll events.
EpollReactor manages epoll objects, and realizes adding, deleting and modifying functions that encapsulate epoll events. The execute function implements the main logic of epoll_wait, and calls back the function registered in EpollHandler when an event occurs.
class EpollHandler
{<!-- -->
public:
typedef std::function<int()> AcceptCallback;
typedef std::function<bool(EventSocket* socket_ptr)> ReadCallBack;
typedef std::function<bool(EventSocket* socket_ptr)> WriteCallBack;
typedef std::function<void(EventSocket* socket_ptr)> CloseCallBack;
void setAcceptCB(const AcceptCallback & cb) {<!-- --> accept_cb_ = cb;}
void setReadCB(const ReadCallBack & amp; cb) {<!-- -->read_cb_ = cb;}
void setWriteCB(const WriteCallBack & amp; cb) {<!-- -->write_cb_ = cb;}
void setCloseCB(const CloseCallBack & amp; cb) {<!-- -->close_cb_ = cb;}
public:
AcceptCallback accept_cb_;
ReadCallBack read_cb_;
WriteCallBack write_cb_;
CloseCallBack close_cb_;
};
class Epoll Reactor
{<!-- -->
public:
EpollReactor(): epfd_(0)
{<!-- -->}
~EpollReactor()
{<!-- -->}
bool init();
bool addEvent(int events, EventSocket* socket_ptr);
bool modEvent(int events, EventSocket* socket_ptr);
void delEvent(EventSocket* socket_ptr);
void execute();
EpollHandler & amp; getHandler() {<!-- --> return handler_;}
private:
int epfd_;
struct epoll_event events_list_[MAX_EPOLL_EVENTS + 1];
EpollHandler handler_;
};
EpollAcceptor is responsible for maintaining the listening socket and registering with Reactor for monitoring when a new connection is added.
class EpollAcceptor
{<!-- -->
public:
EpollAcceptor():
listenfd_(0),
reactor_(NULL),
group_(NULL)
{<!-- -->}
~EpollAcceptor() {<!-- -->}
void setReactor(EpollReactor* reactor) {<!-- -->reactor_ = reactor;}
void setSocketGroup(EventSocketGroup* group) {<!-- -->group_ = group;}
bool startListen(int port);
int onFDAccept();
private:
int listenfd_;
EpollReactor* reactor_;
EventSocketGroup* group_;
};
TcpServer is responsible for the initialization of all modules and process management.
init initializes each module and registers callback functions for various events.
start starts listening on the port for connections.
run repeatedly calls epoll_wait in a loop to listen to and process events
stop is responsible for resource recycling. In order to demonstrate how the reactor is built, the example does not set the server stop method, which will be omitted later.
class TcpServer
{<!-- -->
public:
TcpServer():
port_(0)
{<!-- -->}
~TcpServer()
{<!-- -->}
public:
bool init(int port);
bool start();
void run();
void stop();
private:
int port_;
EpollReactor reactor_;
EpollAcceptor acceptor_;
EventSocketGroup group_;
};
complete example
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
Recommend a free tutorial from Zero Sound Academy. I personally think the teacher taught it well, so I would like to share it with you: [Linux, Nginx, ZeroMQ, MySQL, Redis,
fastdfs, MongoDB, ZK, streaming media, CDN, P2P, K8S, Docker,
TCP/IP, coroutines, DPDK and other technical content, click to learn immediately:
Link