#include <iostream> #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <unistd.h> #include <cstring> #include <sys/socket.h> #include <netinet/in.h> #include <fcntl.h> #include <functional> #include <future> #include <algorithm> const int MAX_CLIENTS = 10; const int BUFFER_SIZE = 1024; const int MAX_THREADS = 4; structEventData {<!-- --> int clientSocket; }; class ThreadPool {<!-- --> public: ThreadPool(size_t numThreads) {<!-- --> for (size_t i = 0; i < numThreads; + + i) {<!-- --> threads_.emplace_back([this] {<!-- --> while (true) {<!-- --> std::function<void()> task; {<!-- --> std::unique_lock<std::mutex> lock(mutex_); condition_.wait(lock, [this] {<!-- --> return stop_ || !tasks_.empty(); }); if (stop_ & amp; & amp; tasks_.empty()) {<!-- --> return; } task = std::move(tasks_.front()); tasks_.pop(); } //std::cout << "task(); " << std::endl; task(); } }); } } ~ThreadPool() {<!-- --> {<!-- --> std::unique_lock<std::mutex> lock(mutex_); stop_ = true; } condition_.notify_all(); for (std::thread & amp;thread : threads_) {<!-- --> thread.join(); } } void Enqueue(std::function<void()> func) {<!-- --> {<!-- --> std::unique_lock<std::mutex> lock(mutex_); tasks_.emplace(std::move(func)); } condition_.notify_one(); } private: std::vector<std::thread> threads_; std::queue<std::function<void()>> tasks_; std::mutex mutex_; std::condition_variable condition_; bool stop_ = false; }; classReactor {<!-- --> public: Reactor(ThreadPool & amp;threadPool) : threadPool_(threadPool) {<!-- --> } void Register(int clientSocket) {<!-- --> std::cout << "Register " << std::endl; //std::lock_guard<std::mutex> lock(mutex_); std::cout << "Register2 " << std::endl; clientSockets_.push_back(clientSocket); std::cout << "Socket " << clientSocket << " registered." << std::endl; } void Remove(int clientSocket) {<!-- --> std::lock_guard<std::mutex> lock(mutex_); clientSockets_.erase(std::remove_if(clientSockets_.begin(), clientSockets_.end(), [clientSocket](int socket) {<!-- --> return socket == clientSocket; }), clientSockets_.end()); close(clientSocket); FD_CLR(clientSocket, &readFds); std::cout << "Socket " << clientSocket << " removed." << std::endl; clientSocket = 0; } void Run(int mainSocket) {<!-- --> int maxFd = mainSocket; while(true) {<!-- --> FD_ZERO( & amp;readFds); FD_SET(mainSocket, &readFds); {<!-- --> std::unique_lock<std::mutex> lock(mutex_); for (int socket : clientSockets_) {<!-- --> if (socket > 0) {<!-- --> FD_SET(socket, &readFds); maxFd = std::max(maxFd, socket); } } struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 0.01; int selectResult = select(maxFd + 1, & amp;readFds, nullptr, nullptr, & amp;timeout); if (selectResult == -1) {<!-- --> perror("select"); return; } else if (selectResult == 0) {<!-- --> // No ready socket, continue event loop continue; } if (FD_ISSET(mainSocket, &readFds)) {<!-- --> //There is a new connection request struct sockaddr_in clientAddress; socklen_t clientAddressLength = sizeof(clientAddress); int clientSocket = accept(mainSocket, (struct sockaddr *) & amp;clientAddress, & amp;clientAddressLength); if (clientSocket == -1) {<!-- --> if (errno == EWOULDBLOCK) {<!-- --> // No new connections, continue event loop continue; } else {<!-- --> perror("accept"); break; } } else {<!-- --> std::cout << "Accepted new connection." << std::endl; //Add the client socket to the client sockets array Register(clientSocket); std::cout << "Register1 " << std::endl; } } // for (int i = 0; i < MAX_CLIENTS; + + i) // {<!-- --> // if (FD_ISSET(clientSockets[i], & amp;readFds)) // {<!-- --> // readySockets.push_back(clientSockets[i]); // } // } } for (int socket : clientSockets_) {<!-- --> if (FD_ISSET(socket, &readFds)) {<!-- --> threadPool_.Enqueue([this, socket]() {<!-- --> EventData eventData; eventData.clientSocket = socket; ProcessEvent(eventData); }); } } } } private: void ProcessEvent(EventData eventData) {<!-- --> // Handle events, here is just a simple example, return client data char buffer[BUFFER_SIZE]; memset(buffer, 0, sizeof(BUFFER_SIZE)); ssize_t bytesRead = recv(eventData.clientSocket, buffer, BUFFER_SIZE, 0); if (bytesRead > 0) {<!-- --> send(eventData.clientSocket, buffer, bytesRead, 0); } else if (bytesRead == 0 || (bytesRead == -1 & amp; & errno != EWOULDBLOCK)) {<!-- --> // The connection is closed or an error occurred, remove the client Remove(eventData.clientSocket); } //Notification event has been processed std::cout << "Processed socket " << eventData.clientSocket << " event data " << buffer << "in Thread: " << std::this_thread::get_id() << std::endl; } private: ThreadPool &threadPool_; std::mutex mutex_; std::vector<int> clientSockets_; fd_set readFds; }; class Server {<!-- --> public: Server(ThreadPool & amp;threadPool) : reactor_(threadPool) {<!-- --> } bool Init(int port) {<!-- --> mainSocket_ = socket(AF_INET, SOCK_STREAM, 0); if (mainSocket_ == -1) {<!-- --> std::cerr << "creat socket err" << std::endl; return false; } int opt = 1; if (setsockopt(mainSocket_, SOL_SOCKET, SO_REUSEADDR, & amp;opt, sizeof(opt)) == -1) {<!-- --> std::cerr << "setsockopt err" << std::endl; close(mainSocket_); return false; } int flags = fcntl(mainSocket_, F_GETFL, 0); if (flags == -1) {<!-- --> std::cerr << "Error getting socket flags." << std::endl; close(mainSocket_); return false; } if (fcntl(mainSocket_, F_SETFL, flags | O_NONBLOCK) == -1) {<!-- --> std::cerr << "Error setting socket to non-blocking mode." << std::endl; close(mainSocket_); return false; } struct sockaddr_in serverAddress; serverAddress.sin_family = AF_INET; serverAddress.sin_addr.s_addr = INADDR_ANY; serverAddress.sin_port = htons(port); if (bind(mainSocket_, (struct sockaddr *) & amp;serverAddress, sizeof(serverAddress)) == -1) {<!-- --> perror("bind"); return false; } if (listen(mainSocket_, MAX_CLIENTS) == -1) {<!-- --> perror("listen"); return false; } port_ = port; std::cout << "server init ok, listening on port: " << port_ << "." << std::endl; return true; } void Run() {<!-- --> std::thread reactorThread([ & amp;]() {<!-- -->reactor_.Run(mainSocket_); }); // Wait for the Reactor thread to end reactorThread.join(); } private: int mainSocket_; int port_; Reactor reactor_; }; int main(int argc, char *argv[]) {<!-- --> if (argc < 2) {<!-- --> std::cerr << "please input the port of server." << std::endl; return -1; } int port = atoi(argv[1]); ThreadPool threadPool(MAX_THREADS); // Create a thread pool Server server(threadPool); // Pass the thread pool to the server constructor if (!server.Init(port)) {<!-- --> std::cerr << "int server failed :" << port << std::endl; return 1; } server.Run(); return 0; }
- Debugging: nc command as client under Linux:
nc 127.0.0.1 7777