IOCP model under windows

1 iocp echo server

-------------------------------------ThreadPool.h------- ---------------------------------
//The thread pool is the basis of the iocp echo server
#pragma once
#include <atomic>
#include <process.h>
#include <vector>
#include <mutex>
#include <WinSock2.h>
#include <atlstr.h>
#include <iostream>
typedef int (*FUNCTYPE)(void*);
class ThreadWorker {
public:
ThreadWorker() : func(NULL), arg(arg) {};
ThreadWorker(FUNCTYPE f, void* arg = NULL) : func(f), arg(arg) {}
ThreadWorker(const ThreadWorker & worker) {
func = worker. func;
arg = worker. arg;
}
ThreadWorker & amp; operator=(const ThreadWorker & amp; worker) {
if (this != & worker) {
func = worker. func;
arg = worker. arg;
}
return *this;
}

int operator()(void* arg) {
if (IsValid()) {
return func(arg);
}
return -1;
}
bool IsValid() const {
return (func != NULL);
}

public:
FUNCTYPE func;
void* arg;
};


class MyThread
{
public:
MyThread() {
m_hThread = NULL;
m_bStatus = false;
}
~MyThread() {

}
/*
* Judging the thread status:
* Return true to indicate that it is valid, return false to indicate that the thread is abnormal or terminated
*/
bool IsValid() {
if (m_hThread == NULL || (m_hThread == INVALID_HANDLE_VALUE)) return false;
return WaitForSingleObject(m_hThread, 0) == WAIT_TIMEOUT;
}
/*
* Start thread:
* Return true for success, return false for failure
*/
bool Start() {
m_bStatus = true;
m_hThread = (HANDLE)_beginthread( & amp;MyThread::ThreadEntry, 0, this);
if (!IsValid()) {
m_bStatus = false;
}
return m_bStatus;
}
/*
* Close the thread:
* Return true for success, return false for failure
*/
bool Stop() {
if (m_bStatus == false) return true;
m_bStatus = false;
bool ret = WaitForSingleObject(m_hThread, INFINITE) == WAIT_OBJECT_0;
UpdateWorker();
return ret;
}
void UpdateWorker(const ::ThreadWorker & worker = ::ThreadWorker()) {
if (m_worker.load() != NULL & amp; & amp; m_worker.load() != & amp;worker) {
::ThreadWorker* pWorker = m_worker.load();
m_worker.store(NULL);
delete pWorker;
}
if (!worker. IsValid()) {
m_worker.store(NULL);
return;
}
m_worker.store(new ::ThreadWorker(worker));
}
/*
* Detect whether the current thread is assigned a task:
* true: allocated; false: not allocated
*/
bool IsIdle() {
if (m_worker == NULL) return true;
return !m_worker.load()->IsValid();
}

private:
/*
* Open thread:
* Call the system API to start a system thread
*/
static void ThreadEntry(void* arg) {
MyThread* thiz = (MyThread*)arg;
if (thiz) {
thiz->ThreadWorker();
}
_endthread();
}
/*
* Execution thread:
* If m_worker has not been assigned a task, it will keep looping and will not execute
* Function will be executed once task is assigned
*/
void ThreadWorker() {
while (m_bStatus) {
if (m_worker == NULL) {
Sleep(1);
continue;
}
::ThreadWorker worker = *m_worker.load();
if (worker. IsValid()) {
int ret = worker(worker.arg);
if (ret == -1) {
std::cout << "thread found warning" << std::endl;
}
else {
m_worker.store(NULL);
}
}
else {
Sleep(1);
}
}
}
private:
HANDLE m_hThread;//thread handle
bool m_bStatus;//The current state of the thread
std::atomic<::ThreadWorker*> m_worker;//store a thread class, mainly including functions

};

