Socket address reuse, port reuse, and UDP concurrency

1. Socket quintuple

linux:

setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,(const void *) &reuse, sizeof(int));

setsockopt(fd, SOL_SOCKET, SO_REUSEPORT,(const void *) &reuse, sizeof(int));

windows:

setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,(const char*) &reuse, sizeof(int));

A socket is a system-level resource (unlike process-level or thread-level resources, it is globally unique on the machine; see https://zhuanlan.zhihu.com/p/612436510). It can be explicitly bound to a local IP and port, and the protocol type, local IP, local port, peer IP, and peer port together serve as the socket's unique identifier on the machine. When data arrives at the network card, the kernel finds the corresponding socket according to the packet's protocol type, destination IP, destination port, source IP, and source port, and delivers the data to that socket's buffer.

At first glance the unique identifier of a socket is indeed a five-tuple, but in some cases not every field of the five-tuple is populated. Consider the ways in which sockets are created. When you create a socket yourself, you cannot specify the peer's IP and port. For example, a newly created UDP socket cannot know the peer's IP and port before it receives data, and a listening TCP socket does not know the peer's IP and port before a connection request is accepted. Only the socket that the operating system creates for us when TCP accepts a connection has complete five-tuple information. Therefore, when the five-tuple is incomplete, the kernel can only use the remaining three fields (protocol type, local IP, local port) to find the socket. Within UDP traffic alone, the protocol type no longer helps to tell sockets apart, so the five-tuple effectively degenerates into a two-tuple of local IP and local port (the same is true for TCP).

If the two-tuple alone were enough to tell sockets apart, everything would be simple, so it seems reasonable that by default the operating system does not allow a port to be used by several sockets at the same time. However, as computing and networking have grown more complex, this simple assumption no longer holds in many scenarios. For example, to improve network throughput, multi-process network programming allows multiple sockets to be bound to the same IP and port, and then a two-tuple no longer maps to exactly one socket. To manage these sockets with incomplete five-tuple information, the Linux kernel builds a hash table: it uses the limited two-tuple information as the key to map sockets to hash buckets, and resolves hash collisions with separate chaining (a linked list per bucket). When a packet arrives at the network card, the kernel uses the packet header to find the corresponding hash bucket and hands the data to one of the sockets in that bucket, chosen more or less at random, usually with a round-robin algorithm. This can be seen as kernel-level load balancing.

A TCP-based server usually does not run many service processes on the same physical machine; when the load is heavy it can be deployed as a server cluster, which avoids the above problem nicely. In recent years, however, UDP-based high-concurrency network models have become popular. In particular, low-latency transport based on the QUIC protocol has been very successful in streaming media, which has shown people that UDP-based network models have many advantages. The following is a typical UDP concurrent network model:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>

/*
 * Fallback for C library headers that do not provide SO_REUSEPORT.
 * 15 is the value used by the generic Linux socket ABI; when <sys/socket.h>
 * already defines the option (possibly with a different, architecture-specific
 * value) that definition must win, so only define it when it is missing.
 */
#ifndef SO_REUSEPORT
#define SO_REUSEPORT 15
#endif

/* Maximum number of payload bytes read_data() accepts from one datagram. */
#define MAXBUF 10240
/* Capacity of the epoll event array in main() and the size hint given to epoll_create(). */
#define MAXEPOLLSIZE 100


/*
 * Receive one datagram from `sd` and print it to stdout.
 *
 * sd: descriptor of a UDP socket that has data ready to read.
 *
 * Returns the result of recvfrom(): the number of bytes received (0 for an
 * empty datagram) or -1 on error, in which case errno describes the failure.
 */
int read_data(int sd)
{
    char recvbuf[MAXBUF + 1];
    struct sockaddr_in client_addr;
    socklen_t cli_len = sizeof(client_addr);
    int ret;

    ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *) &client_addr, &cli_len);
    if (ret >= 0) {
        /* At most MAXBUF bytes were read, so the terminator always fits. */
        recvbuf[ret] = '\0';
        printf("read[%d]: %s from %d\n", ret, recvbuf, sd);
    } else {
        printf("read err:%s %d\n", strerror(errno), ret);
    }
    fflush(stdout);
    return ret;
}

