Using IOCP (I/O completion ports) in Windows C++ to monitor messages on 1000 TCP sockets simultaneously
- Establish a TCP connection
/**
 * Creates a TCP (AF_INET / SOCK_STREAM) socket and connects it to a server.
 *
 * @param socket        Receives the connected socket handle on success and
 *                      INVALID_SOCKET on failure. It is taken by reference:
 *                      a by-value SOCKET would leave the caller without the
 *                      handle and leak the connected socket.
 * @param strServerIP   Server IPv4 address in dotted-decimal notation.
 * @param strServerPort Server TCP port as decimal text (1-65535).
 * @return TRUE when the socket is connected, FALSE on any failure. Failures
 *         are reported through WriteError.
 */
BOOL CreateConnection(SOCKET& socket, CString strServerIP, const CString strServerPort)
{
    // The parameter is named `socket`, so the Winsock function has to be
    // reached through the global scope.
    socket = ::socket(AF_INET, SOCK_STREAM, 0);
    if (socket == INVALID_SOCKET)
    {
        WriteError(_T("socket() called failed!"));
        return FALSE;
    }

    // Set socket options and other operations...

    SOCKADDR_IN addrServer;
    ZeroMemory(&addrServer, sizeof(addrServer)); // also clears sin_zero
    addrServer.sin_family = AF_INET;

    // NOTE(review): wchar2char is defined elsewhere; confirm whether the
    // string it returns has to be released by the caller.
    addrServer.sin_addr.S_un.S_addr = inet_addr(wchar2char(strServerIP.GetBuffer()));
    strServerIP.ReleaseBuffer();

    const int nPort = _ttoi(strServerPort);
    if (addrServer.sin_addr.S_un.S_addr == INADDR_NONE || nPort <= 0 || nPort > 65535)
    {
        WriteError(_T("Invalid server address or port"));
        closesocket(socket);
        socket = INVALID_SOCKET;
        return FALSE;
    }
    addrServer.sin_port = htons(static_cast<u_short>(nPort));

    if (connect(socket, (sockaddr*)&addrServer, sizeof(addrServer)) == SOCKET_ERROR)
    {
        WriteError(_T("Failed to connect to server"));
        closesocket(socket);
        socket = INVALID_SOCKET; // never hand a closed handle back to the caller
        return FALSE;
    }

    return TRUE;
}
The parameters of the socket() function are:
Parameter 1: domain (protocol domain): Specifies the address family used by the protocol. Common values are:
AF_INET: IPv4 address family
AF_INET6: IPv6 address family
AF_UNIX: Unix socket
Parameter 2: type (socket type): Specifies the type of socket. Common values are:
SOCK_STREAM: Connection-oriented stream socket, using TCP protocol
SOCK_DGRAM: Connectionless datagram socket, using UDP protocol
SOCK_RAW: Raw socket, used for underlying network protocol access
Parameter 3: protocol: Specifies the protocol used (the code above passes 0). Common values are:
IPPROTO_TCP: TCP transport protocol
IPPROTO_UDP: UDP transport protocol
- Disconnect
/**
 * Closes a socket handle.
 *
 * The handle is received by value, so this function cannot reset the caller's
 * variable; callers should set their own copy to INVALID_SOCKET afterwards.
 *
 * @param socket Socket to close. INVALID_SOCKET is accepted and ignored.
 * @return TRUE if there was nothing to close or the socket was closed,
 *         FALSE if closesocket reported an error.
 */
