Muduo source code analysis–TcpServer

TcpServer class:

Tcp server class

// Class used for external server programming
class TcpServer
{<!-- -->
public:
    using ThreadInitCallback = std::function<void(EventLoop *)>;

    enum Option
    {<!-- -->
        kNoReusePort,
        kReusePort,
    };

    TcpServer(EventLoop *loop,
              const InetAddress & listenAddr,
              const std::string & nameArg,
              Option option = kNoReusePort);
    ~TcpServer();

    void setThreadInitCallback(const ThreadInitCallback & amp;cb) {<!-- --> threadInitCallback_ = cb; }
    void setConnectionCallback(const ConnectionCallback & amp;cb) {<!-- --> connectionCallback_ = cb; }
    void setMessageCallback(const MessageCallback & amp;cb) {<!-- --> messageCallback_ = cb; }
    void setWriteCompleteCallback(const WriteCompleteCallback & cb) {<!-- --> writeCompleteCallback_ = cb; }

    // Set the number of underlying subloops
    void setThreadNum(int numThreads);

    // enable server monitoring
    void start();

private:
    void newConnection(int sockfd, const InetAddress & amp;peerAddr);
    void removeConnection(const TcpConnectionPtr & amp;conn);
    void removeConnectionInLoop(const TcpConnectionPtr & amp;conn);

    using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

    EventLoop *loop_; // baseloop user-defined loop

    const std::string ipPort_;
    const std::string name_;

    std::unique_ptr<Acceptor> acceptor_; // The task running in the mainloop is to listen for new connection events

    std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread

    ConnectionCallback connectionCallback_; //Callback when there is a new connection
    MessageCallback messageCallback_; // Callback when a read or write event occurs
    WriteCompleteCallback writeCompleteCallback_; // callback after the message is sent

    ThreadInitCallback threadInitCallback_; // loop thread initialization callback

    std::atomic_int started_;

    int nextConnId_;
    ConnectionMap connections_; // save all connections
};

Constructor implementation:
Initialize the main loop, accptor object, thread pool object threadPool, listening address, and finally set the necessary callback function

static EventLoop *CheckLoopNotNull(EventLoop *loop)
{<!-- -->
    if (loop == nullptr)
    {<!-- -->
        LOG_FATAL("%s:%s:%d mainLoop is null!\\
", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

TcpServer::TcpServer(EventLoop *loop,
                     const InetAddress & listenAddr,
                     const std::string &nameArg,
                     Option option)
    : loop_(CheckLoopNotNull(loop))
    , ipPort_(listenAddr.toIpPort())
    , name_(nameArg)
    , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
    , threadPool_(new EventLoopThreadPool(loop, name_))
    , connectionCallback_()
    , messageCallback_()
    , nextConnId_(1)
    , started_(0)
{<!-- -->
    // When a new user connects, the acceptChannel_ bound in the Acceptor class will have a read event, execute handleRead() to call TcpServer::newConnection callback
    acceptor_->setNewConnectionCallback(
        std::bind( & TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}

Set the number of threads setThreadNum () method, open the server to monitor through the start () method, mainly to open the thread pool and execute the callback of monitoring

// Turn on server monitoring
// Set the number of underlying subloops
void TcpServer::setThreadNum(int numThreads)
{<!-- -->
    threadPool_->setThreadNum(numThreads);
}

void TcpServer::start()
{<!-- -->
    if (started_ + + == 0) // prevent a TcpServer object from being started multiple times
    {<!-- -->
        threadPool_->start(threadInitCallback_); // Start the underlying loop thread pool
        loop_->runInLoop(std::bind( &Acceptor::listen, acceptor_.get()));
    }
}

The new connection callback onNewconnection is mainly called by the socket read callback bound to the acceptor object (handleRead method)
Execution process: After a new connection arrives, a sub-loop object is obtained through the polling algorithm, and then a TcpConnection object is created by passing in the sub-loop and socketfd as well as the addresses of the peer end and the local end, and put into the connection map, and then set the corresponding upper layer callback

// There is a new user connection, the acceptor will execute this callback operation, responsible for the request connection received by mainLoop (acceptChannel_ will have a read event) distributed to subLoop for processing through callback polling
void TcpServer::newConnection(int sockfd, const InetAddress & amp;peerAddr)
{<!-- -->
    // polling algorithm select a subLoop to manage the channel corresponding to connfd
    EventLoop *ioLoop = threadPool_->getNextLoop();
    char buf[64] = {<!-- -->0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
     + + nextConnId_; // This is not set as an atomic class because it is only executed in the mainloop and does not involve thread safety issues
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s\\
",
             name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
    
    // Obtain the ip address and port information of the machine bound to it through sockfd
    sockaddr_in local;
    ::memset( & amp;local, 0, sizeof(local));
    socklen_t addrlen = sizeof(local);
    if(::getsockname(sockfd, (sockaddr *) &local, &addrlen) < 0)
    {<!-- -->
        LOG_ERROR("sockets::getLocalAddr");
    }

    InetAddress localAddr(local);
    TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));
    connections_[connName] = conn;
    // The following callbacks are all set by the user to TcpServer => TcpConnection. As for the Channel binding, it is the four set by TcpConnection, handleRead, handleWrite... The following callbacks are used in the handlexxx function
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);

    // set the callback for how to close the connection
    conn->setCloseCallback(
        std::bind( &TcpServer::removeConnection, this, std::placeholders::_1));

//run the connection establishment callback
    ioLoop->runInLoop(
        std::bind( & TcpConnection::connectEstablished, conn));
}

Disconnect callback implementation, disconnect

void TcpServer::removeConnection(const TcpConnectionPtr & amp;conn)
{<!-- -->
    loop_->runInLoop(
        std::bind( &TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr & amp;conn)
{<!-- -->
    LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\\
",
             name_.c_str(), conn->name().c_str());

    connections_.erase(conn->name());
    EventLoop *ioLoop = conn->getLoop();
    ioLoop->queueInLoop(
        std::bind( &TcpConnection::connectDestroyed, conn));
}

Release operation:

TcpServer::~TcpServer()
{<!-- -->
    for(auto & item : connections_)
    {<!-- -->
        TcpConnectionPtr conn(item. second);
        item.second.reset(); // Reset the original smart pointer and let the TcpConnectionPtr conn in the stack space point to this object. When conn goes out of its scope, the object pointed to by the smart pointer can be released
        // destroy the connection
        conn->getLoop()->runInLoop(
            std::bind( & TcpConnection::connectDestroyed, conn));
    }
}