TcpServer class:
The TcpServer class is the TCP server class that users program against when writing a server.
// Class used for external server programming class TcpServer {<!-- --> public: using ThreadInitCallback = std::function<void(EventLoop *)>; enum Option {<!-- --> kNoReusePort, kReusePort, }; TcpServer(EventLoop *loop, const InetAddress & listenAddr, const std::string & nameArg, Option option = kNoReusePort); ~TcpServer(); void setThreadInitCallback(const ThreadInitCallback & amp;cb) {<!-- --> threadInitCallback_ = cb; } void setConnectionCallback(const ConnectionCallback & amp;cb) {<!-- --> connectionCallback_ = cb; } void setMessageCallback(const MessageCallback & amp;cb) {<!-- --> messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback & cb) {<!-- --> writeCompleteCallback_ = cb; } // Set the number of underlying subloops void setThreadNum(int numThreads); // enable server monitoring void start(); private: void newConnection(int sockfd, const InetAddress & amp;peerAddr); void removeConnection(const TcpConnectionPtr & amp;conn); void removeConnectionInLoop(const TcpConnectionPtr & amp;conn); using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>; EventLoop *loop_; // baseloop user-defined loop const std::string ipPort_; const std::string name_; std::unique_ptr<Acceptor> acceptor_; // The task running in the mainloop is to listen for new connection events std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread ConnectionCallback connectionCallback_; //Callback when there is a new connection MessageCallback messageCallback_; // Callback when a read or write event occurs WriteCompleteCallback writeCompleteCallback_; // callback after the message is sent ThreadInitCallback threadInitCallback_; // loop thread initialization callback std::atomic_int started_; int nextConnId_; ConnectionMap connections_; // save all connections };
Constructor implementation:
The constructor initializes the main loop, the Acceptor object, the thread pool object (threadPool_), and the listening address, and finally registers the new-connection callback on the acceptor.
/**
 * Returns `loop` unchanged after checking that it is not null.
 * A null loop is reported through LOG_FATAL (presumably terminating the
 * process - confirm against the logger implementation).
 */
static EventLoop *CheckLoopNotNull(EventLoop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d mainLoop is null!\n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

/**
 * Builds the server: stores the base loop, creates the acceptor and the
 * thread pool on that loop, and wires the acceptor's new-connection
 * callback to TcpServer::newConnection.
 *
 * @param loop       base loop; must not be null.
 * @param listenAddr address the acceptor listens on.
 * @param nameArg    server name.
 * @param option     kReusePort enables port reuse on the acceptor.
 */
TcpServer::TcpServer(EventLoop *loop,
                     const InetAddress &listenAddr,
                     const std::string &nameArg,
                     Option option)
    : loop_(CheckLoopNotNull(loop))
    , ipPort_(listenAddr.toIpPort())
    , name_(nameArg)
    , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
    , threadPool_(new EventLoopThreadPool(loop, name_))
    , connectionCallback_()
    , messageCallback_()
    , started_(0)   // listed in declaration order (started_ precedes nextConnId_)
    , nextConnId_(1)
{
    // When a new peer connects, the acceptor's channel becomes readable and
    // its handleRead() invokes this callback, i.e. TcpServer::newConnection.
    acceptor_->setNewConnectionCallback(
        std::bind(&TcpServer::newConnection, this,
                  std::placeholders::_1, std::placeholders::_2));
}
Set the number of threads with setThreadNum(), then start the server with start(), which starts the thread pool and schedules the acceptor's listen() call on the main loop.
// Turn on server monitoring // Set the number of underlying subloops void TcpServer::setThreadNum(int numThreads) {<!-- --> threadPool_->setThreadNum(numThreads); } void TcpServer::start() {<!-- --> if (started_ + + == 0) // prevent a TcpServer object from being started multiple times {<!-- --> threadPool_->start(threadInitCallback_); // Start the underlying loop thread pool loop_->runInLoop(std::bind( &Acceptor::listen, acceptor_.get())); } }
The new-connection callback, newConnection, is invoked by the socket read callback bound to the Acceptor object (its handleRead method).
Execution process: when a new connection arrives, a sub-loop is selected by the polling algorithm; a TcpConnection object is then created from that sub-loop, the socket fd, and the peer and local addresses, stored in the connection map, and finally given the corresponding upper-layer callbacks.
// There is a new user connection, the acceptor will execute this callback operation, responsible for the request connection received by mainLoop (acceptChannel_ will have a read event) distributed to subLoop for processing through callback polling void TcpServer::newConnection(int sockfd, const InetAddress & amp;peerAddr) {<!-- --> // polling algorithm select a subLoop to manage the channel corresponding to connfd EventLoop *ioLoop = threadPool_->getNextLoop(); char buf[64] = {<!-- -->0}; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); + + nextConnId_; // This is not set as an atomic class because it is only executed in the mainloop and does not involve thread safety issues std::string connName = name_ + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s\\ ", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); // Obtain the ip address and port information of the machine bound to it through sockfd sockaddr_in local; ::memset( & amp;local, 0, sizeof(local)); socklen_t addrlen = sizeof(local); if(::getsockname(sockfd, (sockaddr *) &local, &addrlen) < 0) {<!-- --> LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; // The following callbacks are all set by the user to TcpServer => TcpConnection. As for the Channel binding, it is the four set by TcpConnection, handleRead, handleWrite... The following callbacks are used in the handlexxx function conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); // set the callback for how to close the connection conn->setCloseCallback( std::bind( &TcpServer::removeConnection, this, std::placeholders::_1)); //run the connection establishment callback ioLoop->runInLoop( std::bind( & TcpConnection::connectEstablished, conn)); }
Connection-close callback implementation: removes the connection from the server's connection map and schedules its destruction.
void TcpServer::removeConnection(const TcpConnectionPtr & amp;conn) {<!-- --> loop_->runInLoop( std::bind( &TcpServer::removeConnectionInLoop, this, conn)); } void TcpServer::removeConnectionInLoop(const TcpConnectionPtr & amp;conn) {<!-- --> LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\\ ", name_.c_str(), conn->name().c_str()); connections_.erase(conn->name()); EventLoop *ioLoop = conn->getLoop(); ioLoop->queueInLoop( std::bind( &TcpConnection::connectDestroyed, conn)); }
Release operation:
/**
 * Tears down every connection still registered with the server. For each
 * entry the map's reference is dropped and TcpConnection::connectDestroyed
 * is dispatched to the connection's own loop.
 */
TcpServer::~TcpServer()
{
    for (auto &item : connections_)
    {
        // Take a local strong reference, then reset the map's pointer. The
        // local `conn` (and the copy held by the bound functor below) become
        // the remaining owners, so the object can be released once they go
        // out of scope.
        TcpConnectionPtr conn(item.second);
        item.second.reset();

        // Destroy the connection on the loop that owns it.
        conn->getLoop()->runInLoop(
            std::bind(&TcpConnection::connectDestroyed, conn));
    }
}