BOOL CSendInfoToServer::CloseConnection(SOCKET socket)
{
    if (socket == INVALID_SOCKET)
    {
        return TRUE; // nothing to close
    }

    if (closesocket(socket) == SOCKET_ERROR)
    {
        return FALSE;
    }

    return TRUE;
}
- Define the per-connection receive context structure
struct client {<!-- --> SOCKET m_Socket; //Used for IOCP to receive heartbeat messages WSABUF dataBuffer; OVERLAPPED overlapped; char receivedData[4096]; //This space must be large enough to store received data once }
- The thread-pool callback that processes the received data
// task function callback static VOID CALLBACK TaskFunction(PTP_CALLBACK_INSTANCE instance, LPVOID lpParam, PTP_WORK work) {<!-- --> client* pCurClient = static_cast<client*>(lpParam); //Where pCurClient->receivedData; is the received data string strData = pCurClient->receivedData; std::cout<<strData; // Submit the next overlapping IO request // create buffer ZeroMemory( & amp;(pCurClient->overlapped), sizeof(OVERLAPPED)); pCurClient->dataBuffer.len = 4096; pCurClient->dataBuffer.buf = &(pCurClient->receivedData[0]); DWORD flags = 0; if (WSARecv(pCurClient->m_Socket, &(pCurClient->dataBuffer), 1, NULL, &flags, &(pCurClient->overlapped), NULL) == SOCKET_ERROR) {<!-- --> int errorCode = WSAGetLastError(); if (errorCode != WSA_IO_PENDING) {<!-- --> WriteError(_T("Failed to submit overlapped IO request")); closesocket(pCurClient->m_tSocket); return; } } return; }
- The IOCP listener thread: it receives completed messages and hands them to the thread pool
//Used to receive the incoming data of TCP heartbeat unsigned int RecvTCPFunc() {<!-- --> // create IOCP object g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); //Create a thread pool, g_threadPool = CreateThreadpool(NULL); if (g_threadPool == NULL) {<!-- --> MessageBox(0, _T("The receiving thread pool failed to start, restart the application and try again"), _T("Error"), 0); return 1; } \t // Set the minimum and maximum number of threads in the thread pool SetThreadpoolThreadMinimum(g_threadPool, 1); SetThreadpoolThreadMaximum(g_threadPool, 1000); if (g_iocp == NULL) {<!-- --> WriteError(_T("CreateIoCompletionPort error: Failed to create IOCP.")); return 1; } DWORD bytesTransferred = 0; ULONG_PTR completionKey = 0; LPOVERLAPPED overlapped = NULL; while (TRUE) {<!-- --> // wait for IO to complete if (GetQueuedCompletionStatus(g_iocp, &bytesTransferred, &completionKey, &overlapped, INFINITE) == 0) {<!-- --> int iEr = GetLastError(); WriteError(_T("GetQueuedCompletionStatus error: Failed to get completion status.GetLstError=%d."), iEr); Sleep(5000); continue; } // check if the socket is readable if (bytesTransferred <= 0) {<!-- --> continue; } // create thread pool worker object client* pCurClient = (client*)completionKey; PTP_WORK work = CreateThreadpoolWork(TaskFunction, pCurClient, NULL); if (work == NULL) {<!-- --> WriteError(_T("Failed to create thread pool work!")); continue; } // Submit the task to the thread pool SubmitThreadpoolWork(work); if (overlapped != NULL) {<!-- --> delete overlapped; } } }
- main function
void main() {<!-- --> WSADATA g_wsaData; HANDLE g_iocp = NULL; PTP_POOL g_threadPool; \t // Initialize the Winsock library int startupResult = WSAStartup(MAKEWORD(2, 2), &g_wsaData); //FD_ZERO( &g_heartBeatReadSet); // message receiving thread AfxBeginThread((AFX_THREADPROC)RecvTCPFunc,NULL, THREAD_PRIORITY_TIME_CRITICAL, 0, 0, NULL); \t //Cycle to establish a TCP connection for(int i=0; i<1000; i ++ ) {<!-- --> client pClient = new client; pClient->CreateConnection(pClient->m_Socket,_T("127.0.0.1"), _T("8888")); \t // Bind socket to IOCP if (CreateIoCompletionPort((HANDLE)pClient->m_Socket, g_iocp, (ULONG_PTR)pClient, 0) == NULL) {<!-- --> WriteError(_T("CreateIoCompletionPort ERROR: Failed to bind socket to IOCP")); } // create buffer pClient->dataBuffer.buf = &(pClient->receivedData[0]); pClient->dataBuffer.len = 4096; // Submit overlapping IO requests if (pClient->m_Socket == INVALID_SOCKET) {<!-- --> WriteError(_T("SOCKET ERROR")); } // receive server message ZeroMemory( & amp;(pClient->overlapped), sizeof(OVERLAPPED)); DWORD flags = 0; if (WSARecv(pClient->m_Socket, &(pClient->dataBuffer), \t\t\t\t\t1, NULL, &flags, &(pClient->overlapped), NULL) == SOCKET_ERROR) {<!-- --> int errorCode = WSAGetLastError(); if (errorCode != WSA_IO_PENDING) {<!-- --> WriteError(_T("Failed to submit overlapped IO request: %d"), errorCode); closesocket(sockets[i]); /*CloseHandle(completionPort); WSACleanup();*/ } } }