muduo source code analysis: the Poller / EPollPoller I/O multiplexing classes

Introduction

Poller is the abstract base class for I/O multiplexing in muduo; it wraps the underlying I/O multiplexing API behind a common interface. muduo provides two derived classes, EPollPoller (epoll) and PollPoller (poll); select is not supported.

newDefaultPoller() selects epoll by default

Main interface

poll

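poll() is the core function of Poller. Each derived class implements it by blocking in poll or epoll_wait until I/O events occur or the timeout expires.
The derived-class implementation then fills the EventLoop's active channel list (passed in as the activeChannels argument) with the channels that have pending events.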

static newDefaultPoller:

Factory function that creates a Poller instance
In EPollPoller, each instance corresponds to exactly one epoll file descriptor (epollfd_)

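updateChannel / update

Updates what the poller watches for a channel. In EPollPoller, updateChannel() chooses the operation and the private update() issues the matching epoll_ctl call (EPOLL_CTL_ADD, EPOLL_CTL_MOD, or EPOLL_CTL_DEL)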

Main members

ownerLoop_

Pointer to the EventLoop that owns the current Poller
The base class also keeps channels_ (a map from fd to Channel*); the remaining members are defined by the derived classes

Source code analysis

poller.h

#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H

#include <map>
#include <vector>

#include "muduo/base/Timestamp.h"
#include "muduo/net/EventLoop.h"

namespace muduo
{<!-- -->
namespace net
{<!-- -->

class Channel;

///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller: noncopyable
{<!-- -->
 public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop);
  virtual ~Poller();

  /// Polls the I/O events.
  /// Must be called in the loop thread.
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  virtual void updateChannel(Channel* channel) = 0;

  /// Remove the channel, when it destructs.
  /// Must be called in the loop thread.
  virtual void removeChannel(Channel* channel) = 0;

  //Determine whether it exists
  virtual bool hasChannel(Channel* channel) const;

  //Create a poller, the default is epoll
  static Poller* newDefaultPoller(EventLoop* loop);

  void assertInLoopThread() const
  {<!-- -->
    ownerLoop_->assertInLoopThread();
  }

 protected:
  typedef std::map<int, Channel*> ChannelMap;
  ChannelMap channels_;

 private:
  EventLoop* ownerLoop_;
};

} // namespace net
} // namespace muduo

#endif //MUDUO_NET_POLLER_H

poller.cc

#include "muduo/net/Poller.h"
#include "muduo/net/Channel.h"

using namespace muduo;
using namespace muduo::net;

// Stores the owning loop. The pointer is kept as-is (not owned); the
// channel map starts out empty.
Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}

// Defaulted out of line. Only the containers themselves are destroyed; the
// raw Channel* values in channels_ and ownerLoop_ are not deleted.
Poller::~Poller() = default;

// Returns true only if the channel's fd is registered in channels_ AND the
// registered pointer is this very channel (a different Channel object that
// happens to use the same fd does not count).
// Must be called in the loop thread (asserted).
bool Poller::hasChannel(Channel* channel) const
{
  assertInLoopThread();
  ChannelMap::const_iterator it = channels_.find(channel->fd());
  return it != channels_.end() && it->second == channel;
}

EPollPoller.h

#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H

#include "muduo/net/Poller.h"

#include <vector>

// Forward declaration only: this header names epoll_event solely as the
// element type of EPollPoller::EventList and does not include <sys/epoll.h>.
struct epoll_event;

namespace muduo
{<!-- -->
namespace net
{<!-- -->

///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{<!-- -->
 public:
  EPollPoller(EventLoop* loop);
  ~EPollPoller() override;

  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
  void updateChannel(Channel* channel) override;
  void removeChannel(Channel* channel) override;

 private:
  static const int kInitEventListSize = 16;

  static const char* operationToString(int op);

  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;
  void update(int operation, Channel* channel);

  typedef std::vector<struct epoll_event> EventList;

  int epollfd_;
  EventList events_;
};

} // namespace net
} // namespace muduo
#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H

EPollPoller.cc

// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include "muduo/net/poller/EPollPoller.h"
 
#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
 
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>
 
using namespace muduo;
using namespace muduo::net;
 
 
/*struct epoll_event
{
  uint32_t events; //Epoll events
  epoll_data_t data; //User data variable
} __attribute__ ((__packed__));
 
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;*/
 
 
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
// The checks below turn that expectation into a compile-time guarantee.
// NOTE(review): presumably needed because the epoll event mask is passed to
// Channel::set_revents() unchanged and interpreted there with POLL* values.
static_assert(EPOLLIN == POLLIN, "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI, "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT, "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP, "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR, "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP, "epoll uses same flag values as poll");
 
// Values stored in Channel::index() to track a channel's state in this poller.
namespace
{
const int kNew = -1;    // not in channels_ and not registered with epoll
const int kAdded = 1;   // in channels_ and registered with epoll
const int kDeleted = 2; // still in channels_, but removed from epoll (EPOLL_CTL_DEL)
}
 
// Creates the epoll instance and the initial event buffer.
//
// epoll_create1(EPOLL_CLOEXEC) sets the close-on-exec flag (FD_CLOEXEC) on the
// new descriptor, so the kernel closes it automatically when the process calls
// exec; the epoll fd is therefore not leaked into a program started via
// fork + exec. (With flags == 0, epoll_create1 behaves like epoll_create.)
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize) // buffer starts with kInitEventListSize (16) slots
{
  if (epollfd_ < 0) // creation failed; reported through LOG_SYSFATAL
  {
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}
 
// Releases the epoll instance created in the constructor.
EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}
 
// Waits for I/O events for at most timeoutMs and appends every channel that
// has pending events to *activeChannels (ChannelList is a vector of Channel*).
// Returns the timestamp taken right after epoll_wait returned.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  LOG_TRACE << "fd total count " << channels_.size();
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),                // output buffer of epoll_event
                               static_cast<int>(events_.size()), // capacity of that buffer (maxevents)
                               timeoutMs);
  // Capture errno immediately so that later calls cannot overwrite it.
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
    // The buffer was filled completely, so more events may be pending:
    // double it so the next call can return more events at once.
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << "nothing happened";
  }
  else
  {
    // error happens, log uncommon ones
    if (savedErrno != EINTR)
    {
      // Restore errno before logging through LOG_SYSERR.
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}
 
// Appends the channels of the first numEvents entries of events_ to
// *activeChannels and records on each channel which events occurred.
void EPollPoller::fillActiveChannels(int numEvents,
                                     ChannelList* activeChannels) const
{
  // epoll_wait never returns more events than the buffer can hold.
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)
  {
    // data.ptr was set to the Channel when it was registered in update().
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
    // Debug-only consistency check: the channel must still be registered
    // under its own fd.
    int fd = channel->fd();
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    channel->set_revents(events_[i].events); // hand the returned event mask to the channel
    activeChannels->push_back(channel);      // and report the channel as active
  }
}
 
// Brings epoll in line with the events the channel is currently interested in.
//
// According to the original author's note, the usual call chain is
// channel->enableReading() -> channel->update() -> EventLoop::updateChannel()
// -> the updateChannel() of the epoll or poll poller (callers not shown here).
//
// State handling, based on channel->index():
//   kNew     -> insert into channels_, EPOLL_CTL_ADD, index becomes kAdded
//   kDeleted -> already in channels_,  EPOLL_CTL_ADD, index becomes kAdded
//   kAdded   -> EPOLL_CTL_DEL (index becomes kDeleted) when the channel has
//               no events left, otherwise EPOLL_CTL_MOD
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread(); // must run in the loop (I/O) thread
  const int index = channel->index(); // one of kNew / kAdded / kDeleted
  LOG_INFO << "fd = " << channel->fd()
    << " events = " << channel->events() << " index = " << index;

  // Either a brand-new channel, or one that was monitored before and then
  // removed from epoll. The difference: a new channel is not in channels_
  // yet, whereas a deleted one is still in channels_ but no longer
  // registered with epoll.
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      assert(channels_.find(fd) == channels_.end()); // channels_ is a map keyed by fd
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }

    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel); // register with epoll
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd; // only used by the asserts below
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);

    // The channel is already registered, so the change is either a different
    // event set or "stop monitoring". Check the latter first and unregister.
    if (channel->isNoneEvent())
    {
      update(EPOLL_CTL_DEL, channel);  // unregister from epoll
      channel->set_index(kDeleted);    // stays in channels_, marked kDeleted
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);  // change the registered event set
    }
  }
}
 
// Removes the channel from this poller completely: erases it from channels_,
// unregisters it from epoll if it is still registered (index == kAdded), and
// resets its index to kNew.
// Preconditions (asserted): called in the loop thread, the channel is present
// in channels_, and it has no interested events left.
void EPollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread(); // must run in the loop (I/O) thread
  int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  assert(channel->isNoneEvent());
  int index = channel->index();
  assert(index == kAdded || index == kDeleted);
  size_t n = channels_.erase(fd); // drop the fd -> channel mapping
  (void)n; // only used by the assert below
  assert(n == 1);

  // A kDeleted channel was already removed from epoll in updateChannel().
  if (index == kAdded)
  {
    update(EPOLL_CTL_DEL, channel);
  }
  channel->set_index(kNew);
}
 
void EPollPoller::update(int operation, Channel* channel)
{<!-- -->
  printf("-------%s,line.%d-------\
",__FUNCTION__,__LINE__);
  struct epoll_event event; //Structure to store data
  memZero( & amp;event, sizeof event);
  event.events = channel->events(); //Registered events
  event.data.ptr = channel;
  int fd = channel->fd();
  LOG_INFO << "epoll_ctl op = " << operationToString(operation)
    << " fd = " << fd << " event = { " << channel->eventsToString() << " }";
  if (::epoll_ctl(epollfd_, operation, fd, & amp;event) < 0)//epoll_ctl returns -1 if it fails
  {<!-- -->
    if (operation == EPOLL_CTL_DEL)
    {<!-- -->
      LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
    }
    else
    {<!-- -->
      LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
    }
  }
}
 
// Returns a short name for an EPOLL_CTL_* operation, used in log messages.
// Any other value trips the assert in debug builds and yields
// "Unknown Operation" when asserts are compiled out.
const char* EPollPoller::operationToString(int op)
{
  switch(op)
  {
    case EPOLL_CTL_ADD:
      return "ADD";
    case EPOLL_CTL_DEL:
      return "DEL";
    case EPOLL_CTL_MOD:
      return "MOD";
    default:
      // `false && "text"` is always false; the string only labels the failure.
      assert(false && "ERROR op");
      return "Unknown Operation";
  }
}