TcpServer class
Key points
The main function of the TcpServer class is to manage the entire server and do the following operations:
- Manage the TcpConnection objects obtained by accept(2)
- TcpServer is for direct use by users, and its lifetime is controlled by users
- Set mainLoop, and use mainLoop to manage new connections
- Initialize the Acceptor corresponding to TcpServer to monitor the arrival of new connections
- Following the one-loop-per-thread design, setThreadNum sets the number of threads, and a corresponding EventLoop is created for each of them
- The user calls TcpServer::start() to make every subLoop start its loop (namely epoll_wait())
- Handle the arrival of a new connection:
- Wrap connfd in a TcpConnection
- Set callbacks (connection arrival, read and write message arrival, write completion)
- It saves the user-supplied ConnectionCallback and MessageCallback and, when creating a new TcpConnection, passes them on to the latter
- Execute the callback for connection establishment completion (register the channel’s epollin event with the poller)
- Encapsulate the handling of TcpConnection removal
Muduo uses a wakeupfd_ on each subloop so that mainloop can notify the subloop and wake it out of epoll_wait. This is more efficient than using a producer-consumer queue (that is, the producer puts tasks (IO logic processing) in the queue and wakes up the consumer).
The sequence of function calls when a new connection is established is shown below, where newconn() stands for TcpServer::newConnection and established() stands for TcpConnection::connectEstablished().
Muduo tries to keep dependencies one-way: TcpServer uses Acceptor, but Acceptor does not know that TcpServer exists; TcpServer creates TcpConnection objects, but TcpConnection does not know that TcpServer exists.
Key code explanation
// TcpServer.h #pragma once // The user uses muduo to write the server program #include "EventLoop.h" #include "Acceptor.h" #include "InetAddress.h" #include "noncopyable.h" #include "EventLoopThreadPool.h" #include "TcpConnection.h" #include "Callbacks.h" #include <functional> #include <string> #include <memory> #include <atomic> #include <unordered_map> // The class used for external server programming // Set the event callback operation of the connection in this class object class TcpServer {<!-- --> public: using ThreadInitCallback = std::function<void(EventLoop*)>; enum Option {<!-- --> kNoReusePort, kReusePort, }; TcpServer(EventLoop *loop, const InetAddress & listenAddr, const std::string & nameArg, Option option = kNoReusePort); ~TcpServer(); void setThreadInitCallback(const ThreadInitCallback & amp;cb) {<!-- --> threadInitCallback_ = cb; } void setConnectionCallback(const ConnectionCallback & amp;cb) {<!-- --> connectionCallback_ = cb; } void setMessageCallback(const MessageCallback & amp;cb) {<!-- --> messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback & cb) {<!-- --> writeCompleteCallback_ = cb; } // Set the number of underlying subloops void setThreadNum(int numThreads); // enable server monitoring void start(); private: void newConnection(int sockfd, const InetAddress & amp;peerAddr); void removeConnection(const TcpConnectionPtr & amp;conn); // remove TcpConnection from map void removeConnectionInLoop(const TcpConnectionPtr & amp;conn); using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>; EventLoop *loop_; // baseLoop const std::string ipPort_; const std::string name_; std::unique_ptr<Acceptor> acceptor_; // Running in mainLoop, the task is to monitor the arrival of new connections std::shared_ptr<EventLoopThreadPool> threadPool_; ConnectionCallback connectionCallback_; // callback when there is a new connection MessageCallback messageCallback_; // There is a callback for reading and writing messages 
WriteCompleteCallback writeCompleteCallback_; // callback after the message is sent ThreadInitCallback threadInitCallback_; // loop thread initialization callback std::atomic_int started_; int nextConnId_; ConnectionMap connections_; // save all connections }; // TcpServer.cc #include "TcpServer.h" #include "Logger.h" #include "TcpConnection.h" #include <strings.h> #include <functional> static EventLoop* CheckLoopNotNull(EventLoop *loop) {<!-- --> if(loop == nullptr) {<!-- --> LOG_FATAL("%s:%s:%d mainLoop is null! \\ ", __FILE__, __FUNCTION__, __LINE__); } return loop; } TcpServer::TcpServer(EventLoop *loop, const InetAddress & listenAddr, const std::string & nameArg, Option option) :loop_(CheckLoopNotNull(loop)) , ipPort_(listenAddr.toIpPort()) , name_(nameArg) , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)) , threadPool_(new EventLoopThreadPool(loop, name_)) , connectionCallback_() , messageCallback_() , nextConnId_(1) , started_(0) // Pay attention to initialization {<!-- --> // When a new user connects, TcpServer::newConnection will be executed acceptor_->setNewConnectionCallback(std::bind( & amp;TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); } TcpServer::~TcpServer() {<!-- --> for(auto & item : connections_) {<!-- --> // This local shared_ptr smart pointer object can automatically release the new TcpConnection object resource after opening the right bracket TcpConnectionPtr conn(item.second); // item.second will not be used after the next line is reset item. second. 
reset(); // destroy link conn->getLoop()->runInLoop( std::bind( & TcpConnection::connectDestroyed, conn) ); } } void TcpServer::setThreadNum(int numThreads) {<!-- --> threadPool_->setThreadNum(numThreads); } // enable server monitoring void TcpServer::start() {<!-- --> // Prevent a TcpServer object from being started multiple times if(started_ + + == 0) {<!-- --> // start subLoop threadPool_->start(threadInitCallback_); // Start the underlying thread pool // execute Acceptor::listen loop_->runInLoop(std::bind( &Acceptor::listen, acceptor_.get())); } } // There is a new client connection, Acceptor will execute this callback void TcpServer::newConnection(int sockfd, const InetAddress & amp;peerAddr) {<!-- --> // round-robin, select a subLoop management channel EventLoop *ioLoop = threadPool_->getNextLoop(); char buf[64] = {<!-- -->0}; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); + + nextConnId_; std::string connName = name_ + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \\ ", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); // Obtain the ip address and port information of the machine bound to it through sockfd sockaddr_in local; ::bzero( & amp;local, sizeof local); socklen_t addrlen = sizeof local; if(::getsockname(sockfd, (sockaddr*) & amp;local, & amp;addrlen) < 0) {<!-- --> LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); // Create a TcpConnection connection object based on the successfully connected sockfd TcpConnectionPtr conn(new TcpConnection( ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; // The following callbacks are set by the user to TcpServer -> TcpConnection ->Channel ->Poller -> notify Channel to execute the callback conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); // Set the callback for how to close the connection conn->handleClose() 
conn->setCloseCallback( std::bind( &TcpServer::removeConnection, this, std::placeholders::_1) ); // Call TcpConnection::connectEstablished directly ioLoop->runInLoop(std::bind( & amp;TcpConnection::connectEstablished, conn)); } void TcpServer::removeConnection(const TcpConnectionPtr &conn) {<!-- --> loop_->runInLoop( std::bind( &TcpServer::removeConnectionInLoop, this, conn) ); } void TcpServer::removeConnectionInLoop(const TcpConnectionPtr & amp;conn) {<!-- --> LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\\ ", name_.c_str(), conn->name().c_str()); connections_.erase(conn->name()); EventLoop *ioLoop = conn->getLoop(); ioLoop->queueInLoop( std::bind( & TcpConnection::connectDestroyed, conn) ); }