/*
 * Emulate accept() for UDP.
 *
 * Consumes one datagram from the listening socket `sd` (its payload, of which
 * at most 16 bytes are read, is discarded), then creates a new UDP socket with
 * SO_REUSEADDR/SO_REUSEPORT set, binds it to the same local address `my_addr`
 * and connect()s it to the sender of that datagram.
 *
 * sd:      listening UDP socket.
 * my_addr: local address the listening socket is bound to.
 *
 * Returns the new socket descriptor, or -1 when no datagram could be read
 * from `sd` (for example EAGAIN on a non-blocking socket). Any failure while
 * setting up the new socket terminates the process with exit(1).
 */
int udp_accept(int sd, struct sockaddr_in my_addr)
{
    int new_sd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);

    ret = recvfrom(sd, buf, sizeof(buf), 0, (struct sockaddr *) &peer_addr, &cli_len);
    if (ret < 0) {
        return -1;
    }

    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    }
    printf("parent:%d new:%d\n", sd, new_sd);

    /* Both options are required so the new socket may share the listener's address and port. */
    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    if (ret) {
        perror("child setsockopt SO_REUSEADDR");
        close(new_sd);
        exit(1);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
    if (ret) {
        perror("child setsockopt SO_REUSEPORT");
        close(new_sd);
        exit(1);
    }

    ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(my_addr));
    if (ret) {
        perror("chid bind");
        close(new_sd);
        exit(1);
    }

    /* connect() on a UDP socket records the peer address on the socket. */
    peer_addr.sin_family = AF_INET;
    printf("aaa:%s\n", inet_ntoa(peer_addr.sin_addr));
    if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(peer_addr)) == -1) {
        perror("chid connect");
        close(new_sd);
        exit(1);
    }

    return new_sd;
}

/*
 * Entry point of the epoll-based UDP server example.
 *
 * Creates a non-blocking UDP listener on port 1234 with SO_REUSEADDR and
 * SO_REUSEPORT, registers it with epoll (level-triggered), and then loops:
 * readiness on the listener is drained through udp_accept(), which yields one
 * dedicated socket per sender; readiness on any other descriptor is handled by
 * read_data().
 *
 * Returns 0 when the event loop ends after an epoll_wait() failure, -1 when a
 * descriptor cannot be added to the epoll set; other setup failures exit(1).
 */
int main(int argc, char **argv)
{
    int listener, kdpfd, nfds, n;
    struct sockaddr_in my_addr;
    unsigned int port = 1234;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    int opt = 1;
    int flag;

    (void) argc;
    (void) argv;

    if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    }
    printf("socket OK\n");

    if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) != 0) {
        perror("setsockopt SO_REUSEADDR");
        exit(1);
    }

    if (setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) != 0) {
        perror("setsockopt SO_REUSEPORT");
        exit(1);
    }

    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(listener, (struct sockaddr *) &my_addr, sizeof(my_addr)) == -1) {
        perror("bind");
        exit(1);
    }
    printf("IP bind OK\n");

    /* The listener must be non-blocking: the accept loop below relies on
     * udp_accept() returning -1 once the receive queue is empty. */
    flag = fcntl(listener, F_GETFL, 0);
    if (flag == -1 || fcntl(listener, F_SETFL, flag | O_NONBLOCK) == -1) {
        perror("fcntl");
        exit(1);
    }

    kdpfd = epoll_create(MAXEPOLLSIZE);
    if (kdpfd == -1) {
        perror("epoll_create");
        exit(1);
    }

    ev.events = EPOLLIN; /* level-triggered; EPOLLET intentionally not set */
    ev.data.fd = listener;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d\n", listener);
        return -1;
    }
    printf("ep add OK\n");

    while (1) {
        /* maxevents must not exceed the capacity of events[]. */
        nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, -1);
        if (nfds == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, not a real failure */
            }
            perror("epoll_wait");
            break;
        }

        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                int new_sd;
                struct epoll_event child_ev;

                printf("listener:%d\n", n);
                /* Drain every datagram currently queued on the listener. */
                while ((new_sd = udp_accept(listener, my_addr)) != -1) {
                    child_ev.events = EPOLLIN;
                    child_ev.data.fd = new_sd;
                    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {
                        fprintf(stderr, "epoll set insertion error: fd=%d\n", new_sd);
                        close(new_sd);
                        return -1;
                    }
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }
    close(kdpfd);
    close(listener);
    return 0;
}

