Windows C++ uses IOCP IO multiplexing to simultaneously monitor 1000 tcp socket messages

Windows C++ uses IOCP IO multiplexing to simultaneously monitor 1000 tcp socket messages

  1. Establish a TCP connection
// Creates an IPv4 TCP socket and connects it to the given server.
//
// Parameters:
//   socket        - [out] receives the connected socket on success and is set
//                   to INVALID_SOCKET on failure. It is taken by reference:
//                   with a by-value parameter the caller would never see the
//                   handle created here and the socket would be leaked.
//   strServerIP   - server address in dotted-decimal IPv4 form ("127.0.0.1").
//   strServerPort - server TCP port as a decimal string ("8888").
//
// Returns TRUE when the connection was established, FALSE otherwise
// (the reason is reported through WriteError).
BOOL CreateConnection(SOCKET& socket, CString strServerIP, const CString strServerPort)
{
    // create socket
    // The parameter is named `socket` and hides the Winsock function of the
    // same name, so the function must be reached through the global scope.
    socket = ::socket(AF_INET, SOCK_STREAM, 0);
    if (socket == INVALID_SOCKET)
    {
        WriteError(_T("socket() called failed!"));
        return FALSE;
    }

    // Set socket options and other operations...

    // Zero the whole structure first so that sin_zero and any padding do not
    // carry stack garbage into connect().
    SOCKADDR_IN addrServer;
    ZeroMemory(&addrServer, sizeof(addrServer));
    addrServer.sin_family = AF_INET;
    addrServer.sin_addr.S_un.S_addr = inet_addr(wchar2char(strServerIP.GetBuffer()));
    addrServer.sin_port = htons(static_cast<u_short>(_ttoi(strServerPort)));

    if (connect(socket, reinterpret_cast<sockaddr*>(&addrServer), sizeof(addrServer)) == SOCKET_ERROR)
    {
        WriteError(_T("Failed to connect to server"));
        closesocket(socket);
        // Do not leave a closed handle value in the caller's variable.
        socket = INVALID_SOCKET;
        return FALSE;
    }

    return TRUE;
}

The parameters of the socket() function used above are:
Parameter 1: domain (protocol domain): Specifies the address family used by the protocol. Common values are:
AF_INET: IPv4 address family
AF_INET6: IPv6 address family
AF_UNIX: Unix domain socket (local inter-process communication)

Parameter 2: type (socket type): Specifies the type of socket. Common values are:
SOCK_STREAM: Connection-oriented stream socket, using TCP protocol
SOCK_DGRAM: Connectionless datagram socket, using UDP protocol
SOCK_RAW: Raw socket, used for underlying network protocol access

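Parameter 3: protocol: Specifies the protocol to use. Passing 0, as the code above does, lets the service provider choose the default protocol for the given address family and socket type. Common values are: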
IPPROTO_TCP: TCP transport protocol
IPPROTO_UDP: UDP transport protocol

  2. Disconnect
// Closes the given socket if it is valid.
//
// The handle is received by value, so this function cannot reset the
// caller's copy: after the call the caller should set its own variable to
// INVALID_SOCKET so the closed handle is not used again.
//
// Returns TRUE if the socket was already invalid or was closed successfully,
// FALSE if closesocket() reported an error.
BOOL CSendInfoToServer::CloseConnection(SOCKET socket)
{
    if (socket == INVALID_SOCKET)
    {
        // Nothing to close.
        return TRUE;
    }

    return (closesocket(socket) == SOCKET_ERROR) ? FALSE : TRUE;
}
  3. Create the structure that holds the per-connection receive state
struct client
{<!-- -->
SOCKET m_Socket;

//Used for IOCP to receive heartbeat messages
WSABUF dataBuffer;
OVERLAPPED overlapped;
char receivedData[4096]; //This space must be large enough to store received data once
}
  4. The thread-pool callback that processes the received data
// task function callback
//
// Thread-pool work callback run once for every completed receive.
//
// Parameters:
//   instance - callback instance supplied by the thread pool (unused).
//   lpParam  - the `client` whose receive completed.
//   work     - the work object this callback was submitted with; it is
//              released here because nothing else keeps a reference to it.
//
// The callback prints the received bytes and then posts the next overlapped
// receive on the same socket. If the receive cannot be posted, the socket is
// closed and marked invalid.
static VOID CALLBACK TaskFunction(PTP_CALLBACK_INSTANCE instance, LPVOID lpParam, PTP_WORK work)
{
    UNREFERENCED_PARAMETER(instance);

    // One work object is created per completion. Releasing it from inside
    // its own callback is allowed; the object is freed once the callback
    // has returned.
    if (work != NULL)
    {
        CloseThreadpoolWork(work);
    }

    client* pCurClient = static_cast<client*>(lpParam);
    if (pCurClient == NULL)
    {
        return;
    }

    // pCurClient->receivedData holds the received data. The buffer is not
    // guaranteed to be NUL-terminated (a receive can fill all of it), so the
    // length is measured with an explicit bound instead of treating the
    // buffer as a C string.
    const size_t capacity = sizeof(pCurClient->receivedData);
    size_t dataLength = 0;
    while (dataLength < capacity && pCurClient->receivedData[dataLength] != '\0')
    {
        ++dataLength;
    }
    std::string strData(pCurClient->receivedData, dataLength);
    std::cout << strData;

    // Submit the next overlapping IO request
    // The buffer is cleared first so that a shorter message is not followed
    // by left-over bytes of an earlier, longer one.
    ZeroMemory(&(pCurClient->overlapped), sizeof(OVERLAPPED));
    ZeroMemory(pCurClient->receivedData, capacity);
    pCurClient->dataBuffer.len = static_cast<ULONG>(capacity);
    pCurClient->dataBuffer.buf = pCurClient->receivedData;

    DWORD flags = 0;
    if (WSARecv(pCurClient->m_Socket, &(pCurClient->dataBuffer), 1, NULL, &flags, &(pCurClient->overlapped), NULL) == SOCKET_ERROR)
    {
        // WSA_IO_PENDING only means the receive was queued and will complete
        // later; every other error is a real failure.
        const int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            WriteError(_T("Failed to submit overlapped IO request"));
            closesocket(pCurClient->m_Socket);
            pCurClient->m_Socket = INVALID_SOCKET;
        }
    }
}

  5. The IOCP listener thread: it waits for completed receives and hands each one to the thread pool

//Used to receive the incoming data of TCP heartbeat
unsigned int RecvTCPFunc()
{<!-- -->
// create IOCP object
g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

//Create a thread pool,
g_threadPool = CreateThreadpool(NULL);
if (g_threadPool == NULL)
{<!-- -->
MessageBox(0, _T("The receiving thread pool failed to start, restart the application and try again"), _T("Error"), 0);
return 1;
}
\t
// Set the minimum and maximum number of threads in the thread pool
SetThreadpoolThreadMinimum(g_threadPool, 1);
SetThreadpoolThreadMaximum(g_threadPool, 1000);

    if (g_iocp == NULL)
{<!-- -->
        WriteError(_T("CreateIoCompletionPort error: Failed to create IOCP."));
        return 1;
    }

DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = 0;
    LPOVERLAPPED overlapped = NULL;

while (TRUE)
{<!-- -->
// wait for IO to complete
        if (GetQueuedCompletionStatus(g_iocp, &bytesTransferred, &completionKey, &overlapped, INFINITE) == 0)
{<!-- -->
int iEr = GetLastError();
            WriteError(_T("GetQueuedCompletionStatus error: Failed to get completion status.GetLstError=%d."), iEr);
            Sleep(5000);
continue;
        }

// check if the socket is readable
if (bytesTransferred <= 0)
{<!-- -->
continue;
}

// create thread pool worker object
client* pCurClient = (client*)completionKey;
PTP_WORK work = CreateThreadpoolWork(TaskFunction, pCurClient, NULL);
if (work == NULL)
{<!-- -->
WriteError(_T("Failed to create thread pool work!"));
continue;
}

// Submit the task to the thread pool
SubmitThreadpoolWork(work);

if (overlapped != NULL)
{<!-- -->
delete overlapped;
}
}
}
  6. main function
void main()
{<!-- -->
WSADATA g_wsaData;
HANDLE g_iocp = NULL;
PTP_POOL g_threadPool;
\t
// Initialize the Winsock library
int startupResult = WSAStartup(MAKEWORD(2, 2), &g_wsaData);
//FD_ZERO( &g_heartBeatReadSet);

// message receiving thread
AfxBeginThread((AFX_THREADPROC)RecvTCPFunc,NULL, THREAD_PRIORITY_TIME_CRITICAL, 0, 0, NULL);
\t
//Cycle to establish a TCP connection
for(int i=0; i<1000; i ++ )
{<!-- -->
client pClient = new client;
pClient->CreateConnection(pClient->m_Socket,_T("127.0.0.1"), _T("8888"));
\t
// Bind socket to IOCP
        if (CreateIoCompletionPort((HANDLE)pClient->m_Socket, g_iocp, (ULONG_PTR)pClient, 0) == NULL)
{<!-- -->
            WriteError(_T("CreateIoCompletionPort ERROR: Failed to bind socket to IOCP"));
        }

// create buffer
pClient->dataBuffer.buf = &(pClient->receivedData[0]);
pClient->dataBuffer.len = 4096;

// Submit overlapping IO requests
if (pClient->m_Socket == INVALID_SOCKET)
{<!-- -->
WriteError(_T("SOCKET ERROR"));
}
// receive server message
ZeroMemory( & amp;(pClient->overlapped), sizeof(OVERLAPPED));
DWORD flags = 0;
if (WSARecv(pClient->m_Socket,
&(pClient->dataBuffer),
\t\t\t\t\t1,
NULL,
&flags,
&(pClient->overlapped),
NULL) == SOCKET_ERROR)
{<!-- -->
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{<!-- -->
WriteError(_T("Failed to submit overlapped IO request: %d"), errorCode);
closesocket(sockets[i]);
/*CloseHandle(completionPort);
WSACleanup();*/
}
}
}