class MyThreadPool
{
public:
MyThreadPool(size_t size) {
m_threads.resize(size);
for (size_t i = 0; i < size; i ++ )
m_threads[i] = new MyThread();
}
MyThreadPool() {}
~MyThreadPool() {
Stop();
for (size_t i = 0; i < m_threads. size(); i ++ )
{
MyThread* pThread = m_threads[i];
m_threads[i] = NULL;
delete pThread;
}

m_threads. clear();
}
//Activate the thread pool
bool Invoke() {
bool ret = true;
for (size_t i = 0; i < m_threads. size(); i ++ ) {
if (m_threads[i]->Start() == false) {
ret = false;
break;
}
}
if (ret == false) {
for (size_t i = 0; i < m_threads. size(); i ++ ) {
m_threads[i]->Stop();
}
}
return ret;
}
void Stop() {
for (size_t i = 0; i < m_threads. size(); i ++ ) {
m_threads[i]->Stop();
}
}

//Return -1 means that the allocation failed, all threads are busy and greater than or equal to 0, indicating that the nth thread is allocated to do this
int DispatchWorker(const ThreadWorker & amp; worker) {
int index = -1;
m_lock. lock();
for (size_t i = 0; i < m_threads. size(); i ++ ) {
if (m_threads[i] != NULL & amp; & amp; m_threads[i]->IsIdle()) {
m_threads[i]->UpdateWorker(worker);
index = i;
break;
}
}
m_lock. unlock();
return index;
}

bool CheckThreadValid(size_t index) {
if (index < m_threads. size()) {
return m_threads[index]->IsValid();
}
return false;
}
public:
std::mutex m_lock;
std::vector<MyThread*> m_threads;
};
----------------------------------------iocpServer.cpp------- ----------------------------
#include "ThreadPool.h"
#include <WinSock2.h>
#include <iostream>
#include <map>
#pragma comment(lib, "ws2_32.lib")
using namespace std;
MyThreadPool pool(10);
#define BUF_SIZE 100
#define READ 1
#define WRITE 2
class ClientMessage { //Client information
public:
SOCKET hclientSocket;
SOCKADDR_IN clientAddr;
};

class MyOverlapped {
public:
MyOverlapped() {
memset( &m_overlapped, 0, sizeof(m_overlapped));
m_wsabuffer.len = BUF_SIZE;
m_wsabuffer.buf = buffer;
}
public:
OVERLAPPED m_overlapped;
int m_operator;//operation 1 read; 2 write
char buffer[BUF_SIZE];//buffer
ThreadWorker m_worker;//processing function
ClientMessage client;
WSABUF m_wsabuffer;
};
int ReadyReadThread(void* arg) {
DWORD flags = 0;
MyOverlapped* overlap = (MyOverlapped*)arg;
memset( & amp;(overlap->m_overlapped), 0, sizeof(OVERLAPPED));
overlap->m_operator = WRITE;
//client socket buffer array address array length the address to save the actual number of bytes variable data transmission characteristics time status Routine function
WSASend(overlap->client.hclientSocket, & amp;(overlap->m_wsabuffer), 1, NULL, 0, & amp;(overlap->m_overlapped), NULL);
cout << "Data sent" << endl;
return 0;
}
int ReadyWriteThread(void* arg) {
DWORD flags = 0;
MyOverlapped* lpoverlap = (MyOverlapped*)arg;
MyOverlapped* overlap = new MyOverlapped();
overlap->m_operator = READ;
overlap->client = lpoverlap->client;
WSARecv(overlap->client.hclientSocket, & amp;(overlap->m_wsabuffer), 1, NULL, & amp;flags, & amp;(overlap->m_overlapped), NULL);
cout << "Continue to receive data" << endl;
delete lpoverlap;
return 0;
}
unsigned WINAPI EchoThreadMain(LPVOID pComPort)
{
cout << "Thread NO." << GetCurrentThreadId() << endl;
cout << "EchoThreadMain() initialization" << endl;
HANDLE hComPort = (HANDLE)pComPort;
SOCKET sock;
DWORD bytesTrans = 0;
ClientMessage* handleInfo;
MyOverlapped* overlap;
DWORD flags = 0;
while (1) {
//Get the client information of IO completion in the operation queue
GetQueuedCompletionStatus(hComPort, & amp;bytesTrans, (PULONG_PTR) & amp;handleInfo, (LPOVERLAPPED*) & amp;overlap, INFINITE);
if (bytesTrans != 0) {
sock = handleInfo->hclientSocket;
if (overlap->m_operator == READ) { //Determine whether the client is reading or writing
cout << "ready to read!" << endl;
cout << overlap->buffer << endl;
overlap->m_wsabuffer.len = bytesTrans;
while (1) {
if (pool. DispatchWorker(ThreadWorker( & amp; ReadyReadThread, overlap)) != -1) {
break;
}Sleep(100);
}
}
else {
cout << "Ready to write!" << endl;
while (1) {
if (pool. DispatchWorker(ThreadWorker( & ReadyWriteThread, overlap)) != -1) {
break;
}Sleep(100);
}
}
}
}
return 0;
}

