1.9.C++ project: Design of Connection module imitating muduo library to implement concurrent server

The complete project is at:

Article directory

  • 1. Connection module: This is a module for overall management of communication connections. All operations on a connection are performed through this module!
  • 2. Functions provided
  • 3. Realize ideas
    • (1) Function
    • (2) Meaning
    • (3) Functional design
  • 4. Framework
  • 5. Code

1. Connection module: This is a module for overall management of communication connections. All operations on a connection are performed through this module!

2. Functions provided

Connection module, how to handle any events in a connection is handled by this module. Because the design of the component does not know how the user handles the event, it can only provide some event callback functions to be set by the user.

3. Implement ideas

(1) Function

  1. Sending data – the interface for generating data provided to the user is not really a sending interface, but only sends the data to the sending buffer, and then starts the write event monitoring!
  2. Close the connection – provide the user with a close link interface. Before actually releasing the connection, check whether there is data to be processed in the input and output buffers!
  3. Start the inactive connection timeout destruction function
  4. Cancel the inactive connection timeout destruction function
  5. Protocol switching – how a link performs business processing after accepting data depends on the context and the business processing callback function of the data!

(2) Meaning

This module itself is not a separate functional module, but a module that manages connections.

(3) Functional design

  1. Socket management, able to perform socket operations!
  2. Management of connection events, readable, writable, error, hang up, any!
  3. Buffer management: Put the data read from the socket into the buffer, and there must be input buffer and output buffer management!
  4. Management of protocol context, recording the processing process of request data!
  5. Management of callback functions
    Because it is up to the user to decide how to process the data after the connection receives it, and there must be a business processing function!
    After a connection is successfully established, it is up to the user to decide how to handle it, so there must be a callback function for a successful connection!
    What to do before a connection is closed is decided by the user, so there must be a callback function to close the connection!
    Whether any event is generated or not is determined by the user, so there must be a callback function for any event!

Scenario: When operating a connection, the connection is released, resulting in a memory access error and eventually the program crashes!
Solution: Use the smart pointer share_ptr to manage the Connect object. This can ensure that when the Connect object is operated from any place,
A share_ptr is saved, so even if it is released elsewhere, it will only reduce the share_ptr counter by -1, and will not cause the actual release of the Connection!

4. Framework

DISCONECTED -- connection closed state; CONNECTING -- connection established successfully - pending state
//CONNECTED -- The connection is established, various settings have been completed, and communication is possible; DISCONNECTING -- To be closed state
type enum {<!-- -->
        //The connection is closed;
        // Connection established successfully - pending status;
        // The connection is established and communication is possible;
        //To be closed state;
        DISCONECTED,CONNECTING,CONNECTED,DISCONECTING} ConnStatu;
