Introduction
Poller is the abstract base class of the I/O multiplexing layer; it encapsulates the underlying I/O multiplexing API. Muduo provides two derived classes, EPollPoller (epoll) and PollPoller (poll); select is not supported.
newDefaultPoller() selects epoll by default
Main interface
poll
It is the core function of Poller. Use poll or epoll_wait of derived classes to block and wait for IO events to occur.
The derived class's implementation fills the active-channel list passed in by the EventLoop (the activeChannels out-parameter) with the channels that have pending events
static newDefaultPoller:
Factory function to create a Poller instance
In EPollPoller, each instance owns exactly one epollfd
update
Updates the registration of a channel with the I/O multiplexer, e.g. the EPOLL_CTL_ADD, EPOLL_CTL_MOD and EPOLL_CTL_DEL operations of epoll_ctl
Main members
ownerLoop_
Pointer to the EventLoop that owns the current Poller
The base class also holds channels_, a map from fd to Channel*; the remaining members are defined by the derived classes
Source code analysis
Poller.h
#ifndef MUDUO_NET_POLLER_H #define MUDUO_NET_POLLER_H #include <map> #include <vector> #include "muduo/base/Timestamp.h" #include "muduo/net/EventLoop.h" namespace muduo {<!-- --> namespace net {<!-- --> class Channel; /// /// Base class for IO Multiplexing /// /// This class doesn't own the Channel objects. class Poller: noncopyable {<!-- --> public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); virtual ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0; /// Changes the interested I/O events. /// Must be called in the loop thread. virtual void updateChannel(Channel* channel) = 0; /// Remove the channel, when it destructs. /// Must be called in the loop thread. virtual void removeChannel(Channel* channel) = 0; //Determine whether it exists virtual bool hasChannel(Channel* channel) const; //Create a poller, the default is epoll static Poller* newDefaultPoller(EventLoop* loop); void assertInLoopThread() const {<!-- --> ownerLoop_->assertInLoopThread(); } protected: typedef std::map<int, Channel*> ChannelMap; ChannelMap channels_; private: EventLoop* ownerLoop_; }; } // namespace net } // namespace muduo #endif //MUDUO_NET_POLLER_H
Poller.cc
#include "muduo/net/Poller.h"

#include "muduo/net/Channel.h"

using namespace muduo;
using namespace muduo::net;

/// Stores the owning loop; the poller does not take ownership of it.
Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}

Poller::~Poller() = default;