//Source address: https://zhuanlan.zhihu.com/p/296666065

Each time the code above receives data from a new client, it manually creates a new UDP socket to interact with that client. The new UDP socket is bound to the same address and port as the initial UDP socket, so every socket created in this model lands in the same hash bucket and the hash table degenerates into a linked list. Notice that once the client's source IP and source port are known, the code manually completes the five-tuple of the new UDP socket by calling connect (the socket still lives in the original hash bucket, so finding it requires traversing that bucket). The five-tuple of the new UDP socket is then uniquely determined, and the next packet from that client is delivered directly to the new UDP socket's buffer instead of being assigned at random.

This looks very nice: we are back to the familiar pattern in which one socket handles exactly one client. However, serious problems remain. The kernel has to traverse the hash bucket every time to find the single socket whose five-tuple matches, so lookup becomes very inefficient once the number of concurrent clients grows (https://blog.csdn.net/dog250/article/details/104219341). Fixing this depends on further optimization in the kernel and is hard to improve at the application layer. Another serious problem is that the new UDP sockets are never closed, so system resources are soon exhausted as the number of clients increases; a timeout mechanism must be added to close sockets that have received no data for a certain period. In addition, because UDP is an unreliable protocol, the first "connection request" packet sent by a client may be lost. In particular, when a large number of such packets flood the listener socket in a short time and recvfrom is not called quickly enough, the server itself drops packets. Making the listener socket non-blocking and level-triggered alleviates this to some extent, but the root fix is a retransmission mechanism on the client, which solves packet loss both on the server side and on the network path.

2. Windows UDP concurrent network model based on IOCP

In a previous blog post I described an IOCP-based TCP concurrent network model for Windows (https://blog.csdn.net/yuanshenqiang/article/details/120454822). It can also be adapted to UDP concurrency, but the result differs somewhat from the epoll-based UDP concurrency on Linux shown above. It seems that Windows does not let you manually bind the peer address of a UDP socket, so port reuse does not take effect. As a result, after the client receives the first UDP packet from the server, it must switch to the new destination port; otherwise all of its packets keep arriving at the listening socket. Quite apart from packet loss, this would soon overwhelm the server, because the listening socket creates a new socket for every message it reads and the server would crash for lack of resources. (Of course, you can also add a filtering policy on the server side; interested readers can make that change themselves.)

#include <iostream>
// Must be defined before <Winsock2.h> is included: it silences the deprecation
// warnings for older Winsock APIs such as inet_ntoa, which this file uses.
#define _WINSOCK_DEPRECATED_NO_WARNINGS

// Size in bytes of each per-operation read buffer and write buffer.
#define BUFFER_SIZE 1024
// Number of receives posted on the listening socket at startup.
#define START_POST_ACCEPTEX 2
// Number of worker threads servicing the completion port.
#define THREAD_COUNT 2
// UDP port the listening socket is bound to.
#define UDP_SERVER_PORT 50721

#include <Winsock2.h>
#pragma comment(lib, "Ws2_32.lib")

#include <MSWSock.h>
// The library name must not contain a space, otherwise the linker cannot find it.
#pragma comment(lib, "Mswsock.lib")

// Handles shared by the whole server: filled in by InitUdpServer and passed to
// every worker thread as its start parameter.
struct ServerParams
{
SOCKET listenerSock; // UDP socket bound to the server port; associated with iocp
HANDLE iocp; // I/O completion port the listener and the per-client sockets are associated with
};


// Kind of operation a UdpOverlappedPerIO record is currently used for.
// udpworkthread switches on this value when a completion is dequeued.
enum IO_OP_TYPE
{
IO_ACCEPT, // receive posted on the listening socket
IO_RECV, // receive posted on a per-client socket
IO_SEND, // send posted on a per-client socket
IO_CONNECT, // no case in udpworkthread's switch
IO_DISCONNECT, // no case in udpworkthread's switch
IO_EX_SEND // deferred send queued through PostQueuedCompletionStatus
};

// Per-operation context handed to the overlapped Winsock calls.
// The record pointer is cast to LPOVERLAPPED when an operation is posted and
// cast back when the completion is dequeued, so `overlapped` must stay the
// first member.
typedef struct UdpOverlappedPerIO
{
OVERLAPPED overlapped;
SOCKET socket; // socket the operation is issued on
WSABUF wsaBuf; // buffer descriptor passed to WSARecvFrom / WSASendTo
IO_OP_TYPE type; // operation this record is currently used for
char wbuff[BUFFER_SIZE]; // write buffer
char rbuff[BUFFER_SIZE]; // read buffer
SOCKADDR_IN remoteAddr; // peer address: filled by WSARecvFrom, used as destination by WSASendTo
int remoteAddrLen; // size of remoteAddr; in/out argument of WSARecvFrom
BOOL deprecated; // IO_EX_SEND only: set to TRUE once the deferred send has been issued, so the next completion only releases the record
BOOL destroyEnable; // IO_EX_SEND only: whether the worker may delete the record once `deprecated` is TRUE
}*LPUdpOverlappedPerIO;

int InitUdpServer(IN unsigned short port, OUT ServerParams & amp;pms)
{
WSADATA wsaData;
int ret;
ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (ret == 0)
{
pms.listenerSock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (pms. listenerSock != INVALID_SOCKET)
{
//int reuse = 1;
//setsockopt(pms.listenerSock, SOL_SOCKET, SO_REUSEADDR, (const char*) & amp;reuse, sizeof(int));
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(port);
ret = bind(pms. listenerSock, (SOCKADDR*) & address, sizeof(SOCKADDR));
if (ret == 0)
{
pms.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
if (pms. iocp != NULL)
{
if (NULL != CreateIoCompletionPort((HANDLE)pms. listenerSock, pms. iocp, NULL, 0))
{
return 0;
}
CloseHandle(pms. iocp);
}
}
closesocket(pms. listenerSock);
}
WSACleanup();
}
if (ret == 0)
ret = -1;
return ret;
}

// Creates the dedicated socket for a newly seen client and tells the client
// about it.
//
// A new overlapped UDP socket is bound to an OS-assigned port, associated with
// the completion port `handle`, and a fixed notice is sent from it to
// `clientAddr` so the client learns the new source port and can redirect its
// traffic there. The service port is not reused because ConnectEx always
// fails for UDP sockets, so the peer cannot be pinned to the socket.
//
// handle:     I/O completion port to associate the new socket with.
// clientAddr: address of the client that sent the first datagram.
// slot:       receives the newly allocated per-operation record, which is
//             owned by the pending send (type IO_SEND). Left untouched on
//             failure.
//
// Returns the new socket (converted to int), or -1 on failure, in which case
// the socket and the record have been released.
int UdpPostAcceptEx(IN HANDLE handle, IN SOCKADDR_IN* clientAddr, struct UdpOverlappedPerIO** slot)
{
    // Kept in static storage so that the record's wbuff stays free for the
    // caller while this send is still in flight.
    static const char kRedirectNotice[] =
        "Reply from server socket, You should update target communication address.\n";

    SOCKET sock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sock == INVALID_SOCKET)
        return -1;

    sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = 0; // let the operating system pick the port
    if (bind(sock, (SOCKADDR*)&address, sizeof(address)) != 0)
    {
        closesocket(sock);
        return -1;
    }
    if (CreateIoCompletionPort((HANDLE)sock, handle, 0, 0) == NULL)
    {
        closesocket(sock);
        return -1;
    }

    UdpOverlappedPerIO* overlp = new UdpOverlappedPerIO;
    memset(overlp, 0x00, sizeof(UdpOverlappedPerIO));
    overlp->socket = sock;
    overlp->type = IO_OP_TYPE::IO_SEND;
    overlp->remoteAddr = *clientAddr;
    overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
    // WSABUF::buf is not const-qualified, but WSASendTo only reads from it.
    overlp->wsaBuf.buf = const_cast<char*>(kRedirectNotice);
    overlp->wsaBuf.len = (ULONG)(sizeof(kRedirectNotice) - 1);

    DWORD sendBytes = 0;
    const int ret = WSASendTo(sock, &(overlp->wsaBuf), 1, &sendBytes, 0,
        (struct sockaddr*)&overlp->remoteAddr, sizeof(SOCKADDR_IN), (LPOVERLAPPED)overlp, NULL);
    if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        // No completion will be queued for this record, so release it here.
        closesocket(sock);
        delete overlp;
        return -1;
    }

    *slot = overlp;
    return (int)sock;
}

