// Server I/O multiplexing with the Reactor pattern (select() event loop + worker thread pool)

#include <fcntl.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_set>
#include <vector>

// Backlog value handed to listen() for the listening socket.
constexpr int MAX_CLIENTS = 10;
// Capacity, in bytes, of the buffer used for a single recv() call.
constexpr int BUFFER_SIZE = 1024;
// Number of worker threads the thread pool is created with in main().
constexpr int MAX_THREADS = 4;

/// Payload describing one readiness event handed to a worker thread.
struct EventData
{
    /// Descriptor of the client socket that became readable; -1 means "not set".
    int clientSocket = -1;
};

/// Fixed-size pool of worker threads that execute queued tasks in FIFO order.
///
/// Workers sleep on a condition variable until a task is available or the
/// pool is being destroyed. The destructor lets the workers drain every task
/// that is still queued before joining them.
class ThreadPool
{
public:
    /// Starts `numThreads` workers, each of which immediately waits for tasks.
    explicit ThreadPool(size_t numThreads)
    {
        threads_.reserve(numThreads);
        for (size_t i = 0; i < numThreads; ++i)
        {
            threads_.emplace_back([this] { WorkerLoop(); });
        }
    }

    /// Signals shutdown, lets the workers finish the queued tasks, then joins them.
    ~ThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();

        for (std::thread &thread : threads_)
        {
            if (thread.joinable())
            {
                thread.join();
            }
        }
    }

    // The workers capture `this`, so the pool must never be copied.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /// Appends `func` to the task queue and wakes one waiting worker.
    void Enqueue(std::function<void()> func)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.emplace(std::move(func));
        }
        condition_.notify_one();
    }

private:
    /// Body of every worker thread: pop a task, run it, repeat until shutdown.
    void WorkerLoop()
    {
        while (true)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

                // Exit only once the queue is empty so queued work is never dropped.
                if (stop_ && tasks_.empty())
                {
                    return;
                }

                task = std::move(tasks_.front());
                tasks_.pop();
            }

            // Run outside the lock. An exception escaping a thread's entry
            // function calls std::terminate, so contain it here and keep the
            // worker alive for the next task.
            try
            {
                task();
            }
            catch (const std::exception &e)
            {
                std::cerr << "ThreadPool task failed: " << e.what() << std::endl;
            }
            catch (...)
            {
                std::cerr << "ThreadPool task failed with an unknown exception." << std::endl;
            }
        }
    }

    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;

    std::mutex mutex_;                  // guards tasks_ and stop_
    std::condition_variable condition_; // signalled on new task and on shutdown
    bool stop_ = false;
};

classReactor
{<!-- -->
public:
    Reactor(ThreadPool & amp;threadPool) : threadPool_(threadPool)
    {<!-- -->
    }

    void Register(int clientSocket)
    {<!-- -->
        std::cout << "Register " << std::endl;
        //std::lock_guard<std::mutex> lock(mutex_);
        std::cout << "Register2 " << std::endl;
        clientSockets_.push_back(clientSocket);
        std::cout << "Socket " << clientSocket << " registered." << std::endl;
    }

    void Remove(int clientSocket)
    {<!-- -->
        std::lock_guard<std::mutex> lock(mutex_);
        clientSockets_.erase(std::remove_if(clientSockets_.begin(), clientSockets_.end(),
                                            [clientSocket](int socket)
                                            {<!-- --> return socket == clientSocket; }),
                             clientSockets_.end());
        close(clientSocket);
        FD_CLR(clientSocket, &readFds);
        std::cout << "Socket " << clientSocket << " removed." << std::endl;
        clientSocket = 0;
    }

    void Run(int mainSocket)
    {<!-- -->
        int maxFd = mainSocket;

        while(true)
        {<!-- -->
            FD_ZERO( & amp;readFds);
            FD_SET(mainSocket, &readFds);

            {<!-- -->
                std::unique_lock<std::mutex> lock(mutex_);

                for (int socket : clientSockets_)
                {<!-- -->
                    if (socket > 0)
                    {<!-- -->
                        FD_SET(socket, &readFds);
                        maxFd = std::max(maxFd, socket);
                    }
                }

                struct timeval timeout;
                timeout.tv_sec = 0;
                timeout.tv_usec = 0.01;

                int selectResult = select(maxFd + 1, & amp;readFds, nullptr, nullptr, & amp;timeout);
                if (selectResult == -1)
                {<!-- -->
                    perror("select");
                    return;
                }
                else if (selectResult == 0)
                {<!-- -->
                    // No ready socket, continue event loop
                    continue;
                }

                if (FD_ISSET(mainSocket, &readFds))
                {<!-- -->
                    //There is a new connection request
                    struct sockaddr_in clientAddress;
                    socklen_t clientAddressLength = sizeof(clientAddress);
                    int clientSocket = accept(mainSocket, (struct sockaddr *) & amp;clientAddress, & amp;clientAddressLength);

                    if (clientSocket == -1)
                    {<!-- -->
                        if (errno == EWOULDBLOCK)
                        {<!-- -->
                            // No new connections, continue event loop
                            continue;
                        }
                        else
                        {<!-- -->
                            perror("accept");
                            break;
                        }
                    }
                    else
                    {<!-- -->
                        std::cout << "Accepted new connection." << std::endl;

                        //Add the client socket to the client sockets array

                        Register(clientSocket);
                        std::cout << "Register1 " << std::endl;
                    }
                }

                // for (int i = 0; i < MAX_CLIENTS; + + i)
                // {<!-- -->
                // if (FD_ISSET(clientSockets[i], & amp;readFds))
                // {<!-- -->
                // readySockets.push_back(clientSockets[i]);
                // }
                // }
            }

            for (int socket : clientSockets_)
            {<!-- -->
                if (FD_ISSET(socket, &readFds))
                {<!-- -->
                    threadPool_.Enqueue([this, socket]()
                                        {<!-- -->
                    EventData eventData;
                    eventData.clientSocket = socket;
                    ProcessEvent(eventData); });
                }
            }
        }
    }

private:
    void ProcessEvent(EventData eventData)
    {<!-- -->
        // Handle events, here is just a simple example, return client data
        char buffer[BUFFER_SIZE];
        memset(buffer, 0, sizeof(BUFFER_SIZE));
        ssize_t bytesRead = recv(eventData.clientSocket, buffer, BUFFER_SIZE, 0);
        if (bytesRead > 0)
        {<!-- -->
            send(eventData.clientSocket, buffer, bytesRead, 0);
        }
        else if (bytesRead == 0 || (bytesRead == -1 & amp; & errno != EWOULDBLOCK))
        {<!-- -->
            // The connection is closed or an error occurred, remove the client
            Remove(eventData.clientSocket);
        }
        //Notification event has been processed
        std::cout << "Processed socket " << eventData.clientSocket << " event data " << buffer << "in Thread: " << std::this_thread::get_id() << std::endl;
    }

private:
    ThreadPool &threadPool_;
    std::mutex mutex_;
    std::vector<int> clientSockets_;
    fd_set readFds;
};

