Echo server based on multi-threaded Reactor mode EchoServer

record

A thread is dedicated to accepting accept and obtaining the client’s fd.

After obtaining fd, find a thread with the smallest number of connected clients from the remaining execution threads.

Then add the client’s fd to this thread and monitor this fd through EPOLL

Threads communicate through eventfd, passing the client’s fd to the corresponding thread.

Referring to MediaServer, the concepts of EventPollerPoll and EventPoller are introduced

At least two two threads. If it is set to 1, it will be changed to 2.

cpp code:

#include "durian.h"

#include <sys/epoll.h>

namespace DURIAN
{

EventPoller::EventPoller(int id)
{
m_id = id;
}

EventPoller::~EventPoller()
{


printf("~EventPoller signal m_id = %d m_run_flag = %d\\
",m_id,m_run_flag);

\t\t
Wait();

\t\t
}
    bool EventPoller::Init()
    {
        m_poll_fd = epoll_create1(0);
        if(m_poll_fd == -1)
        {
            return false;
        }

m_event_fd = eventfd(0,0);
if(m_event_fd == -1)
{
printf("new fd failed\\
");
close(m_poll_fd);
return false ;
}

\t\t  
        return true;
    }

void EventPoller::RunLoop()
    {
static const int MAX_EVENTS = 1024;
struct epoll_event events[MAX_EVENTS];
\t\t
    while(m_run_flag)
    {

            int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);
            
            if(ready_count == -1)
            {
                if(errno != EINTR)
                {
                    //exit(1);
                }
                //ready_count = 0;
            }
            else if(ready_count == 0)
            {
                if(m_run_flag == false)
                {
                    //printf("time out and runflag = false exit thread\\
");
                    //break;
                }
            }

            for(int i = 0;i<ready_count;i + + )
            {
                const struct epoll_event & amp;ev = events[i];
                int fd = events[i].data.fd;
                
                if(ev.events & amp;(EPOLLIN | EPOLLERR |EPOLLHUP))
                {
                    auto handler = m_accept_handlers[fd];
                    handler(fd);
                }
                else if(ev.events & amp; (EPOLLOUT | EPOLLERR | EPOLLHUP))
                {

\t\t
                    auto it = m_buffer_pool.find(fd);
                    if(it!= m_buffer_pool.end())
                    {
                        auto & amp;buf = it->second;
                        if(buf.WriteData(fd) == false)
                        {
                            Close(fd);
                        }
                    }
                }
            }
        }
    }


int EventPoller::GetEventFD()
{
return m_event_fd;
}

int EventPoller::GetClients()
{
return m_accept_handlers.size();
}


void EventPoller::Stop()
{
m_run_flag = false;
}
void EventPoller::Start()
{
//printf("Enter EventPoller Start m_id = %d pollfd = %d eventid = %d\\
",m_id,m_poll_fd,m_event_fd);
m_run_flag = true;
m_thread_id = std::thread( & amp;EventPoller::RunLoop,this);

}

void EventPoller::Wait()
{
if(m_thread_id.joinable())
{
m_thread_id.join();
}
}
bool EventPoller::Add2Epoll(int fd)
{
if(m_accept_handlers.count(fd) != 0)
        {
            return false;
        }
        int flags = 1;
        if(ioctl(fd,FIONBIO, & amp;flags) == -1)
        {
            return false;
        }

        struct epoll_event ev;
        ev.events = EPOLLIN |EPOLLOUT |EPOLLET;
        ev.data.fd = fd;
        if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd, & amp;ev)==-1)
        {
            return false;
        }

        return true;
}


void EventPoller::DeliverConn(int conn_fd)
{
//printf("DeliverConn fd = %d\\
",conn_fd);
uint64_t count = conn_fd;

\t\t
if(write(m_event_fd, & amp;count,sizeof(count)) == -1)
{
printf("Deliverconn write failed\\
");
}
}

bool EventPoller::AddListener(int fd,ACCEPTER on_accept)
   {
if(Add2Epoll(fd) == false)
{
return false;
}

std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;

m_accept_handlers[fd] = [this,on_accept]( int server_fd){
for(;;)
{
int new_fd = accept(server_fd,nullptr,nullptr);
std::cout<<"accept client fd = "<<new_fd<<std::endl;
if(new_fd == -1)
{
if(errno!= EAGAIN)
{
Close(server_fd);
}
return 0;
}
\t\t
int enable = 1;
setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY, & amp;enable,sizeof(enable));
on_accept(new_fd);
\t\t\t\t
}
return 0;
};
return true;
   }