void MessageProcess(IN HANDLE handle, IN SOCKADDR_IN* clientAddr, IN int fd, IN void* msg, IN DWORD msglen, OUT void* res, IN OUT DWORD* reslen);

DWORD WINAPI udpworkthread(LPVOID lpParam)
{
ServerParams* pms = (ServerParams*)lpParam;
HANDLE completionPort = pms->iocp;
SOCKET listenSock = pms->listenerSock;
DWORD bytesTrans;
ULONG_PTR completionKey;
LPUdpOverlappedPerIO overlp;
DWORD errCode;
int ret;
while (1)
{
BOOL result = GetQueuedCompletionStatus(completionPort, &bytesTrans, &completionKey, (LPOVERLAPPED*) &overlp, INFINITE);
if (!result)
{
errCode = GetLastError();
if ((errCode == WAIT_TIMEOUT) || (errCode == ERROR_NETNAME_DELETED) || (errCode == ERROR_OPERATION_ABORTED))
{
if (overlp != NULL)
{
std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;
closesocket(overlp->socket);
delete overlp;
}
continue;
}
std::cout << "GetQueuedCompletionStatus failed, Maybe IOCP handle has been closed." << std::endl;
break;
}
if (overlp == NULL)
{
std::cout << "GetQueuedCompletionStatus Quit Normally." << std::endl;
break;
}
switch (overlp->type)
{
case IO_OP_TYPE::IO_ACCEPT:
{
std::cout << "happened IO_ACCEPT, Data length = " << bytesTrans << ", sock fd= " << overlp->socket << std::endl;
if (bytesTrans == 0)
{
std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;
closesocket(overlp->socket);
delete overlp;
continue;
}
int new_sock = INVALID_SOCKET;
LPUdpOverlappedPerIO slot = NULL;
DWORD dwRecv = 0, dwSend = 0, dwFlag = 0;
DWORD sendBuffSz = BUFFER_SIZE;
ZeroMemory(overlp->wbuff, BUFFER_SIZE);
std::cout << "client accepted, ip = " << inet_ntoa(overlp->remoteAddr.sin_addr) << " port = " << ntohs(overlp->remoteAddr.sin_port) << std:: endl;
new_sock = UdpPostAcceptEx(completionPort, &overlp->remoteAddr, &slot);
if (slot != NULL)
MessageProcess(completionPort, & amp; slot->remoteAddr, new_sock, overlp->rbuff, bytesTrans, slot->wbuff, & amp;sendBuffSz);
if (slot != NULL & amp; & amp; new_sock > 0 & amp; & amp; sendBuffSz > 0)
{
slot->type = IO_OP_TYPE::IO_SEND;
slot->wsaBuf.buf = slot->wbuff;
slot->wsaBuf.len = sendBuffSz;
slot->remoteAddrLen = sizeof(SOCKADDR_IN);
ret = WSASendTo(new_sock, & amp;(slot->wsaBuf), 1, & amp;dwSend, dwFlag, (struct sockaddr*) & amp;slot->remoteAddr, sizeof(SOCKADDR_IN), (LPOVERLAPPED)slot, NULL) ;
}
ZeroMemory(overlp->rbuff, BUFFER_SIZE);
overlp->type = IO_OP_TYPE::IO_ACCEPT;
overlp->wsaBuf.buf = overlp->rbuff;
overlp->wsaBuf.len = BUFFER_SIZE;
overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
ret = WSARecvFrom(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, (SOCKADDR*) &(overlp->remoteAddr), &(overlp-> remoteAddrLen), (LPOVERLAPPED)overlp, NULL);
}
break;
case IO_OP_TYPE::IO_RECV:
{
std::cout << "happened IO_RECV, Data length = " << bytesTrans << ", sock fd= " << overlp->socket << std::endl;
if (bytesTrans == 0)
{
std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;
closesocket(overlp->socket);
delete overlp;
continue;
}
DWORD dwRecv = 0, dwSend = 0, dwFlag = 0;
DWORD sendBuffSz = BUFFER_SIZE;
ZeroMemory(overlp->wbuff, BUFFER_SIZE);
MessageProcess(completionPort, &overlp->remoteAddr, overlp->socket, overlp->rbuff, bytesTrans, overlp->wbuff, &sendBuffSz);
ZeroMemory(overlp->rbuff, BUFFER_SIZE);
if (sendBuffSz > 0)
{
overlp->type = IO_OP_TYPE::IO_SEND;
overlp->wsaBuf.buf = overlp->wbuff;
overlp->wsaBuf.len = sendBuffSz;
overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
ret = WSASendTo(overlp->socket, & amp;(overlp->wsaBuf), 1, & amp;dwSend, dwFlag, (struct sockaddr*) & amp;overlp->remoteAddr, sizeof(SOCKADDR_IN), (LPOVERLAPPED)overlp , NULL);
}
else
{
overlp->type = IO_OP_TYPE::IO_RECV;
overlp->wsaBuf.buf = overlp->rbuff;
overlp->wsaBuf.len = BUFFER_SIZE;
overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
ret = WSARecvFrom(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, (SOCKADDR*) & (overlp->remoteAddr), &(overlp-> remoteAddrLen), &(overlp->overlapped), NULL);
}
if (ret == SOCKET_ERROR)
{
errCode = GetLastError();
if (errCode != WSA_IO_PENDING)
std::cout << "WSARecvFrom/WSASendTo failed: " << errCode << std::endl;
}
}
break;
case IO_OP_TYPE::IO_SEND:
{
std::cout << "happened IO_SEND: " << bytesTrans << std::endl;
if (bytesTrans == 0)
{
std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;
closesocket(overlp->socket);
delete overlp;
continue;
}
ZeroMemory(overlp->wbuff, BUFFER_SIZE);
ZeroMemory(overlp->rbuff, BUFFER_SIZE);
overlp->type = IO_OP_TYPE::IO_RECV;
overlp->wsaBuf.buf = overlp->rbuff;
overlp->wsaBuf.len = BUFFER_SIZE;
overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
DWORD dwRecv = 0, dwFlag = 0;
ret = WSARecvFrom(overlp->socket, & amp;(overlp->wsaBuf), 1, & amp;dwRecv, & amp;dwFlag, (SOCKADDR*) & amp;(overlp->remoteAddr), & amp;(overlp ->remoteAddrLen), &(overlp->overlapped), NULL);
if (ret == SOCKET_ERROR)
{
errCode = GetLastError();
if (errCode != WSA_IO_PENDING)
std::cout << "IO_SEND WSARecv failed: " << errCode << std::endl;
}
}
break;
case IO_OP_TYPE::IO_EX_SEND:
{
if (overlp->deprecated == TRUE)
{
std::cout << "happened IO_EX_SEND: " << bytesTrans << std::endl;
if (overlp->destroyEnable)
delete overlp;
break;
}
DWORD dwFlag = 0, dwSend = 0;
overlp->deprecated = TRUE;
ret = WSASendTo(overlp->socket, & amp;(overlp->wsaBuf), 1, & amp;dwSend, dwFlag, (struct sockaddr*) & amp;overlp->remoteAddr, sizeof(SOCKADDR_IN), (LPOVERLAPPED)overlp , NULL);
if (ret == SOCKET_ERROR)
{
errCode = GetLastError();
if (errCode != WSA_IO_PENDING)
{
closesocket(overlp->socket);
delete overlp;
std::cout << "IO_EX_SEND WSASendTo failed: " << errCode << std::endl;
}
}
}
break;
default:
break;
}
}
//_endthread();//<process.h>
return 0;
}


