The complete project is at:
Article directory
- 1. Connection module
- 2. Functions provided
- 3. Implementation ideas
  - (1) Function
  - (2) Meaning
  - (3) Functional design
- 4. Framework
- 5. Code
1. Connection module
The Connection module manages a communication connection as a whole. Every operation on a connection goes through this module.
2. Functions provided
The Connection module decides how every event on a connection is handled. Because the component cannot know in advance how the user wants to handle an event, it only exposes event callback functions for the user to set.
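As a rough illustration of this idea, the sketch below shows a component that knows when an event happens and leaves the reaction to callbacks set by the user. `MiniConnection`, `OnData` and `OnClose` are hypothetical names invented for this example; they are not part of the project.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for the Connection module: it knows *when* an event
// happens, while the user-supplied callbacks decide *what* to do about it.
class MiniConnection {
public:
    using MessageCallback = std::function<void(const std::string &)>;
    using ClosedCallback = std::function<void()>;

    void SetMessageCallback(MessageCallback cb) { _message_callback = std::move(cb); }
    void SetClosedCallback(ClosedCallback cb) { _closed_callback = std::move(cb); }

    // Called by the component when data arrives; forwarded only if a callback is set.
    void OnData(const std::string &data) {
        if (_message_callback) _message_callback(data);
    }

    // Called by the component when the connection closes.
    void OnClose() {
        if (_closed_callback) _closed_callback();
    }

private:
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
};
```

Checking the callback before calling it matters here: invoking an empty `std::function` throws `std::bad_function_call`.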
3. Implementation ideas
(1) Function
- Send data: the send interface offered to the user does not actually send anything. It only places the data in the send buffer and then enables write-event monitoring.
- Close the connection: the close interface offered to the user does not release the connection immediately. Before the actual release, it checks whether the input and output buffers still hold data to be processed.
- Enable timeout destruction of inactive connections.
- Cancel timeout destruction of inactive connections.
- Protocol switching: how a connection processes data after receiving it depends on the context and on the business-processing callback, so switching protocols means replacing both.
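The first two items can be sketched without any real socket. The example below is a simplified model under stated assumptions: `BufferedConn`, `OnWritable` and the `_wire` string (standing in for bytes the kernel accepted) are hypothetical and only show the ordering of buffering, flushing and release.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

enum class State { Connected, Disconnecting, Disconnected };

// Hypothetical model of "send only buffers" and "close waits for the buffer".
class BufferedConn {
public:
    // Queues the bytes and asks for write-event monitoring; nothing is sent yet.
    void Send(const std::string &data) {
        if (_state == State::Disconnected) return;
        _out.append(data);
        _watch_write = true;
    }

    // Releases at once only if nothing is left to flush.
    void Shutdown() {
        if (_state == State::Disconnected) return;
        _state = State::Disconnecting;
        if (_out.empty()) Release();
    }

    // Simulates a writable event in which the socket accepts at most max_bytes.
    void OnWritable(std::size_t max_bytes) {
        if (_state == State::Disconnected) return;
        std::size_t n = std::min(max_bytes, _out.size());
        _wire.append(_out, 0, n);  // pretend these bytes went to the peer
        _out.erase(0, n);          // advance past the bytes that were sent
        if (_out.empty()) {
            _watch_write = false;  // nothing left: stop write monitoring
            if (_state == State::Disconnecting) Release();
        }
    }

    State GetState() const { return _state; }
    bool WatchingWrite() const { return _watch_write; }
    const std::string &Wire() const { return _wire; }

private:
    void Release() {
        _state = State::Disconnected;
        _watch_write = false;
    }

    State _state = State::Connected;
    bool _watch_write = false;
    std::string _out;   // output buffer
    std::string _wire;  // bytes already handed to the (simulated) socket
};
```

In this model a `Shutdown()` issued while data is still queued leaves the connection in the `Disconnecting` state until a later writable event drains the buffer.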
(2) Meaning
This module itself is not a separate functional module, but a module that manages connections.
(3) Functional design
- Socket management: the module can perform socket operations.
- Connection event management: readable, writable, error, hang-up, and any-event.
- Buffer management: data read from the socket is placed in a buffer, so both an input buffer and an output buffer must be managed.
- Protocol context management: records how far the processing of the request data has progressed.
- Callback function management:
How to process data after the connection receives it is up to the user, so there must be a business-processing callback.
What to do once a connection has been established is up to the user, so there must be a connection-established callback.
What to do before a connection is closed is up to the user, so there must be a connection-closed callback.
Whether anything should be done when any event occurs is up to the user, so there must be an any-event callback.
Scenario: a connection is released while it is still being operated on, which causes an invalid memory access and eventually crashes the program.
Solution: manage the Connection object with the smart pointer shared_ptr. Every place that operates on the Connection object then holds its own shared_ptr, so a release elsewhere only decrements the reference count by 1 and does not actually destroy the Connection.
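A minimal sketch of this lifetime rule follows, assuming a hypothetical `Conn` type and a `ConnTable` map that plays the role of the server's connection table. It only demonstrates standard `std::shared_ptr` and `std::enable_shared_from_this` behavior, not the project's actual classes.

```cpp
#include <cstdint>
#include <memory>
#include <unordered_map>

// Hypothetical connection type that reports its own destruction through a flag.
struct Conn : std::enable_shared_from_this<Conn> {
    explicit Conn(bool *destroyed) : _destroyed(destroyed) {}
    ~Conn() { *_destroyed = true; }

    // Returns another owner of this object. This is valid only when the
    // object is already managed by a shared_ptr.
    std::shared_ptr<Conn> Self() { return shared_from_this(); }

    bool *_destroyed;
};

// Plays the role of the table in which the server keeps all connections.
using ConnTable = std::unordered_map<std::uint64_t, std::shared_ptr<Conn>>;

// Removes the connection from the table while `in_use` still owns it and
// reports whether the object was still alive right after the erase.
inline bool AliveAfterErase(ConnTable &table, std::uint64_t id,
                            const std::shared_ptr<Conn> &in_use,
                            const bool &destroyed) {
    table.erase(id);
    return in_use != nullptr && !destroyed;
}
```

The object is destroyed only when the last owner goes away, which is why a handler that holds its own `shared_ptr` can keep working even after the table entry is removed.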
4. Framework
    class Connection;
    // DISCONNECTED  -- the connection is closed
    // CONNECTING    -- the connection has been established but is still pending setup
    // CONNECTED     -- the connection is established, all settings are complete, and communication is possible
    // DISCONNECTING -- the connection is waiting to be closed
    typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING } ConnStatu;
    using PtrConnection = std::shared_ptr<Connection>;
    class Connection {
    private:
        uint64_t _conn_id;             // Unique connection ID, used to manage and look up the connection
        int _sockfd;                   // File descriptor associated with the connection
        bool _enable_inactive_release; // Whether inactive destruction is enabled; false by default
        EventLoop *_loop;              // The EventLoop this connection is bound to
        ConnStatu _statu;              // Connection status
        Socket _socket;                // Socket operation management
        Channel _channel;              // Event management for the connection
        Buffer _in_buffer;             // Input buffer: stores data read from the socket
        Buffer _out_buffer;            // Output buffer: stores data for the peer until the descriptor is writable
        Any _context;                  // Context for request reception and processing
        /* These four callbacks are set by the server module (whose own callbacks are in turn set by the component user), */
        /* so in effect they are the component user's callbacks. */
        using ConnectedCallback = std::function<void(const PtrConnection &)>;
        using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
        using ClosedCallback = std::function<void(const PtrConnection &)>;
        using AnyEventCallback = std::function<void(const PtrConnection &)>;
        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyEventCallback _event_callback;
        /* Connection-closed callback set inside the component. The server component manages all connections, */
        /* so when a connection is about to close, its record must be removed from the server's table. */
        ClosedCallback _server_closed_callback;
    private:
        /* Callbacks for the five channel events */
        // Called when the descriptor becomes readable: receives socket data into the input buffer, then calls _message_callback
        void HandleRead() {}
        // Called when the descriptor becomes writable: sends the data held in the output buffer
        void HandleWrite() {}
        void HandleClose() {}
        void HandleError() {}
        // Called on any event: 1. refresh the connection's activity (delay the scheduled destruction task); 2. call the user's any-event callback
        void HandleEvent() {}
        // After the connection is obtained, finish its setup (start read monitoring, call the callback)
        void EstablishedInLoop() {}
        // This is the actual release interface
        void ReleaseInLoop() {}
        // Not the actual send: it only puts the data into the output buffer and starts write-event monitoring
        void SendInLoop(Buffer &buf) {}
        // Not the actual release: it first checks whether data is still waiting to be processed or sent
        void ShutdownInLoop() {}
        // Enable the timeout release rule for inactive connections
        void EnableInactiveReleaseInLoop(int sec) {}
        void CancelInactiveReleaseInLoop() {}
        void UpgradeInLoop(const Any &context,
                           const ConnectedCallback &conn,
                           const MessageCallback &msg,
                           const ClosedCallback &closed,
                           const AnyEventCallback &event) {
            _context = context;
            _connected_callback = conn;
            _message_callback = msg;
            _closed_callback = closed;
            _event_callback = event;
        }
    public:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd)
            : _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop),
              _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) {
            _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
            _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
            _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
            _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
            _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        }
        ~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }
        // Get the managed file descriptor
        int Fd() { return _sockfd; }
        // Get the connection ID
        int Id() { return _conn_id; }
        // Whether the connection is in the CONNECTED state
        bool Connected() { return (_statu == CONNECTED); }
        // Set the context; called when the connection is established
        void SetContext(const Any &context) { _context = context; }
        // Get the context; returns a pointer
        Any *GetContext() { return &_context; }
        void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
        void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
        void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
        void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
        void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }
        // After the connection is established: set the channel callbacks, start read monitoring, call _connected_callback
        void Established() {
            _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        // Send data: put the data into the output buffer and start write-event monitoring
        void Send(const char *data, size_t len) {}
        // Shutdown interface for component users; it does not close immediately but first checks for pending data
        void Shutdown() {}
        void Release() {}
        // Enable inactive destruction: define how long without communication counts as inactive and add a scheduled task
        void EnableInactiveRelease(int sec) {}
        // Cancel inactive destruction
        void CancelInactiveRelease() {}
        void Upgrade(const Any &context,
                     const ConnectedCallback &conn,
                     const MessageCallback &msg,
                     const ClosedCallback &closed,
                     const AnyEventCallback &event) {
            _loop->AssertInLoop();
            _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
        }
    };
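The four states form a small state machine. The sketch below is one reading of the transitions implied by the comments in the framework, not a rule enforced by the project: `ConnState` and `CanTransition` are hypothetical names, and the table of allowed moves is an assumption.

```cpp
// Hypothetical mirror of ConnStatu, written as a scoped enum for the example.
enum class ConnState { Disconnected, Connecting, Connected, Disconnecting };

// Assumed transition rules:
//   Connecting    -> Connected      (setup finished)
//   Connected     -> Disconnecting  (shutdown requested, data may still be pending)
//   anything live -> Disconnected   (actual release)
inline bool CanTransition(ConnState from, ConnState to) {
    switch (to) {
        case ConnState::Connected:
            return from == ConnState::Connecting;
        case ConnState::Disconnecting:
            return from == ConnState::Connected;
        case ConnState::Disconnected:
            return from != ConnState::Disconnected;
        case ConnState::Connecting:
            return false;  // assumed to be set only when the object is created
    }
    return false;
}
```

A check like this could back an assertion in each state-changing function, in the same spirit as an assert that requires the CONNECTING state before a connection is marked as established.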
5. Code
    #include <sys/types.h>
    #include <cassert>
    #include <cstdint>
    #include <functional>
    #include <memory>
    // Socket, Channel, Buffer, Any, EventLoop and DBG_LOG come from the other modules of this project.

    class Connection;
    // DISCONNECTED  -- the connection is closed
    // CONNECTING    -- the connection has been established but is still pending setup
    // CONNECTED     -- the connection is established, all settings are complete, and communication is possible
    // DISCONNECTING -- the connection is waiting to be closed
    typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING } ConnStatu;
    using PtrConnection = std::shared_ptr<Connection>;
    class Connection : public std::enable_shared_from_this<Connection> {
    private:
        uint64_t _conn_id;             // Unique connection ID, used to manage and look up the connection
        // uint64_t _timer_id;         // A timer ID must be unique. To keep things simple, _conn_id doubles as the timer ID.
        int _sockfd;                   // File descriptor associated with the connection
        bool _enable_inactive_release; // Whether inactive destruction is enabled; false by default
        EventLoop *_loop;              // The EventLoop this connection is bound to
        ConnStatu _statu;              // Connection status
        Socket _socket;                // Socket operation management
        Channel _channel;              // Event management for the connection
        Buffer _in_buffer;             // Input buffer: stores data read from the socket
        Buffer _out_buffer;            // Output buffer: stores data to be sent to the peer
        Any _context;                  // Context for request reception and processing
        /* These four callbacks are set by the server module (whose own callbacks are in turn set by the component user), */
        /* so in effect they are the component user's callbacks. */
        using ConnectedCallback = std::function<void(const PtrConnection &)>;
        using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
        using ClosedCallback = std::function<void(const PtrConnection &)>;
        using AnyEventCallback = std::function<void(const PtrConnection &)>;
        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyEventCallback _event_callback;
        /* Connection-closed callback set inside the component. The server component manages all connections, */
        /* so when a connection is about to close, its record must be removed from the server's table. */
        ClosedCallback _server_closed_callback;
    private:
        /* Callbacks for the five channel events */
        // Called when the descriptor becomes readable: receives socket data into the input buffer, then calls _message_callback
        void HandleRead() {
            // 1. Receive socket data and put it into the buffer
            char buf[65536];
            ssize_t ret = _socket.NonBlockRecv(buf, 65535);
            if (ret < 0) {
                // An error occurred, but the connection must not be closed directly: pending data is handled first
                return ShutdownInLoop();
            }
            // ret == 0 here means that no data was read, not that the peer disconnected; a disconnect is reported as -1.
            // Put the data into the input buffer; the write offset moves forward after the write.
            _in_buffer.WriteAndPush(buf, ret);
            // 2. Call _message_callback for business processing
            if (_in_buffer.ReadAbleSize() > 0 && _message_callback) {
                // shared_from_this() returns a shared_ptr that shares ownership of the current object
                _message_callback(shared_from_this(), &_in_buffer);
            }
        }
        // Called when the descriptor becomes writable: sends the data held in the output buffer
        void HandleWrite() {
            // _out_buffer holds the data waiting to be sent
            ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
            if (ret < 0) {
                // Sending failed, so the connection has to be closed; process any pending input first
                if (_in_buffer.ReadAbleSize() > 0 && _message_callback) {
                    _message_callback(shared_from_this(), &_in_buffer);
                }
                return Release(); // This is the actual close-and-release operation
            }
            _out_buffer.MoveReadOffset(ret); // Do not forget to advance the read offset
            if (_out_buffer.ReadAbleSize() == 0) {
                _channel.DisableWrite(); // Nothing left to send: stop write-event monitoring
                // If the connection is waiting to be closed, release it now that all data has been sent
                if (_statu == DISCONNECTING) {
                    return Release();
                }
            }
        }
        // Called when the descriptor reports a hang-up
        void HandleClose() {
            /* Once the connection is hung up the socket can do nothing more, so process any pending data and then close */
            if (_in_buffer.ReadAbleSize() > 0 && _message_callback) {
                _message_callback(shared_from_this(), &_in_buffer);
            }
            return Release();
        }
        // Called when the descriptor reports an error
        void HandleError() {
            return HandleClose();
        }
        // Called on any event: 1. refresh the connection's activity (delay the scheduled destruction task); 2. call the user's any-event callback
        void HandleEvent() {
            if (_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); }
            if (_event_callback) { _event_callback(shared_from_this()); }
        }
        // After the connection is obtained, finish its setup (start read monitoring, call the callback)
        void EstablishedInLoop() {
            // 1. Update the connection status; 2. start read-event monitoring; 3. call the callback
            assert(_statu == CONNECTING); // The current state must be the half-connected (CONNECTING) state
            _statu = CONNECTED;           // Once this function has run, the connection is fully established
            // Once read-event monitoring is started, a read event may fire immediately
            // (relevant if inactive-connection destruction is enabled at that point)
            _channel.EnableRead();
            if (_connected_callback) _connected_callback(shared_from_this());
        }
        // This is the actual release interface
        void ReleaseInLoop() {
            // 1. Set the connection status to DISCONNECTED
            _statu = DISCONNECTED;
            // 2. Remove the connection's event monitoring
            _channel.Remove();
            // 3. Close the descriptor
            _socket.Close();
            // 4. If a scheduled destruction task is still in the timer queue, cancel it
            if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();
            // 5. Call the user's closed callback first. Removing the server's record first could destroy
            //    the Connection, and any processing after that would be an error.
            if (_closed_callback) _closed_callback(shared_from_this());
            // Remove the connection record kept inside the server
            if (_server_closed_callback) _server_closed_callback(shared_from_this());
        }
        // Not the actual send: it only puts the data into the output buffer and starts write-event monitoring
        void SendInLoop(Buffer &buf) {
            if (_statu == DISCONNECTED) return;
            _out_buffer.WriteBufferAndPush(buf);
            if (_channel.WriteAble() == false) {
                _channel.EnableWrite();
            }
        }
        // Not the actual release: it first checks whether data is still waiting to be processed or sent
        void ShutdownInLoop() {
            _statu = DISCONNECTING; // Put the connection into the half-closed state
            if (_in_buffer.ReadAbleSize() > 0) {
                if (_message_callback) _message_callback(shared_from_this(), &_in_buffer);
            }
            // The connection is then closed either after a write error or once there is nothing left to send
            if (_out_buffer.ReadAbleSize() > 0) {
                if (_channel.WriteAble() == false) {
                    _channel.EnableWrite();
                }
            }
            if (_out_buffer.ReadAbleSize() == 0) {
                Release();
            }
        }
        // Enable the timeout release rule for inactive connections
        void EnableInactiveReleaseInLoop(int sec) {
            // 1. Set the flag _enable_inactive_release to true
            _enable_inactive_release = true;
            // 2. If a scheduled destruction task already exists, just refresh it to delay it
            if (_loop->HasTimer(_conn_id)) {
                return _loop->TimerRefresh(_conn_id);
            }
            // 3. Otherwise add a scheduled destruction task
            _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
        }
        void CancelInactiveReleaseInLoop() {
            _enable_inactive_release = false;
            if (_loop->HasTimer(_conn_id)) {
                _loop->TimerCancel(_conn_id);
            }
        }
        void UpgradeInLoop(const Any &context,
                           const ConnectedCallback &conn,
                           const MessageCallback &msg,
                           const ClosedCallback &closed,
                           const AnyEventCallback &event) {
            _context = context;
            _connected_callback = conn;
            _message_callback = msg;
            _closed_callback = closed;
            _event_callback = event;
        }
    public:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd)
            : _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop),
              _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) {
            _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
            _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
            _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
            _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
            _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        }
        ~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }
        // Get the managed file descriptor
        int Fd() { return _sockfd; }
        // Get the connection ID
        int Id() { return _conn_id; }
        // Whether the connection is in the CONNECTED state
        bool Connected() { return (_statu == CONNECTED); }
        // Set the context; called when the connection is established
        void SetContext(const Any &context) { _context = context; }
        // Get the context; returns a pointer
        Any *GetContext() { return &_context; }
        void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }
        void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }
        void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }
        void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }
        void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }
        // After the connection is established: set the channel callbacks, start read monitoring, call _connected_callback
        void Established() {
            _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        // Send data: put the data into the output buffer and start write-event monitoring
        void Send(const char *data, size_t len) {
            // The data passed in may live in temporary storage. The send is only pushed into the task pool
            // and may not run immediately, so the memory that data points to could already be freed by then.
            // Copy it into a Buffer that the task owns.
            Buffer buf;
            buf.WriteAndPush(data, len);
            _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
        }
        // Shutdown interface for component users; it does not close immediately but first checks for pending data
        void Shutdown() {
            _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
        }
        void Release() {
            _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
        }
        // Enable inactive destruction: define how long without communication counts as inactive and add a scheduled task
        void EnableInactiveRelease(int sec) {
            _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
        }
        // Cancel inactive destruction
        void CancelInactiveRelease() {
            _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
        }
        // Switch protocol: reset the context and the per-stage callbacks. This interface must run immediately
        // in the EventLoop thread. Otherwise a new event could be handled before the switch task has run,
        // and the data would still be processed with the old protocol.
        void Upgrade(const Any &context,
                     const ConnectedCallback &conn,
                     const MessageCallback &msg,
                     const ClosedCallback &closed,
                     const AnyEventCallback &event) {
            _loop->AssertInLoop();
            _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
        }
    };
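The comment in `Send` about copying the caller's data before deferring the work can be tried in isolation. The sketch below uses a hypothetical `MiniLoop` task queue and `MiniSender` class; it is a simplified model that uses `std::string` in place of the project's Buffer and makes no claim about how the real EventLoop schedules tasks.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded task queue standing in for an event loop.
class MiniLoop {
public:
    void QueueInLoop(std::function<void()> task) { _tasks.push_back(std::move(task)); }

    // Runs every task queued so far; tasks queued while running wait for the next call.
    void RunPending() {
        std::vector<std::function<void()>> tasks;
        tasks.swap(_tasks);
        for (auto &task : tasks) task();
    }

private:
    std::vector<std::function<void()>> _tasks;
};

// Hypothetical sender whose Send() defers the real work to the loop.
class MiniSender {
public:
    explicit MiniSender(MiniLoop *loop) : _loop(loop) {}

    void Send(const char *data, std::size_t len) {
        // Copy first: the caller's memory may be reused or freed before the task runs.
        std::string copy(data, len);
        _loop->QueueInLoop([this, copy]() { _out.append(copy); });
    }

    const std::string &Out() const { return _out; }

private:
    MiniLoop *_loop;
    std::string _out;  // stands in for the output buffer
};
```

Because the task captures its own copy, the caller can overwrite or release its buffer right after `Send` returns and the queued bytes stay intact. The sketch captures `this` as a raw pointer, so the sender must outlive the queued task.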