Implementing a socket server with IOCP in C++

Over the past few days I have been studying multi-threading and network socket models. I wrote the code below, and these notes, based on Teacher Renzhai's C++ tutorial, the Windows API reference, and some information found on the Internet.

Server side code.

#define _CRT_SECURE_NO_WARNINGS
#include <WinSock2.h>
#include <list>
#include <process.h>
#include <iostream>
#pragma comment(lib,"ws2_32.lib")
using namespace std;

struct IoData
{
IoData()
{
ZeroMemory(this, sizeof(IoData));
}

OVERLAPPED OverL;
CHAR buffer[1024];
byte Type;
DWORD Len;
WSABUF wasbuffer;
};

classFClient
{
public:
FClient(SOCKET S, SOCKADDR_IN InSin);
virtual ~FClient() {};

BOOL Recv();
BOOL Send();

public:
SOCKET ClientSocket;
SOCKADDR_IN Sin;
IoData Data;
};
FClient::FClient(SOCKET S, SOCKADDR_IN InSin)
:ClientSocket(S),
Sin(InSin)
{

}

/// Posts one overlapped receive into Data.buffer.
///
/// @return TRUE when the receive completed immediately or is pending;
///         FALSE when WSARecv failed with any other error.
BOOL FClient::Recv()
{
    DWORD Flag = 0;
    DWORD Len = 0;

    // The same OVERLAPPED is reused for every operation on this client, so
    // clear whatever the previous operation left in it.
    ZeroMemory(&Data.OverL, sizeof(Data.OverL));

    Data.Type = 0;
    Data.wasbuffer.buf = Data.buffer;
    // Keep the last byte free: the worker thread writes a '\0' at
    // buffer[bytesReceived], which must stay inside the array.
    Data.wasbuffer.len = static_cast<ULONG>(sizeof(Data.buffer) - 1);

    //Reference https://learn.microsoft.com/zh-cn/windows/win32/api/winsock2/nf-winsock2-wsarecv
    const int Result = WSARecv(
        ClientSocket,
        &Data.wasbuffer,
        1,
        &Len,
        &Flag,
        &Data.OverL,
        NULL);

    // WSA_IO_PENDING only means the completion will be delivered later.
    if (Result == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        return FALSE;
    }
    return TRUE;
}

/// Posts one overlapped send of the NUL-terminated text held in Data.buffer.
///
/// @return TRUE when the send completed immediately or is pending;
///         FALSE when WSASend failed with any other error.
BOOL FClient::Send()
{
    const DWORD Flag = 0;
    DWORD Len = 0;

    // The same OVERLAPPED is reused for every operation on this client, so
    // clear whatever the previous operation left in it.
    ZeroMemory(&Data.OverL, sizeof(Data.OverL));

    Data.Type = 1;
    Data.wasbuffer.buf = Data.buffer;
    Data.wasbuffer.len = static_cast<ULONG>(strlen(Data.buffer));

    //Reference https://learn.microsoft.com/zh-cn/windows/win32/api/winsock2/nf-winsock2-wsasend
    // Unlike WSARecv, WSASend takes its flags by value, not by pointer.
    const int Result = WSASend(
        ClientSocket,
        &Data.wasbuffer,
        1,
        &Len,
        Flag,
        &Data.OverL,
        NULL);

    // WSA_IO_PENDING only means the completion will be delivered later.
    if (Result == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        return FALSE;
    }
    return TRUE;
}

list<FClient*>ClientList;
void ListRemove(FClient* Client)
{
//Traverse the client linked list and discharge the matching process numbers.
for (auto iter = ClientList.begin(); iter != ClientList.end(); iter + + )
{
if (Client == (*iter))
{
ClientList.erase(iter);
break;
}
}
}

HANDLE cp = NULL; // I/O completion port shared by the accept loop and the worker threads

