Analysis of the Linux Network I/O Model (5): Reactor Model Analysis and Implementation

Reactor model analysis and implementation

  • Network Models for Highly Concurrent Network Programming
  • Reactor
    • Introduction and Composition of the Reactor
    • Advantages of the Reactor
    • Analysis of the Reactor Implementation
      • Reactor-related Definitions
    • Introduction to the Reactor's Modules
      • Reactor Initialization (Multiplexer Initialization)
      • Event Registration (Handler Registration)
      • Processing Iteration: Dispatching Events in Turn (Dispatcher & Triggers)
      • Test Results

Network models for highly concurrent network programming

In high-concurrency programming, handling incoming data can usually be divided into two stages:

Waiting for the message to be ready
Processing the message

When using a default blocking socket (e.g., the earlier model in which one thread is bound to one CPU and handles one connection), these two stages are merged into one: the thread blocks until the message has been copied in, and then processes it. The thread running the socket code therefore has to sleep while waiting for data to become ready, and under high concurrency this causes frequent thread sleeps and wakeups, hurting CPU efficiency.

The high-concurrency approach is, of course, to separate the two stages: the code that waits for a message to become ready is split from the code that processes the message. This also requires the socket to be non-blocking; otherwise the processing code can easily end up putting the thread to sleep again when its conditions are not met. So the question becomes: how do we implement the code that waits for messages to be ready? It is still waiting, after all, which seems to mean the thread still has to sleep!

The solution is to have one thread actively poll, or simply wait on all connections at once. This is I/O multiplexing. Multiplexing is entirely about waiting for messages to become ready, and it can watch many connections at the same time!

Of course, it may still have to "wait", and so the thread may still sleep, but that no longer matters: the wait is one-to-many, monitoring all connections. When our thread wakes up, there are guaranteed to be some connections ready for our code to process.

At the same time, a high-performance server program usually needs to handle three types of events:

I/O events
Timer events
Signals

There are currently two efficient event-handling models:

Reactor, the synchronous I/O model, on Linux
Proactor on Windows (IOCP + asynchronous I/O)

Since our main platform is Linux, this article will not discuss Proactor.

Reactor

Introduction and composition of the reactor

"Reactor" is a literal name, and an accurate one: a large number of file descriptors produce response events, and those events trigger the operations registered on the reactor. This is the opposite of actively calling functions. The more intuitive way of thinking is that we check for certain events ourselves and then call the handler appropriate to each event type.

The reactor inverts that way of thinking: we associate a handler with an event and register the pair on the reactor. When the event occurs, it drives the associated handler on the reactor. Such a function is what we commonly call a callback.

In short, a reactor consists of the following parts:

Multiplexer: usually a multiplexing system call such as select, poll, or epoll. Since epoll generally performs better than select and poll, epoll is the usual choice.
Event dispatcher: dispatches each event to its corresponding handler.
Event handlers: the concrete functions that handle specific events.

Advantages of the reactor

The reactor has the following advantages:

Fast response: it is not blocked waiting on any single synchronous event (although the reactor itself is synchronous);
Relatively simple programming: single-threaded processing largely avoids complex multithreading, synchronization problems, and context-switch overhead;
Scalability: CPU resources can be fully exploited simply by increasing the number of reactor instances;
Reusability: the reactor framework itself is independent of the specific event-handling logic, so it is highly reusable.

Analysis of the reactor implementation

Reactor-related definitions

typedef int misc_event_callback(int fd, int events, void *arg);

typedef struct {
    int fd;     ///< event fd
    int events; ///< event status
    void *arg;  ///< extensible mem

/* Each event maps to a trigger/handler function. After an event is handled, the
   registered event and handler are swapped. The general flow is:
   1. After the accept event fires, switch the handler to readable-event handling.
   2. After the readable event fires, process the data read, then switch the handler to writable-event handling.
   3. After the writable event fires, switch the handler back to readable-event handling, then repeat the read/write cycle.
*/
    misc_event_callback *call_back_func; ///< function ptr -> processor
    int status;

    /* The read/write memory should really be maintained by a memory pool, with
       pointers into independently allocated memory; declaring the buffers inline
       like this is the lazy approach. */
    uint8_t rbuffer[ITEM_BUFFER_LENGTH]; ///< get value
    uint32_t rlength;
    uint8_t wbuffer[ITEM_BUFFER_LENGTH]; ///< post value
    uint32_t wlength;

    char sec_accept[ACCEPT_KEY_LENGTH];
    int wsstatus;

} reactor_item_t;