bool EventPoller::AddEventer(int fd, EVENTER on_event)
{
if(Add2Epoll(fd) == false)
{
return false;
}

m_accept_handlers[fd] = [this,on_event](int cfd){
            for(;;)
            {
                uint64_t count;
                if(read(cfd, & amp;count,sizeof(count)) == -1)
                {
                    if(errno != EAGAIN)
                    {
                        Close(cfd);
                    }
                    return 0;
                }
                on_event(count);
            }
return 0;
        };

        return true;
\t\t
}


bool EventPoller::AddReader(int fd, READER on_read)
{
if(Add2Epoll(fd) == false)
{
return false;
}


     m_accept_handlers[fd] = [this,on_read](int cfd){
         for(;;)
         {
             char buf[4096] = {0};
             ssize_t ret = read(cfd,buf,sizeof(buf));
             if(ret == -1)
             {
                 if(errno != EAGAIN)
                 {
                     Close(cfd);
                 }
                 return -1;
             }

             if(ret == 0)
             {
                 Close(cfd);
printf("The client closed the connection %d\\
",cfd);
                 return 0;
             }

             on_read(cfd,buf,ret);

         }
     };
return true;
}


void EventPoller::Close(int fd)
{
m_accept_handlers.erase(fd);
m_buffer_pool.erase(fd);
close(fd);
}

bool EventPoller::FlushData(int fd, const char * buf, size_t len)
{
WriteBuffer *wb = nullptr;
auto it = m_buffer_pool.find(fd);
if(it == m_buffer_pool.end())
{
while(len>0)
{
ssize_t ret = write(fd,buf,len);
if(ret == -1)
{
if(errno != EAGAIN)
{
Close(fd);
return false;
}
wb = &m_buffer_pool[fd];
break;
}
buf + = ret;
len-=ret;
}

if(len == 0)
{
//Success
return true;
}
}
else
{
wb = & amp;it->second;
}
wb->Add2Buffer(buf,len);
return true;
}


static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{
g_pool_size = size;
}
EventPollerPool & EventPollerPool::Instance()
{
static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool());
static EventPollerPool &s_instance_ref = *s_instance;
return s_instance_ref;
}

EventPollerPool::EventPollerPool()
{
auto size = g_pool_size;
auto cpus = std::thread::hardware_concurrency();
size = size > 0 ? size : cpus;

std::cout<<"Thread size:"<<size<<std::endl;

if(size <2)size = 2;

\t
for (int i = 0; i < size; + + i) {

std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);
m_pollers.emplace_back(poller);
}
}

std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{
if(m_pollers.size()>1)
{
int min_clients = 10000;
int target_index = 0;
for(int i = 1;i<m_pollers.size();i + + )
{
if(m_pollers[i]-> GetClients() < min_clients)
{
min_clients = m_pollers[i]->GetClients();
target_index = i;
}
\t\t\t
}
\t\t
//printf("target index = %d min_clients = %d\\
",target_index,min_clients);
return m_pollers[target_index];
}
return m_pollers[0];
\t
}
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{
return m_pollers[0];
}


void EventPollerPool::StartPollers()
{

for(int i = 1;i<m_pollers.size();i + + )
{

m_pollers[i]->Init();
int event_fd = m_pollers[i]->GetEventFD();

m_pollers[i]->AddEventer(event_fd,[ & amp;,i](uint64_t cfd){

READER reader = [ & amp;,i](int fd,const char*data,size_t len){
printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\\
",data,len,m_pollers[i],i);
m_pollers[i]->FlushData(fd,data,len);
\t\t\t\t\t\t
return 0;
};
m_pollers[i]->AddReader(cfd,reader);
return 0;
});

m_pollers[i]->Start();

}
}


void EventPollerPool::Stop()
{
for(int i = 0;i<m_pollers.size();i + + )
{
m_pollers[i]->Stop();
}

}

}

head File


#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>

#include <sys/eventfd.h>
#include <signal.h>


#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>

#include <unordered_map>