using PreConnection = std::shared_ptr<Connection>;
class Connection {<!-- -->
        private:
                uint64_t _conn_id; //The unique ID of the connection, which facilitates the management and search of the connection.
                bool _enable_inactive_release; // Determining whether the connection starts inactive destruction, the default is false
                int _sockfd; // File descriptor associated with the connection
                ConnStatu _statu; //
                Socket _socket; // Socket operation management
                Channel _channel; // Connect two points of event management
                Buffer _in_buffer; // Input buffer - stores data read from the socket
                buffer _out_buffer; // Output buffer - data is sent to the peer. Wait until the descriptor event is writable before sending!
                Any _context; // Request acceptance processing context
                /*These four callback functions are set by the server module (in fact, the processing callbacks of the server module are also set by the component user)*/
                /*In other words, these callbacks are used by component users*/
                using ConnectCallback = std::function<void(const PreConnection & amp;)>;
                using MessageCallback = std::function<void(const PtrConnection & amp;, Buffer *)>;
                using ClosedCallback = std::function<void(const PtrConnection & amp;)>;
                using AnyEventCallback = std::function<void(const PtrConnection & amp;)>;
                ConnectedCallback _connected_callback;
                MessageCallback _message_callback;
                ClosedCallback _closed_callback;
                AnyEventCallback _event_callback;
                /*The connection closing callback within the component - set within the component, because the server component will manage all connections, once a connection is to be closed*/
                /*You should remove your own information from the management place*/
                ClosedCallback _server_closed_callback;
        private:
        // /*Event callback functions for five channels*/
        //The function called after the descriptor readable event is triggered, receives the socket data and puts it into the receiving buffer, and then calls _message_callback
                void HandleRead() {<!-- -->
                }
                void HandleRead() {<!-- -->
                }
                void HandleClose() {<!-- -->
                }
                void HandleError() {<!-- -->
                }
                //The descriptor triggers any event: 1. Refresh the connection activity-delay the scheduled destruction task; 2. Call any event callback of the component user
                void HandleEvent() {<!-- -->
                }
                //After the connection is obtained, various settings must be made in the current state (start read monitoring, call callback function)
                void EstablishedInLoop() {<!-- -->
                }
                //This interface is the actual release interface
                void ReleaseInLoop() {<!-- -->
                }
                //This interface is not the actual sending interface, but just puts the data into the sending buffer and starts writable event monitoring.
                void SendInLoop(Buffer & amp;buf) {<!-- -->
                }
                //This closing operation is not an actual connection release operation. It needs to be judged whether there is any data to be processed and sent.
                void ShutdownInLoop() {<!-- -->
                }
                //Start the inactive connection timeout release rule
                void EnableInactiveReleaseInLoop(int sec) {<!-- -->
                }
                void CancelInactiveReleaseInLoop() {<!-- -->
                }
                void UpgradeInLoop(const Any & amp;context,
                        const ConnectedCallback &conn,
                        const MessageCallback &msg,
                        const ClosedCallback &closed,
                        const AnyEventCallback & amp;event) {<!-- -->
                        _context = context;
                        _connected_callback = conn;
                        _message_callback = msg;
                        _closed_callback = closed;
                        _event_callback = event;
                }
        public:
                Connection(EventLoop* loop,uint64_t _conn_id,int sockfd) : _sockfd(sockfd),
                        _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) {<!-- -->
                        _channel.SetCloseCallback(std::bind( & amp;Connection::HandleClose, this));
                        _channel.SetEventCallback(std::bind( & amp;Connection::HandleEvent, this));
                        _channel.SetReadCallback(std::bind( & amp;Connection::HandleRead, this));
                        _channel.SetWriteCallback(std::bind( & amp;Connection::HandleWrite, this));
                        _channel.SetErrorCallback(std::bind( & amp;Connection::HandleError, this));
                }
                 ~Connection() {<!-- --> DBG_LOG("RELEASE CONNECTION:%p", this); }
                  //Get the managed file descriptor
                 int Fd() {<!-- -->return _sockfd; }
                 // Get the connection ID
                 int Id() {<!-- -->return _conn_id; }
                 // Whether it is in CONNECTED state
                 bool Connected() {<!-- --> return (_statu == CONNECTED); }
                 //Set context--called when the connection is established
                void SetContext(const Any & amp;context) {<!-- --> _context = context; }
                 //Get the context and return a pointer
                Any *GetContext() {<!-- --> return & amp;_context; }
                void SetConnectedCallback(const ConnectedCallback & amp;cb) {<!-- --> _connected_callback = cb; }
                void SetMessageCallback(const MessageCallback & amp;cb) {<!-- --> _message_callback = cb; }
                void SetClosedCallback(const ClosedCallback & amp;cb) {<!-- --> _closed_callback = cb; }
                void SetAnyEventCallback(const AnyEventCallback & amp;cb) {<!-- --> _event_callback = cb; }
                void SetSrvClosedCallback(const ClosedCallback & amp;cb) {<!-- --> _server_closed_callback = cb; }
                //After the connection is established, set the channel callback, start read monitoring, and call _connected_callback
                void Established() {<!-- -->}
                _loop->RunInLoop(std::bind( & amp;Connection::EstablishedInLoop, this));
                //Send data, put the data in the send buffer, and start write event monitoring
                void Send(const char *data, size_t len) {<!-- -->}
                    //The shutdown interface provided to component users--it does not actually close it. It needs to be judged whether there is data to be processed.
                void Shutdown() {<!-- -->}
                void Release() {<!-- -->}
                //Start inactive destruction, and define how long it will take for no communication to become inactive, and add a scheduled task
                void EnableInactiveRelease(int sec) {<!-- --> }
                 //Cancel inactive destruction
                void CancelInactiveRelease() {<!-- -->}
                void Upgrade(const Any & amp;context, const ConnectedCallback & amp;conn, const MessageCallback & amp;msg,
                        const ClosedCallback & amp;closed, const AnyEventCallback & amp;event) {<!-- -->
                        _loop->AssertInLoop();
                        _loop->RunInLoop(std::bind( & amp;Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
        }

 };

5. Code

class Connection;
//DISCONECTED -- connection closed state; CONNECTING -- connection established successfully - pending state
//CONNECTED -- The connection is established, various settings have been completed, and communication is possible; DISCONNECTING -- To be closed state
typedef enum {<!-- --> DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING}ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> {<!-- -->
    private:
        uint64_t _conn_id; // The unique ID of the connection, which facilitates the management and search of the connection.
        //uint64_t _timer_id; //Timer ID must be unique. In order to simplify the operation, conn_id is used as the timer ID.
        int _sockfd; // File descriptor associated with the connection
        bool _enable_inactive_release; // Determining whether the connection starts inactive destruction, the default is false
        EventLoop *_loop; // An EventLoop associated with the connection
        ConnStatu _statu; // connection status
        Socket _socket; // Socket operation management
        Channel _channel; //Connected event management
        Buffer _in_buffer; //Input buffer---stores data read from the socket
        Buffer _out_buffer; // Output buffer---stores data to be sent to the peer
        Any _context; // Requested reception processing context

        /*These four callback functions are set by the server module (in fact, the processing callbacks of the server module are also set by the component user)*/
        /*In other words, these callbacks are used by component users*/
        using ConnectedCallback = std::function<void(const PtrConnection & amp;)>;
        using MessageCallback = std::function<void(const PtrConnection & amp;, Buffer *)>;
        using ClosedCallback = std::function<void(const PtrConnection & amp;)>;
        using AnyEventCallback = std::function<void(const PtrConnection & amp;)>;
        ConnectedCallback _connected_callback;
        MessageCallback _message_callback;
        ClosedCallback _closed_callback;
        AnyEventCallback _event_callback;
        /*The connection closing callback within the component - set within the component, because the server component will manage all connections, once a connection is to be closed*/
        /*You should remove your own information from the management place*/
        ClosedCallback _server_closed_callback;
    private:
        /*Event callback functions for five channels*/
        //The function called after the descriptor readable event is triggered, receives the socket data and puts it into the receiving buffer, and then calls _message_callback
        void HandleRead() {<!-- -->
            //1. Receive socket data and put it in the buffer
            char buf[65536];
            ssize_t ret = _socket.NonBlockRecv(buf, 65535);
            if (ret < 0) {<!-- -->
                //An error occurred, the connection cannot be closed directly
                return ShutdownInLoop();
            }
            //The equal to 0 here means that no data has been read, not that the connection is disconnected. When the connection is disconnected, -1 is returned.
            //Put the data into the input buffer and move the write offset backward after writing.
            _in_buffer.WriteAndPush(buf, ret);
            //2. Call message_callback for business processing
            if (_in_buffer.ReadAbleSize() > 0) {<!-- -->
                //shared_from_this--Get its own shared_ptr management object from the current object itself
                return _message_callback(shared_from_this(), & amp;_in_buffer);
            }
        }
        //The function called after the descriptor writable event is triggered will send the data in the send buffer
        void HandleWrite() {<!-- -->
            //The data saved in _out_buffer is the data to be sent
            ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
            if (ret < 0) {<!-- -->
                //It’s time to close the connection after sending an error.
                if (_in_buffer.ReadAbleSize() > 0) {<!-- -->
                    _message_callback(shared_from_this(), & amp;_in_buffer);
                }
                return Release();//This is the actual closing and releasing operation.
            }
            _out_buffer.MoveReadOffset(ret);//Don’t forget to move the read offset backwards
            if (_out_buffer.ReadAbleSize() == 0) {<!-- -->
                _channel.DisableWrite();//There is no data to be sent, turn off write event monitoring
                //If the current connection is to be closed, there is data, and the connection is released after sending the data. If there is no data, it is released directly.
                if (_statu == DISCONNECTING) {<!-- -->
                    return Release();
                }
            }
            return;
        }
        //Descriptor triggers hang-up event
        void HandleClose() {<!-- -->
            /*Once the connection is hung up, the socket can't do anything, so if there is data to be processed, process it and close the connection when finished*/
            if (_in_buffer.ReadAbleSize() > 0) {<!-- -->
                _message_callback(shared_from_this(), & amp;_in_buffer);
            }
            return Release();
        }
        //Descriptor triggers error event
        void HandleError() {<!-- -->
            return HandleClose();
        }
        //The descriptor triggers any event: 1. Refresh the connection activity-delay the scheduled destruction task; 2. Call any event callback of the component user
        void HandleEvent() {<!-- -->
            if (_enable_inactive_release == true) {<!-- --> _loop->TimerRefresh(_conn_id); }
            if (_event_callback) {<!-- --> _event_callback(shared_from_this()); }
        }
        //After the connection is obtained, various settings must be made in the current state (start read monitoring, call callback function)
        void EstablishedInLoop() {<!-- -->
            // 1. Modify the connection status; 2. Start read event monitoring; 3. Call the callback function
            assert(_statu == CONNECTING);//The current state must be the upper semi-connected state
            _statu = CONNECTED; //After the current function is executed, the connection enters the completed connection state
            // Once the read event monitoring is started, the read event may be triggered immediately. If the inactive connection destruction is started at this time,
            _channel.EnableRead();
            if (_connected_callback) _connected_callback(shared_from_this());
        }
        //This interface is the actual release interface
        void ReleaseInLoop() {<!-- -->
            //1. Modify the connection status and set it to DISCONNECTED
            _statu = DISCONNECTED;
            //2. Remove the connected event monitoring
            _channel.Remove();
            //3. Close the descriptor
            _socket.Close();
            //4. If there are still scheduled destruction tasks in the current timer queue, cancel the task
            if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();
            //5. Call the close callback function to avoid removing the connection information managed by the server first, causing the Connection to be released, and then processing it will cause an error. Therefore, call the user's callback function first.
            if (_closed_callback) _closed_callback(shared_from_this());
            //Remove connection information managed internally by the server
            if (_server_closed_callback) _server_closed_callback(shared_from_this());
        }
        //This interface is not the actual sending interface, but just puts the data into the sending buffer and starts writable event monitoring.
        void SendInLoop(Buffer & amp;buf) {<!-- -->
            if (_statu == DISCONNECTED) return ;
            _out_buffer.WriteBufferAndPush(buf);
            if (_channel.WriteAble() == false) {<!-- -->
                _channel.EnableWrite();
            }
        }
        //This closing operation is not an actual connection release operation. It needs to be judged whether there is any data to be processed and sent.
        void ShutdownInLoop() {<!-- -->
            _statu = DISCONNECTING;//Set the connection to a semi-closed state
            if (_in_buffer.ReadAbleSize() > 0) {<!-- -->
                if (_message_callback) _message_callback(shared_from_this(), & amp;_in_buffer);
            }
            //Either there is an error when writing data and it is closed, or there is no data to be sent and it is closed directly.
            if (_out_buffer.ReadAbleSize() > 0) {<!-- -->
                if (_channel.WriteAble() == false) {<!-- -->
                    _channel.EnableWrite();
                }
            }
            if (_out_buffer.ReadAbleSize() == 0) {<!-- -->
                Release();
            }
        }
        //Start the inactive connection timeout release rule
        void EnableInactiveReleaseInLoop(int sec) {<!-- -->
            //1. Set the judgment flag _enable_inactive_release to true
            _enable_inactive_release = true;
            //2. If the current scheduled destruction task already exists, just refresh and delay it.
            if (_loop->HasTimer(_conn_id)) {<!-- -->
                return _loop->TimerRefresh(_conn_id);
            }
            //3. If there is no scheduled destruction task, add it
            _loop->TimerAdd(_conn_id, sec, std::bind( & amp;Connection::Release, this));
        }
        void CancelInactiveReleaseInLoop() {<!-- -->
            _enable_inactive_release = false;
            if (_loop->HasTimer(_conn_id)) {<!-- -->
                _loop->TimerCancel(_conn_id);
            }
        }
        void UpgradeInLoop(const Any & amp;context,
                    const ConnectedCallback &conn,
                    const MessageCallback &msg,
                    const ClosedCallback &closed,
                    const AnyEventCallback & amp;event) {<!-- -->
            _context = context;
            _connected_callback = conn;
            _message_callback = msg;
            _closed_callback = closed;
            _event_callback = event;
        }
    public:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),
            _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),
            _channel(loop, _sockfd) {<!-- -->
            _channel.SetCloseCallback(std::bind( & amp;Connection::HandleClose, this));
            _channel.SetEventCallback(std::bind( & amp;Connection::HandleEvent, this));
            _channel.SetReadCallback(std::bind( & amp;Connection::HandleRead, this));
            _channel.SetWriteCallback(std::bind( & amp;Connection::HandleWrite, this));
            _channel.SetErrorCallback(std::bind( & amp;Connection::HandleError, this));
        }
        ~Connection() {<!-- --> DBG_LOG("RELEASE CONNECTION:%p", this); }
        //Get the managed file descriptor
        int Fd() {<!-- --> return _sockfd; }
        //Get the connection ID
        int Id() {<!-- --> return _conn_id; }
        //Whether it is in CONNECTED state
        bool Connected() {<!-- --> return (_statu == CONNECTED); }
        //Set context--called when the connection is established
        void SetContext(const Any & amp;context) {<!-- --> _context = context; }
        //Get the context and return a pointer
        Any *GetContext() {<!-- --> return & amp;_context; }
        void SetConnectedCallback(const ConnectedCallback & amp;cb) {<!-- --> _connected_callback = cb; }
        void SetMessageCallback(const MessageCallback & amp;cb) {<!-- --> _message_callback = cb; }
        void SetClosedCallback(const ClosedCallback & amp;cb) {<!-- --> _closed_callback = cb; }
        void SetAnyEventCallback(const AnyEventCallback & amp;cb) {<!-- --> _event_callback = cb; }
        void SetSrvClosedCallback(const ClosedCallback & amp;cb) {<!-- --> _server_closed_callback = cb; }
        //After the connection is established, set the channel callback, start read monitoring, and call _connected_callback
        void Established() {<!-- -->
            _loop->RunInLoop(std::bind( & amp;Connection::EstablishedInLoop, this));
        }
        //Send data, put the data in the send buffer, and start write event monitoring
        void Send(const char *data, size_t len) {<!-- -->
            //The data passed in from the outside may be a temporary space. We are just pushing the sending operation into the task pool, and it may not be executed immediately.
            //So it is possible that the space pointed to by data may have been released during execution.
            Buffer buf;
            buf.WriteAndPush(data, len);
            _loop->RunInLoop(std::bind( & amp;Connection::SendInLoop, this, std::move(buf)));
        }
        //The shutdown interface provided to component users--it does not actually close it. It needs to be judged whether there is data to be processed.
        void Shutdown() {<!-- -->
            _loop->RunInLoop(std::bind( & amp;Connection::ShutdownInLoop, this));
        }
        void Release() {<!-- -->
            _loop->QueueInLoop(std::bind( & amp;Connection::ReleaseInLoop, this));
        }
        //Start inactive destruction, and define how long it will take for no communication to become inactive, and add a scheduled task
        void EnableInactiveRelease(int sec) {<!-- -->
            _loop->RunInLoop(std::bind( & amp;Connection::EnableInactiveReleaseInLoop, this, sec));
        }
        //Cancel inactive destruction
        void CancelInactiveRelease() {<!-- -->
            _loop->RunInLoop(std::bind( & amp;Connection::CancelInactiveReleaseInLoop, this));
        }
        //Switch protocol --- reset context and phased callback processing function -- but this interface must be executed immediately in the EventLoop thread
        //Prevent that after a new event is triggered, the switching task has not been executed during processing - which will cause the data to be processed using the original protocol.
        void Upgrade(const Any & amp;context, const ConnectedCallback & amp;conn, const MessageCallback & amp;msg,
                     const ClosedCallback & amp;closed, const AnyEventCallback & amp;event) {<!-- -->
            _loop->AssertInLoop();
            _loop->RunInLoop(std::bind( & amp;Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
        }
};