// Business-logic hook called for every received datagram.
//
// handle:     I/O completion port, used to queue a deferred reply.
// clientAddr: address of the datagram's sender; destination of the reply.
// fd:         socket the reply is sent from.
// msg/msglen: received payload (not inspected by this example).
// res:        buffer for an immediate reply (left untouched by this example).
// reslen:     in: capacity of `res`; out: length of the immediate reply. This
//             example always sets it to 0, meaning "no immediate reply", and
//             queues a deferred reply through PostQueuedCompletionStatus.
void MessageProcess(IN HANDLE handle, IN SOCKADDR_IN* clientAddr, IN int fd, IN void* msg, IN DWORD msglen, OUT void* res, IN OUT DWORD* reslen)
{
    (void)msg;
    (void)msglen;
    (void)res;
    // Immediate-reply variant, kept for reference:
    //*reslen = sprintf_s((char*)res, *reslen, "get your msg: ");
    //memcpy((char*)res + *reslen,msg,msglen);
    //*reslen = *reslen + msglen;
    *reslen = 0; // when the result cannot be produced right away, report 0 here and post the write event later

    //************************* example of a deferred write *************************
    LPUdpOverlappedPerIO overlp = new UdpOverlappedPerIO;
    memset(overlp, 0x00, sizeof(UdpOverlappedPerIO));
    const int written = sprintf_s(overlp->wbuff, BUFFER_SIZE, "Delay reply from server socket, with fd=%d.\n", fd);
    if (written < 0)
    {
        // Formatting failed; there is nothing meaningful to send.
        delete overlp;
        return;
    }
    overlp->socket = fd;
    overlp->type = IO_OP_TYPE::IO_EX_SEND;
    overlp->wsaBuf.len = (ULONG)written;
    overlp->wsaBuf.buf = overlp->wbuff;
    overlp->deprecated = FALSE;   // the send has not been issued yet
    overlp->destroyEnable = TRUE; // the worker frees the record after the send completes
    overlp->remoteAddr = *clientAddr;
    overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
    if (!PostQueuedCompletionStatus(handle, 0, 0, (LPOVERLAPPED)overlp))
    {
        // The record was never queued, so no worker will ever release it.
        std::cout << "PostQueuedCompletionStatus failed: " << GetLastError() << std::endl;
        delete overlp;
    }
    //***************************** end of the example ******************************
}

