Muduo library source code analysis (eight) – TcpServer class

TcpServer class

Key points

The main function of the TcpServer class is to manage the entire server and do the following operations:

  • Manage TcpConnection obtained by accept(2)
  • TcpServer is for direct use by users, and its lifetime is controlled by users
  • Set mainLoop, and use mainLoop to manage new connections
  • Initialize the Acceptor corresponding to TcpServer to monitor the arrival of new connections
  • According to the design scheme of one loop per thread, setThreadNum sets the number of threads to create the corresponding eventLoop
  • The user calls TcpServer::start() to make all subLoop start loop (namely epoll_wait())
  • Realize the processing of when the connection arrives
    • Package connfd as TcpConnection
    • Set callbacks (connection arrival, read and write message arrival, write completion)
    • It saves the user-supplied
      ConnectionCallback and MessageCallback, when creating a new TcpConnection
      to the latter
    • Execute the callback for connection establishment completion (register the channel’s epollin event with the poller)
  • TcpConnection removed task processing

Muduo uses a wakeupfd_ on each subloop to make mainloop notify subloop to detect epoll_wait, which is compared to It is more efficient to use a producer-consumer queue (that is, the producer puts tasks in the queue (IO logic processing) and wakes up the producer)

The sequence of function calls when a new connection is established

newconn() stands for TcpServer::newConnection

established() stands for TcpConnection::connectEstablished()

Muduo tries to make dependency one-way, TcpServer will use Acceptor, but Acceptor does not know the existence of TcpServer. TcpServer will create TcpConnection, but TcpConnection does not know the existence of TcpServer.

Key code explanation

// TcpServer.h
#pragma once
// The user uses muduo to write the server program

#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "noncopyable.h"
#include "EventLoopThreadPool.h"
#include "TcpConnection.h"
#include "Callbacks.h"

#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>

// The class used for external server programming
// Set the event callback operation of the connection in this class object
class TcpServer
{<!-- -->
public:
    using ThreadInitCallback = std::function<void(EventLoop*)>;

    enum Option
    {<!-- -->
        kNoReusePort,
        kReusePort,
    };

    TcpServer(EventLoop *loop,
                const InetAddress & listenAddr,
                const std::string & nameArg,
                Option option = kNoReusePort);

    ~TcpServer();

    void setThreadInitCallback(const ThreadInitCallback & amp;cb) {<!-- --> threadInitCallback_ = cb; }
    void setConnectionCallback(const ConnectionCallback & amp;cb) {<!-- --> connectionCallback_ = cb; }
    void setMessageCallback(const MessageCallback & amp;cb) {<!-- --> messageCallback_ = cb; }
    void setWriteCompleteCallback(const WriteCompleteCallback & cb) {<!-- --> writeCompleteCallback_ = cb; }


    // Set the number of underlying subloops
    void setThreadNum(int numThreads);

    // enable server monitoring
    void start();

private:
    void newConnection(int sockfd, const InetAddress & amp;peerAddr);
    void removeConnection(const TcpConnectionPtr & amp;conn); // remove TcpConnection from map
    void removeConnectionInLoop(const TcpConnectionPtr & amp;conn);
    
    using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

    EventLoop *loop_; // baseLoop
    const std::string ipPort_;
    const std::string name_;
    std::unique_ptr<Acceptor> acceptor_; // Running in mainLoop, the task is to monitor the arrival of new connections
    std::shared_ptr<EventLoopThreadPool> threadPool_;

    ConnectionCallback connectionCallback_; // callback when there is a new connection
    MessageCallback messageCallback_; // There is a callback for reading and writing messages
    WriteCompleteCallback writeCompleteCallback_; // callback after the message is sent

    ThreadInitCallback threadInitCallback_; // loop thread initialization callback

    std::atomic_int started_;

    int nextConnId_;
    ConnectionMap connections_; // save all connections
};