///< event item bucket list
typedef struct eventblock {
    struct eventblock *next;     ///< next node
    reactor_item_t *item_bucket; ///< items
} event_list_node_t;

///< reactor
typedef struct {
    int epoll_fd; ///< -> multiplexer
    int block_count;

    event_list_node_t *list_header; ///< points to the event-management blocks; each block manages a fixed number of events, and when an event falls outside the range of the existing blocks, we simply add another block
} reactor_t;

Introduction to the reactor's modules

Reactor initialization (multiplexer initialization)

static force_inline reactor_item_t* find_itme_in_reactor(reactor_t *reactor, int sockfd)
{
    int bucket_index = sockfd / MAX_EPOLL_EVENTS; ///< index of the block this fd falls in
    while (unlikely(bucket_index >= reactor->block_count))
        misc_reactor_alloc_block(reactor); ///< grow the block list until the index is covered

    int i = 0;

    event_list_node_t *tmp_node = reactor->list_header;
    while (i++ != bucket_index && tmp_node != NULL)
        tmp_node = tmp_node->next;

    return &(tmp_node->item_bucket[sockfd % MAX_EPOLL_EVENTS]); ///< return the target item within its block
}


static int misc_reactor_add_listener(reactor_t *reactor, int sockfd, misc_event_callback *event_proc)
{
    if (unlikely(reactor == NULL))
    {
        zlog_error(g_zlog_handle, "reactor is invalid");
        return -1;
    }

    if (unlikely(reactor->list_header == NULL))
    {
        zlog_error(g_zlog_handle, "reactor's bucket node is invalid");
        return -1;
    }

    reactor_item_t *item_worker = find_itme_in_reactor(reactor, sockfd);
    if (unlikely(item_worker == NULL))
    {
        zlog_error(g_zlog_handle, "first item node find failed");
        return -1;
    }

    misc_item_init(item_worker, sockfd, event_proc, reactor); ///< item initialization
    misc_item_add(reactor->epoll_fd, EPOLLIN, item_worker);   ///< add the item to epoll

    return 0;
}

static int misc_comm_init()
{
    if (misc_zlog_init())
        return -1;

    g_reactor = (typeof(g_reactor))malloc(sizeof(reactor_t));
    assert(g_reactor);

    if (misc_reactor_init())
        return -1;

    const uint8_t add_listener_num = g_muti_mode == 1 ? MISC_HIGH_CONN_NUM : 0; ///< number of extra listening sockets, depending on whether mass-access mode is enabled
    int i = 0, tmp_sock_fd;
    for ( ; i <= add_listener_num; i++)
    {
        g_sockfds[i] = misc_init_sock(g_init_port + i); ///< initialize a socket for each port to be served
        if (likely(g_sockfds[i] > 0))
            misc_reactor_add_listener(g_reactor, g_sockfds[i], accept_cb); ///< on success, register it with the reactor
    }

    return 0;
}

Event registration (handler registration)

static force_inline void misc_item_init(reactor_item_t *item, int sockfd, misc_event_callback *event_proc, reactor_t *reactor)
{
    item->fd = sockfd;
    item->call_back_func = event_proc;
    item->events = 0;
    item->arg = reactor;

    return;
}

Processing iteration: dispatching events in turn (dispatcher & triggers)