/// Worker thread entry point.
///
/// Blocks on the completion port `cp` and handles one completion packet per
/// iteration until a packet with a null key and a null OVERLAPPED arrives.
///
/// @param content  Unused; the port is read from the global `cp`.
/// @return Always 0.
unsigned int __stdcall Run(void* content)
{
    (void)content;

    for (;;)
    {
        DWORD IOSize = 0;
        ULONG_PTR Key = 0;
        LPOVERLAPPED lpOverlapped = NULL;

        // The completion key is pointer-sized (ULONG_PTR); it carries the
        // FClient* that was supplied when the socket was bound to the port.
        const BOOL Ret = GetQueuedCompletionStatus(
            cp, &IOSize, &Key, &lpOverlapped, INFINITE);
        const DWORD LastError = Ret ? ERROR_SUCCESS : GetLastError();

        FClient* Client = reinterpret_cast<FClient*>(Key);

        // Null key + null OVERLAPPED is the packet posted to stop the worker.
        if (Client == NULL && lpOverlapped == NULL)
        {
            break;
        }

        if (Ret)
        {
            // A zero-byte completion is treated as the peer having gone away.
            if (IOSize == 0)
            {
                ListRemove(Client);
                continue;
            }

            //https://blog.csdn.net/zyhse/article/details/109246875
            // Recover the IoData that embeds the completed OVERLAPPED.
            IoData* InData = CONTAINING_RECORD(lpOverlapped, IoData, OverL);
            switch (InData->Type)
            {
            case 0: // A receive finished: print the message and answer it.
            {
                // Clamp so the terminator always lands inside the buffer,
                // even if the full array was filled.
                const DWORD LastIndex = static_cast<DWORD>(sizeof(Client->Data.buffer) - 1);
                const DWORD Received = (IOSize < LastIndex) ? IOSize : LastIndex;
                Client->Data.Len = Received;
                Client->Data.buffer[Received] = '\0';
                // The text comes from the network: never use it as a format string.
                printf("%s", Client->Data.buffer);

                // Test reply sent back to the client.
                sprintf_s(Client->Data.buffer, sizeof(Client->Data.buffer),
                    "hi I'm Server (%d)", static_cast<int>(Client->ClientSocket));
                if (!Client->Send())
                {
                    ListRemove(Client);
                }
                break;
            }
            case 1: // A send finished: print what was sent and wait for more data.
            {
                printf("%s", Client->Data.buffer);
                Client->Data.Len = 0;
                if (!Client->Recv())
                {
                    ListRemove(Client);
                }
                break;
            }
            }
        }
        else
        {
            if (LastError == WAIT_TIMEOUT)
            {
                continue;
            }
            else if (lpOverlapped != NULL)
            {
                // A packet for a failed I/O operation was dequeued: drop the client.
                ListRemove(Client);
            }
            else
            {
                // Nothing was dequeued: report the error and stop this worker.
                std::cout << LastError << std::endl;
                break;
            }
        }
    }
    return 0;
}