// TcpServer.cc
#include "TcpServer.h"
#include "Logger.h"
#include "TcpConnection.h"

#include <strings.h>
#include <functional>

static EventLoop* CheckLoopNotNull(EventLoop *loop)
{<!-- -->
    if(loop == nullptr)
    {<!-- -->
        LOG_FATAL("%s:%s:%d mainLoop is null! \\
", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

TcpServer::TcpServer(EventLoop *loop,
                const InetAddress & listenAddr,
                const std::string & nameArg,
                Option option)
                :loop_(CheckLoopNotNull(loop))
                , ipPort_(listenAddr.toIpPort())
                , name_(nameArg)
                , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
                , threadPool_(new EventLoopThreadPool(loop, name_))
                , connectionCallback_()
                , messageCallback_()
                , nextConnId_(1)
                , started_(0) // Pay attention to initialization
{<!-- -->
    // When a new user connects, TcpServer::newConnection will be executed
    acceptor_->setNewConnectionCallback(std::bind( & amp;TcpServer::newConnection, this,
        std::placeholders::_1, std::placeholders::_2));
}

TcpServer::~TcpServer()
{<!-- -->
    for(auto & item : connections_)
    {<!-- -->
        // This local shared_ptr smart pointer object can automatically release the new TcpConnection object resource after opening the right bracket
        TcpConnectionPtr conn(item.second); // item.second will not be used after the next line is reset
        item. second. reset();

        // destroy link
        conn->getLoop()->runInLoop(
            std::bind( & TcpConnection::connectDestroyed, conn)
        );
    }
}

void TcpServer::setThreadNum(int numThreads)
{<!-- -->
    threadPool_->setThreadNum(numThreads);
}

// enable server monitoring
void TcpServer::start()
{<!-- -->
    // Prevent a TcpServer object from being started multiple times
    if(started_ + + == 0)
    {<!-- -->
        // start subLoop
        threadPool_->start(threadInitCallback_); // Start the underlying thread pool
        // execute Acceptor::listen
        loop_->runInLoop(std::bind( &Acceptor::listen, acceptor_.get()));
    }
}

// There is a new client connection, Acceptor will execute this callback
void TcpServer::newConnection(int sockfd, const InetAddress & amp;peerAddr)
{<!-- -->
    // round-robin, select a subLoop management channel
    EventLoop *ioLoop = threadPool_->getNextLoop();
    char buf[64] = {<!-- -->0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
     + + nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \\
",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    // Obtain the ip address and port information of the machine bound to it through sockfd
    sockaddr_in local;
    ::bzero( & amp;local, sizeof local);
    socklen_t addrlen = sizeof local;
    if(::getsockname(sockfd, (sockaddr*) & amp;local, & amp;addrlen) < 0)
    {<!-- -->
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);

    // Create a TcpConnection connection object based on the successfully connected sockfd
    TcpConnectionPtr conn(new TcpConnection(
                            ioLoop,
                            connName,
                            sockfd,
                            localAddr,
                            peerAddr));
    connections_[connName] = conn;

    // The following callbacks are set by the user to TcpServer -> TcpConnection ->Channel ->Poller -> notify Channel to execute the callback
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);

    // Set the callback for how to close the connection conn->handleClose()
    conn->setCloseCallback(
        std::bind( &TcpServer::removeConnection, this, std::placeholders::_1)
    );

    // Call TcpConnection::connectEstablished directly
    ioLoop->runInLoop(std::bind( & amp;TcpConnection::connectEstablished, conn));
}

void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{<!-- -->
    loop_->runInLoop(
        std::bind( &TcpServer::removeConnectionInLoop, this, conn)
    );
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr & amp;conn)
{<!-- -->
    LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\\
",
        name_.c_str(), conn->name().c_str());

    connections_.erase(conn->name());
    EventLoop *ioLoop = conn->getLoop();
    ioLoop->queueInLoop(
        std::bind( & TcpConnection::connectDestroyed, conn)
    );
}