namespace DURIAN
{



class WriteBuffer
{
private:
std::list<std::string> buf_items;
size_t offset = 0;
\t
public:
bool IsEmpty() const
{
return buf_items.empty();
}
void Add2Buffer(const char* data,size_t len)
{
if(buf_items.empty() || buf_items.back().size() + len >4096)
{
buf_items.emplace_back(data,len);
}
else
{
buf_items.back().append(data,len);
}
}
bool WriteData(int fd)
{
while (IsEmpty() == false)
{
auto const & amp;item = buf_items.front();
const char *p = item.data() + offset;
size_t len = item.size() -offset;
\t
while(len>0)
{
ssize_t ret = write(fd,p,len);
if(ret == -1)
{
if(errno == EAGAIN)
{
return true;
}
return false;
}
offset + = ret;
p + =ret;
len-= ret;
}
\t
buf_items.pop_front();
}
\t
return true;
\t\t\t  
}
};


using ACCEPTER = std::function<int(int)>;
using WRITER = std::function<int(int)>;
using EVENTER = std::function<int(int)>;
using READER = std::function<int(int,const char *data,size_t)>;

//static thread_local std::unordered_map<int fd,READER>g_th_handlers;


class EventPoller{
private:
int m_poll_fd = -1;
int m_id;
bool m_run_flag = false;
std::unordered_map<int,ACCEPTER> m_accept_handlers;

std::unordered_map<int,WriteBuffer> m_buffer_pool;

std::mutex m_connction_lock;
int m_event_fd;
std::thread m_thread_id;
std::vector<int>m_connections;
\t\t\t
void RunLoop();

public:
EventPoller(int i);
~EventPoller();
int GetEventFD();
int GetClients();
std::vector<int> & amp; GetConnections();
bool Init();
void Start();
void Stop();
void Wait();
void DeliverConn(int conn_fd);
bool AddListener(int fd,ACCEPTER on_listen);
bool AddEventer(int fd,EVENTER on_event);
bool AddReader(int fd,READER on_read);
void Close(int fd);
bool Add2Epoll(int fd);
bool FlushData(int fd,const char *buf,size_t len);
\t\t\t


};

class EventPollerPool
{
public:
static EventPollerPool &Instance();
static void SetPoolSize(size_t size = 0);
std::shared_ptr<EventPoller>GetPoller();
std::shared_ptr<EventPoller>GetFirstPoller();
void StartPollers();
void Stop();
private:
int m_size;
std::vector<std::shared_ptr<EventPoller>> m_pollers;
EventPollerPool();
};


\t
}

main file

#include "durian.h"

static bool g_run_flag = true;
void sig_handler(int signo)
{
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
signal(SIGKILL, SIG_IGN);

g_run_flag = false;
printf("Get exit flag\\
");

if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo)
{
        g_run_flag = false;

printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\\
");
\t\t

}

}




bool StartServer()
{
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
printf("Create socket failed\\
");
return false;
}
else
{
printf("Server listen fd is:%d\\
",listen_fd);
}

int reuseaddr = 1;
if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR, & amp;reuseaddr ,sizeof(reuseaddr)) == -1)
{
return false;
}

struct sockaddr_in listen_addr = {0};
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(8888);

if(bind(listen_fd,(struct sockaddr*) & amp;listen_addr,sizeof(listen_addr)) == -1)
{
printf("bind failed\\
");
return false;
}

if(listen(listen_fd,100) == -1)
{
printf("listen failed\\
");
return false;
}


DURIAN::EventPollerPool::SetPoolSize(1);

\t 
DURIAN::EventPollerPool & amp; pool = DURIAN::EventPollerPool::Instance();


pool.StartPollers();


auto poller = pool.GetFirstPoller();
\t 
if(poller->Init())
{
if(poller->AddListener(listen_fd,[ & amp;](int conn_fd){
printf("Add new fd to epoll monitoring fd =%d\\
",conn_fd);
//Deliver client fd to other pollers
\t\t\t
pool.GetPoller()->DeliverConn(conn_fd);
return 0;

}) == false)
{
return false;
}
poller->Start();

}



while(g_run_flag)
{
sleep(2);
}

pool.Stop();

}




void StopServer()
{
DURIAN::EventPollerPool & amp; pool = DURIAN::EventPollerPool::Instance();
pool.Stop();
}

int main(int argc,char *argv[])
{
printf(" cpp version :%d\\
",__cplusplus);


int thread_size = 1;

bool run_flag = true;

signal(SIGPIPE,SIG_IGN);

signal(SIGTERM, sig_handler);
signal(SIGKILL, sig_handler);
signal(SIGINT,sig_handler);

\t
StartServer();
\t
return 0;
}

Performance Testing

ulimit -HSn 102400

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html