int main()
{
SYSTEM_INFO systeminfo;
GetSystemInfo( & amp;systeminfo);

int CpuNums = systeminfo.dwNumberOfProcessors;
static const int Threadnums = CpuNums * 2;

HANDLE hThreadHandle[128]; //The return handle of the thread

if ((cp = CreateIoCompletionPort(
((HANDLE)(LONG_PTR)-1),
NULL,
0,
0
)) == NULL)
{
return GetLastError();
}

for (int i = 0; i <threadnums; i + + )
{
hThreadHandle[i] = (HANDLE)_beginthreadex(
NULL,//security attribute, when it is NULL, it means default security
0, // Thread stack size, generally defaults to 0
Run, // Thread function to be started
cp, // The parameter of the thread function is a void* type. Use a structure when passing multiple parameters.
0,//The initial state of the new thread, 0 means execution immediately, CREATE_SUSPENDED means suspension after creation
NULL); // Used to receive thread ID
}

//Start windows socket
WSADATA WsaData;
int err = WSAStartup(MAKEWORD(2, 2), & amp;WsaData);
if (err != 0)
{
return -1;
}
//Create Socket and listen to Socket
SOCKET Listen = INVALID_SOCKET;
if ((Listen = WSASocket(
AF_INET,//IPV4
SOCK_STREAM, /*
TCP a socket type,
It provides sequenced, reliable, bidirectional connection-based byte streams through the OOB data transfer mechanism.
This socket type uses the Internet address family (AF_INET or AF_INET6)
Transmission Control Protocol (TCP) SOCK_DGRAM is udp */
0, //Protocol
NULL, //Pointer to WSAPROTOCOL_INFO structure,
//This structure defines the characteristics of the socket to be created. If this parameter is not NULL,
//The socket will be bound to the provider associated with the indicated WSAPROTOCOL_INFO structure.
0,
WSA_FLAG_OVERLAPPED//
)) == INVALID_SOCKET)
{
WSACleanup();
return -1;
}
sockaddr_in Sin;
Sin.sin_family = AF_INET;
Sin.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
Sin.sin_port = htons(8888); //Port number

if (bind(Listen, (SOCKADDR*) & amp;Sin, sizeof(Sin)) == SOCKET_ERROR)
{
closesocket(Listen);
WSACleanup();
return -1;
}

if (listen(Listen, SOMAXCONN))
{
closesocket(Listen);
WSACleanup();
return -1;
}

//iocp delivery
SOCKET ClientAccpet = INVALID_SOCKET;
SOCKADDR_IN ClientAccpetAddr;
int ClientAccpetAddrLen = sizeof(ClientAccpetAddr);

for (;;)
{
//Block process
ClientAccpet = WSAAccept(Listen, (SOCKADDR*) &ClientAccpetAddr,
&ClientAccpetAddrLen, NULL, 0);

if (ClientAccpet == SOCKET_ERROR)
{
break;
}

FClient* InClient = new FClient(ClientAccpet, ClientAccpetAddr);
ClientList.push_back(InClient);

//Binding completion port
if (CreateIoCompletionPort(
(HANDLE)ClientAccpet,
cp,
(DWORD)InClient,
0
) == NULL)
{
break;
}

if (!InClient->Recv())
{
ListRemove(InClient);
}
}
\t
//Destroy the thread pool
for (int i = 0; i <threadnums; i + + )
{
PostQueuedCompletionStatus(cp, 0, NULL, NULL);
CloseHandle(hThreadHandle[i]);
}
WaitForMultipleObjects(Threadnums, hThreadHandle, TRUE, INFINITE);

return 0;
}

Client side code.

#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <iostream>
#include <WinSock2.h>
#pragma comment(lib,"ws2_32.lib")

int main()
{
//Start windows socket
WSADATA WsaData;
int Ret = 0;
if ((Ret = WSAStartup(MAKEWORD(2, 1), & amp;WsaData)) != 0)
{
return -1;
}

for (;;)
{
//Create socket
SOCKET ClientSocket = socket(
AF_INET,
SOCK_STREAM,
IPPROTO_TCP //tcp protocol IPPROTO_IP IP protocol
);

sockaddr_in Sin;
Sin.sin_family = AF_INET;
Sin.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
Sin.sin_port = htons(8888); //Port number

//Detect connection
if (
connect(
\t\t\t
ClientSocket,
\t\t\t
(SOCKADDR*) &Sin,
\t\t\t
sizeof(Sin)
) == SOCKET_ERROR)
{
//Close the socket if error occurs
closesocket(ClientSocket);
break;
}

//Send message
char buffer[1024] = { 0 };
sprintf_s(buffer, 1024, "Hello I'm client %d \
", ClientSocket);

send(ClientSocket, buffer, strlen(buffer), 0);

memset(buffer, 0, 1024);
recv(ClientSocket, buffer, sizeof(buffer), 0);//blocking

printf(buffer);

closesocket(ClientSocket);
}
WSACleanup();
return 0;

}

Debugging can see that after the server is started, each thread enters the queue task waiting by calling GetQueuedCompletionStatus().

The server successfully receives the message sent by the client and sends a reply back to the client.

After the 32 threads have finished running, you can see the printed results on the console.