/// Returns true when channels_ holds an entry for channel->fd() and that
/// entry is this very Channel object (not merely another channel that
/// happens to use the same fd).
/// Asserts that it is called in the loop thread.
bool Poller::hasChannel(Channel* channel) const
{
  assertInLoopThread();
  ChannelMap::const_iterator it = channels_.find(channel->fd());
  return it != channels_.end() && it->second == channel;
}
EPollPoller.h
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H #define MUDUO_NET_POLLER_EPOLLPOLLER_H #include "muduo/net/Poller.h" #include <vector> struct epoll_event; namespace muduo {<!-- --> namespace net {<!-- --> /// /// IO Multiplexing with epoll(4). /// class EPollPoller : public Poller {<!-- --> public: EPollPoller(EventLoop* loop); ~EPollPoller() override; Timestamp poll(int timeoutMs, ChannelList* activeChannels) override; void updateChannel(Channel* channel) override; void removeChannel(Channel* channel) override; private: static const int kInitEventListSize = 16; static const char* operationToString(int op); void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; void update(int operation, Channel* channel); typedef std::vector<struct epoll_event> EventList; int epollfd_; EventList events_; }; } // namespace net } // namespace muduo #endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
EPollPoller.cc
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include "muduo/net/poller/EPollPoller.h" #include "muduo/base/Logging.h" #include "muduo/net/Channel.h" #include <assert.h> #include <errno.h> #include <poll.h> #include <sys/epoll.h> #include <unistd.h> using namespace muduo; using namespace muduo::net; /*struct epoll_event { uint32_t events; //Epoll events epoll_data_t data; //User data variable } __attribute__ ((__packed__)); typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;*/ // On Linux, the constants of poll(2) and epoll(4) // are expected to be the same. static_assert(EPOLLIN == POLLIN, "epoll uses same flag values as poll"); static_assert(EPOLLPRI == POLLPRI, "epoll uses same flag values as poll"); static_assert(EPOLLOUT == POLLOUT, "epoll uses same flag values as poll"); static_assert(EPOLLRDHUP == POLLRDHUP, "epoll uses same flag values as poll"); static_assert(EPOLLERR == POLLERR, "epoll uses same flag values as poll"); static_assert(EPOLLHUP == POLLHUP, "epoll uses same flag values as poll"); namespace {<!-- --> const int kNew = -1; //channel has not been added to poller yet const int kAdded = 1; //already added const int kDeleted = 2; //It was monitored before, but later removed. } //When flag = EPOLL_CLOEXEC, the created epfd will set FD_CLOEXEC //FD_CLOEXEC means that when the program executes the exec function, the fd will be automatically closed by the system, indicating that it will not be passed to the new process created by exec. EPollPoller::EPollPoller(EventLoop* loop) : Poller(loop), //Create epollfd, use the version with 1 //If the parameter is 0, it is the same as the epoll_create version and is set to O_CLOEXEC. See the explanation of this parameter of the open function. 
//The child process will close this fd when it forks and calls exec epollfd_(::epoll_create1(EPOLL_CLOEXEC)), events_(kInitEventListSize) //vector is used to initialize kInitEventListSize size space, the default is 16 {<!-- --> if (epollfd_ < 0) //Judge in the constructor, if <0, abort() {<!-- --> LOG_SYSFATAL << "EPollPoller::EPollPoller"; } } EPollPoller::~EPollPoller() {<!-- --> ::close(epollfd_); } Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)//ChannelList is a vector that stores channels {<!-- --> LOG_TRACE << "fd total count " << channels_.size(); int numEvents = ::epoll_wait(epollfd_, & amp;*events_.begin(), //events_ has been initialized and is the vector that stores epoll_event static_cast<int>(events_.size()), //Monitor the number of sockets timeoutMs); int savedErrno = errno; Timestamp now(Timestamp::now()); if (numEvents > 0) {<!-- --> LOG_TRACE << numEvents << " events happened"; fillActiveChannels(numEvents, activeChannels); if (implicit_cast<size_t>(numEvents) == events_.size()) //If the number of events returned is equal to the current event array size, allocate 2 times the space {<!-- --> events_.resize(events_.size()*2); } } else if (numEvents == 0) {<!-- --> LOG_TRACE << "nothing happened"; } else {<!-- --> // error happens, log uncommon ones if (savedErrno != EINTR) {<!-- --> errno = savedErrno; LOG_SYSERR << "EPollPoller::poll()"; } } return now; } //Add so many events returned to activeChannels void EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {<!-- --> assert(implicit_cast<size_t>(numEvents) <= events_.size()); for (int i = 0; i < numEvents; + + i) //Make sure its size is smaller than the size of events_, because events_ is a reserved event vector {<!-- --> Channel* channel = static_cast<Channel*>(events_[i].data.ptr); #ifndef NDEBUG int fd = channel->fd(); //Do some testing when debugging ChannelMap::const_iterator it = channels_.find(fd); assert(it != channels_.end()); 
assert(it->second == channel); #endif channel->set_revents(events_[i].events); //Transmit the events that have occurred to the channel and write them to the channel activeChannels->push_back(channel); //and push_back into activeChannels } } //This function is called because channel->enablereading() is called, then channel->update() is called, then event_loop->updateChannel(), then ->epoll or poll's updateChannel is called // void EPollPoller::updateChannel(Channel* channel) {<!-- --> Poller::assertInLoopThread(); //In the IO thread const int index = channel->index(); //The initial state index is -1 LOG_INFO << "fd = " << channel->fd() << " events = " << channel->events() << " index = " << index; // When it is new or it was monitored before and then the monitoring was removed. //The difference between the two is that the new channel has not been saved in epoll before // del was previously saved in channels_, but was not placed in epoll_ctl for monitoring if (index == kNew || index == kDeleted) //index is the subscript in poll, and there are three states in epoll. There are three constants above {<!-- --> // a new one, add with EPOLL_CTL_ADD int fd = channel->fd(); if (index == kNew) {<!-- --> assert(channels_.find(fd) == channels_.end()); //channels_ is a Map channels_[fd] = channel; } else // index == kDeleted {<!-- --> assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); } channel->set_index(kAdded); update(EPOLL_CTL_ADD, channel); //Register event } else {<!-- --> // update existing one with EPOLL_CTL_MOD/DEL int fd = channel->fd(); (void)fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(index == kAdded); // Now that it has been added, the possible modification is to modify the listening time, or stop listening. // So here we first determine whether it is an unmonitored event. If so, remove it directly. 
if (channel->isNoneEvent()) //Judge that there is no event {<!-- --> update(EPOLL_CTL_DEL, channel); //Delete event channel->set_index(kDeleted); //set to kDeleted after deletion } else {<!-- --> update(EPOLL_CTL_MOD, channel); //Modify registered listening events } } } void EPollPoller::removeChannel(Channel* channel) {<!-- --> Poller::assertInLoopThread(); //Determine whether it is in the IO thread int fd = channel->fd(); LOG_TRACE << "fd = " << fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(channel->isNoneEvent()); int index = channel->index(); assert(index == kAdded || index == kDeleted); size_t n = channels_.erase(fd); //Delete (void)n; assert(n == 1); if (index == kAdded) {<!-- --> update(EPOLL_CTL_DEL, channel); } channel->set_index(kNew); } void EPollPoller::update(int operation, Channel* channel) {<!-- --> printf("-------%s,line.%d-------\ ",__FUNCTION__,__LINE__); struct epoll_event event; //Structure to store data memZero( & amp;event, sizeof event); event.events = channel->events(); //Registered events event.data.ptr = channel; int fd = channel->fd(); LOG_INFO << "epoll_ctl op = " << operationToString(operation) << " fd = " << fd << " event = { " << channel->eventsToString() << " }"; if (::epoll_ctl(epollfd_, operation, fd, & amp;event) < 0)//epoll_ctl returns -1 if it fails {<!-- --> if (operation == EPOLL_CTL_DEL) {<!-- --> LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; } else {<!-- --> LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; } } } const char* EPollPoller::operationToString(int op) {<!-- --> switch(op) {<!-- --> case EPOLL_CTL_ADD: return "ADD"; case EPOLL_CTL_DEL: return "DEL"; case EPOLL_CTL_MOD: return "MOD"; default: assert(false & amp; & amp; "ERROR op"); return "Unknown Operation"; } }