/* Callback triggered by a send (writable) event */
static int misc_item_send_cb(int fd, int events, void *arg)
{
    reactor_t *reactor = (reactor_t *)arg;
    reactor_item_t *ev = find_itme_in_reactor(reactor, fd);

    if (ev == NULL) return -1;

    ws_response(ev);

    int len = send(fd, ev->wbuffer, ev->wlength, 0);
    if (len > 0) {
        zlog_info(g_zlog_handle, "send[fd=%d], [%d]%s\n", fd, len, ev->wbuffer);

        epoll_item_del(reactor->epoll_fd, ev);
        misc_item_init(ev, fd, misc_item_recv_cb, reactor); ///< after writing, flip back to the read handler
        misc_item_add(reactor->epoll_fd, EPOLLIN, ev);      ///< re-arm for readable events (re-arming with EPOLLOUT here would immediately re-trigger the write path)

    } else {
        epoll_item_del(reactor->epoll_fd, ev);
        close(ev->fd);
        zlog_info(g_zlog_handle, "send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}

/* Callback triggered by a recv (readable) event */
static int misc_item_recv_cb(int fd, int events, void *arg)
{
    reactor_t *reactor = (reactor_t *)arg;
    reactor_item_t *ev = find_itme_in_reactor(reactor, fd);
    if (unlikely(ev == NULL)) return -1;

    int len = recv(fd, ev->rbuffer, ITEM_BUFFER_LENGTH - 1, 0); ///< leave room for the terminating '\0'

    epoll_item_del(reactor->epoll_fd, ev); ///< deregister the event; the branches below re-arm it as needed

    if (len > 0)
    {
        ev->rlength = len;
        ev->rbuffer[len] = '\0';
        zlog_info(g_zlog_handle, "recv info: %s\n", ev->rbuffer);

        ws_request(ev); ///< process the WebSocket request content

        misc_item_init(ev, fd, misc_item_send_cb, reactor); ///< flip to the write handler
        misc_item_add(reactor->epoll_fd, EPOLLOUT, ev);
    }
    else if (len == 0)
    {
        zlog_info(g_zlog_handle, "recv_cb --> disconnect\n");
        close(ev->fd);
    }
    else
    {
        if (errno == EAGAIN || errno == EWOULDBLOCK)
        {
            /* spurious readiness on a non-blocking socket: nothing to do */
        }
        else if (errno == ECONNRESET)
        {
            close(ev->fd);
        }
        zlog_info(g_zlog_handle, "recv[fd=%d] error[%d]: %s\n", fd, errno, strerror(errno));
    }
    return len;
}

/* Callback triggered by an accept event */
static int accept_cb(int fd, int events, void *arg)
{
    reactor_t *reactor = (reactor_t *)arg;
    if (unlikely(reactor == NULL)) return -1;

    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);

    int clientfd;

    if ((clientfd = accept(fd, (struct sockaddr *)&client_addr, &len)) == -1)
    {
        if (errno != EAGAIN && errno != EINTR)
        {
            /* a real error, not just "no pending connection" */
        }
        zlog_error(g_zlog_handle, "accept: %s", strerror(errno));
        return -1;
    }

    int flag = 0;
    if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
    {
        zlog_error(g_zlog_handle, "%s: fcntl nonblocking failed, fd %d", __func__, clientfd);
        return -1;
    }

    reactor_item_t *work_item = find_itme_in_reactor(reactor, clientfd);

    if (work_item == NULL) return -1;

    misc_item_init(work_item, clientfd, misc_item_recv_cb, reactor); ///< new connections start in the read handler
    misc_item_add(reactor->epoll_fd, EPOLLIN, work_item);

    zlog_info(g_zlog_handle, "new connect [%s:%d], pos[%d]",
        inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);

    return 0;
}

Test results

After testing with a large number of purpose-built client tools, this reactor can serve on the order of a million concurrent connections (C1000K). This capability rests on two things:

  1. epoll's multiplexing support
  2. Handler functions that are sufficiently robust and efficient

Finally:
The knowledge points in this column come from systematic study, and the articles are written as summaries of it. Readers interested in C/C++ Linux courses can visit ZeroSound's official website for details of its services, and are welcome to join the free public classes. Let's make progress together!