int main()
{
int ret;
ServerParams pms{ 0 };
ret = InitUdpServer(UDP_SERVER_PORT, pms);
if (ret != 0)
{
std::cout << "Init server failed." << std::endl;
return -1;
}
for (int i = 0; i <thREAD_COUNT; i ++ )
{
CreateThread(NULL, 0, udpworkthread, &pms, 0, NULL);
}
\t
for (int i = 0; i < START_POST_ACCEPTEX; i ++ )
{
UdpOverlappedPerIO* overlp = new UdpOverlappedPerIO;
memset(overlp, 0x00, sizeof(UdpOverlappedPerIO));
overlp->socket = pms.listenerSock;
overlp->wsaBuf.buf = overlp->rbuff;
overlp->wsaBuf.len = BUFFER_SIZE;
overlp->type = IO_OP_TYPE::IO_ACCEPT;
overlp->remoteAddrLen = sizeof(SOCKADDR_IN);
DWORD dwFlag = 0,dwRecv=0;
std::cout << "udp server listening sock= " << overlp->socket << std::endl;
ret = WSARecvFrom(overlp->socket, & amp;(overlp->wsaBuf), 1, & amp;dwRecv, & amp;dwFlag, (struct sockaddr*) & amp;(overlp->remoteAddr), & amp;overlp ->remoteAddrLen, (LPOVERLAPPED)overlp, NULL);
if (ret == SOCKET_ERROR)
{
int errCode = GetLastError();
if (errCode != WSA_IO_PENDING)
std::cout << "WSARecvFrom failed: " << errCode << std::endl;
}
}
getchar();
for (int i = 0; i <thREAD_COUNT; i ++ )
{
PostQueuedCompletionStatus(pms. iocp, 0, 0, NULL);
}
Sleep(1000);
closesocket(pms. listenerSock);
CloseHandle(pms. iocp);
WSACleanup();
return 0;
}