class Server
{<!-- -->
public:
    Server(ThreadPool & amp;threadPool) : reactor_(threadPool)
    {<!-- -->
    }

    bool Init(int port)
    {<!-- -->
        mainSocket_ = socket(AF_INET, SOCK_STREAM, 0);
        if (mainSocket_ == -1)
        {<!-- -->
            std::cerr << "creat socket err" << std::endl;
            return false;
        }

        int opt = 1;
        if (setsockopt(mainSocket_, SOL_SOCKET, SO_REUSEADDR, & amp;opt, sizeof(opt)) == -1)
        {<!-- -->
            std::cerr << "setsockopt err" << std::endl;
            close(mainSocket_);
            return false;
        }

        int flags = fcntl(mainSocket_, F_GETFL, 0);
        if (flags == -1)
        {<!-- -->
            std::cerr << "Error getting socket flags." << std::endl;
            close(mainSocket_);
            return false;
        }
        if (fcntl(mainSocket_, F_SETFL, flags | O_NONBLOCK) == -1)
        {<!-- -->
            std::cerr << "Error setting socket to non-blocking mode." << std::endl;
            close(mainSocket_);
            return false;
        }

        struct sockaddr_in serverAddress;
        serverAddress.sin_family = AF_INET;
        serverAddress.sin_addr.s_addr = INADDR_ANY;
        serverAddress.sin_port = htons(port);

        if (bind(mainSocket_, (struct sockaddr *) & amp;serverAddress, sizeof(serverAddress)) == -1)
        {<!-- -->
            perror("bind");
            return false;
        }

        if (listen(mainSocket_, MAX_CLIENTS) == -1)
        {<!-- -->
            perror("listen");
            return false;
        }

        port_ = port;

        std::cout << "server init ok, listening on port: " << port_ << "." << std::endl;

        return true;
    }

    void Run()
    {<!-- -->
        std::thread reactorThread([ & amp;]()
                                  {<!-- -->reactor_.Run(mainSocket_); });

        // Wait for the Reactor thread to end
        reactorThread.join();
    }

private:
    int mainSocket_;
    int port_;
    Reactor reactor_;
};

/// Entry point. Usage: <program> <port>
/// Starts the echo server on the given TCP port and runs until the event
/// loop ends.
/// Returns -1 for a missing or invalid port argument, 1 if the server could
/// not be initialised, 0 otherwise.
int main(int argc, char *argv[])
{
    if (argc < 2)
    {
        std::cerr << "please input the port of server." << std::endl;
        return -1;
    }

    // strtol instead of atoi: atoi cannot report errors and turns any
    // non-numeric argument into port 0.
    errno = 0;
    char *end = nullptr;
    const long parsedPort = std::strtol(argv[1], &end, 10);
    if (errno != 0 || end == argv[1] || *end != '\0' || parsedPort < 1 || parsedPort > 65535)
    {
        std::cerr << "invalid port: " << argv[1] << std::endl;
        return -1;
    }
    const int port = static_cast<int>(parsedPort);

    ThreadPool threadPool(MAX_THREADS); // Create a thread pool
    Server server(threadPool);          // Pass the thread pool to the server constructor

    if (!server.Init(port))
    {
        std::cerr << "init server failed :" << port << std::endl;
        return 1;
    }

    server.Run();

    return 0;
}

// Debugging: on Linux, the nc command can act as a client. For example, if the
// server was started on port 7777:
//     nc 127.0.0.1 7777