int main() {
pool. Invoke();
WSADATA wsadata;
HANDLE hComPort; //Complete port handle
MyOverlapped* overlap; //ioMessage
ClientMessage* handleInfo;//key
SOCKET serverSocket;
SOCKADDR_IN serverAddr;
DWORD recvBytes, flags = 0;
//1. Initialize the socket
if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0)
cout << "WSAStartup() error" << endl;
//2. Create an iocp object (the second parameter is 0, so it is a newly created iocp), and the last parameter is 0 to create a thread with the same number of CPU cores. Returning NULL means the creation fails
hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (hComPort == NULL) return 0;
for (int i = 0; i < 3; i ++ ) {
_beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);
}

//3. Create the listening socket of the server
serverSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (serverSocket == INVALID_SOCKET)
cout << "socket() error" << endl;

memset( & serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddr.sin_port = htons(8000);

if (bind(serverSocket, (SOCKADDR*) & serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
cout << "bind () error" << endl;

listen(serverSocket, 5);
cout << "The server started successfully!" << endl;
//4. Start listening
while (1) {
SOCKET clientSocket;
SOCKADDR_IN clientAddr;
int addrLen = sizeof(clientAddr);
cout << "wait connect!" << endl;
clientSocket = accept(serverSocket, (SOCKADDR*) &clientAddr, &addrLen);
if (clientSocket != INVALID_SOCKET) cout << "There is already a client connection:" << clientSocket << endl;
//(1) Add key->handleInfo for the socket
handleInfo = new ClientMessage();
handleInfo->hclientSocket = clientSocket;
memcpy( & amp;(handleInfo->clientAddr), & amp;clientAddr, addrLen);
//(2) Join the completion port
CreateIoCompletionPort((HANDLE)clientSocket, hComPort, (DWORD)handleInfo, 0);//bind a client socket to the completion port
//(3) Prepare to receive
overlap = new MyOverlapped(); //Create a handle for binding information such as event objects
overlap->m_operator = READ;
overlap->client = *handleInfo;
WSARecv(handleInfo->hclientSocket, & amp;(overlap->m_wsabuffer), 1, & amp;recvBytes, & amp;flags, & amp;(overlap->m_overlapped), NULL);//Asynchronously waiting to receive client information
//Parameters: 1 Overlapped IO socket 2 Save the structure array of received information 3 The length of the second parameter array 4 Save the variable address of the received message size 5 Set or read the message of the transmission characteristic
// 6 The state of the event object 7 The address of the Routine function
}
}

2 iocp server creation steps

Note: (only the socket that communicates with the client is mapped, and the socket that the server listens to is not considered for the time being, so the socket is either ready for reading or writing)

1. Create a completion port

2. Start one (multiple) threads to obtain the completion status of the queue, that is, determine which sockets in the completion port are ready for reading or writing (open additional threads to handle read and write operations)

3. The server creates a listening socket to detect the client connection

4. After detecting the client connection, add its socket to the completion port

Why does iocp use thread pool?

If the server opens a thread to serve it every time a user is connected, it will greatly increase the load on the server. The iocp will only open additional threads when the socket really needs to be operated, and the thread pool can control the maximum number of open threads on the server.