Webserver 02: Semi-Synchronous/Semi-Reactor Thread Pool

Contents

Thread Pool:

Why use a thread pool?

How to create a thread pool?

semi-synchronous/semi-reactor

Five IO models

Blocking and non-blocking IO

The difference between non-blocking IO and asynchronous IO

Synchronous IO and asynchronous IO

The difference between the Reactor and Proactor event handling modes

Simulating the Proactor mode with synchronous IO

Are more threads always better?

How much memory each thread occupies

If a client request needs to occupy a thread for a long time, will it affect the next client request? What is a good strategy?

If 1,000 clients make access requests at the same time, and the number of threads is not many, how can they respond to each one in a timely manner?

What is the state of the thread pool worker thread after processing a task?

Are the worker threads in the thread pool always waiting?

Thread pool code analysis

Selection of the number of threads in the thread pool:

static member variable

static member function

Definition of thread pool class:

Thread pool creation and recycling:

Add tasks to the request queue


Thread pool:

1. Trades space for time: the server's hardware resources are spent in exchange for running efficiency;

2. A pool is a collection of resources that are created and initialized in full when the server starts, known as static resources;

3. When the server enters its formal running stage and begins processing client requests, any resources it needs can be taken directly from the pool without dynamic allocation;

4. After the server finishes handling a client connection, it can put the related resources back into the pool without executing any system call;

Why use a thread pool?

1. Avoids the overhead of repeatedly creating and destroying threads, and keeps excess live threads from consuming system resources;

2. Improves response speed;

3. Improves thread manageability: the thread pool provides unified allocation, tuning and monitoring;

Use multithreading to make full use of multi-core CPUs, and use a thread pool to avoid the extra system overhead of frequently creating and destroying threads:

a) Create a thread pool to manage the threads. The pool consists mainly of a task queue and a set of worker threads: tasks are appended to the queue, and once the threads are created they pick up these tasks automatically. A fixed number of worker threads caps the maximum number of concurrent threads;

b) Multiple threads share the task queue, so synchronization between the threads is required

How to create a thread pool?

The project uses a thread pool (semi-synchronous/semi-reactor mode) to process user requests concurrently:

The main thread is responsible for monitoring the file descriptors and accepting new socket connections. If a read or write event occurs on a connected socket, the corresponding task is inserted into the request queue.

Worker threads (the threads in the thread pool) take tasks out of the request queue and complete the data reads/writes and the logical processing (parsing the HTTP request message, etc.).

Semi-synchronous/semi-reactor

Half-sync/half-async mode:

This mode contains both synchronous and asynchronous operations; the two cooperate to process concurrent tasks, improving efficiency and scalability.

When implementing the half-sync/half-async mode, a thread pool and a message queue are usually used. Synchronous operations run directly in the main thread, while asynchronous operations run in separate threads that place their results into the message queue; the main thread reads the results from the queue and processes them. In this way synchronous and asynchronous operations can proceed at the same time, improving the program's processing efficiency.

Workflow:

  • Synchronous threads are used to process client logic

  • Asynchronous threads are used to handle I/O events

  • When the asynchronous thread detects a client request, it encapsulates it into a request object and inserts it into the request queue

  • The request queue then notifies a worker thread running in synchronous mode to read and process the request object (a minimal sketch of this handoff follows)
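
As an illustration of the queue handoff between the asynchronous and synchronous halves, here is a minimal sketch using standard C++ primitives (std::mutex, std::condition_variable) rather than the project's own locker/sem wrappers; the names Request, enqueue and dequeue are placeholders:

#include <condition_variable>
#include <mutex>
#include <queue>

struct Request { int fd; };                 // placeholder request object

std::queue<Request> request_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;

// Asynchronous side: enqueue a request and wake one synchronous worker.
void enqueue(Request req)
{
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        request_queue.push(req);
    }
    queue_cv.notify_one();
}

// Synchronous worker: block until a request arrives, then take it.
Request dequeue()
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    queue_cv.wait(lock, [] { return !request_queue.empty(); });
    Request req = request_queue.front();
    request_queue.pop();
    return req;
}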

Leader/Followers model:

A group of threads take turns playing the leader. The current leader waits for an event on the event sources; when one arrives, it first promotes one of the followers to become the new leader, then processes the event itself, rejoining the followers once it is done. Because the thread that detects an event also processes it, no data has to be handed between threads, but only one thread waits for events at any given moment.

The semi-synchronous/semi-reactor concurrency pattern is a variant of half-sync/half-async in which the asynchronous half is embodied as an event-handling pattern (the reactor).

Synchronous and asynchronous in the concurrency pattern:

  • Synchronous means the program executes strictly in the order of the code

  • Asynchronous means the program's execution is driven by events: the corresponding code runs only after a specific event occurs

Semi-synchronous/semi-reactor workflow (taking Proactor mode as an example)

  • The main thread acts as the asynchronous thread and is responsible for listening for events on all the sockets

  • If a new request arrives, the main thread accepts it to obtain a new connection socket, then registers the read/write events for that socket in the epoll kernel event table (a registration sketch follows this list)

  • If a read or write event occurs on a connected socket, the main thread receives the data from the socket, encapsulates it into a request object, and inserts it into the request queue

  • All worker threads sleep on the request queue; when a task arrives, they compete (e.g., for a mutex) for the right to take it over
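
A sketch of the registration step mentioned above, assuming epollfd was obtained earlier from epoll_create; whether edge-triggered mode (EPOLLET) is used depends on the project's configuration:

#include <sys/epoll.h>

//Register a socket's read event in the epoll kernel event table
void addfd(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;   //read events, edge-triggered
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}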

Five IO models

  • Blocking IO: the caller invokes a function and must wait for it to return, doing nothing in the meantime; only after the function returns can the caller take the next step

  • Non-blocking IO: non-blocking waiting; every so often the caller checks whether the IO event is ready, and can do other things while it is not. A non-blocking IO system call always returns immediately, whether or not the event has occurred; if it has not, the call returns -1, and the two cases are distinguished via errno. For accept, recv and send, errno is usually set to EAGAIN when the event has not occurred (meaning the operation cannot be completed for now and should be retried a little later)

  • Signal-driven IO: Linux provides signal-driven IO for sockets: install a signal handler and the process keeps running without blocking; when the IO event is ready, the process receives the SIGIO signal and then handles the IO event

  • IO multiplexing: Linux implements the IO multiplexing model with the select/poll functions. These functions also block the process, but unlike blocking IO they can block on multiple IO operations at once and test the readiness of multiple reads and writes at the same time; the actual IO function is only called when data is readable or writable

  • Asynchronous IO: on Linux, aio_read can be called to hand the kernel a descriptor, a buffer pointer and size, a file offset and a notification method; the call returns immediately, and the kernel notifies the application once it has copied the data into the buffer

Note: Blocking I/O, non-blocking I/O, signal-driven I/O, and I/O multiplexing are all synchronous I/O.

Synchronous I/O means the kernel notifies the application of ready events: for example, it only reports that a client connection has arrived, and the user code must perform the I/O operation itself. Asynchronous I/O means the kernel notifies the application of completed events: for example, it notifies the application only after the client's data has been read, the I/O operation itself having been completed by the kernel.

Blocking and non-blocking IO

1. Blocking IO: a function is called and gets stuck, the whole program flow stops (sleeps), and the function waits there for something to happen; only when that thing happens does the function move on. Such a function is called a blocking function, e.g. accept();

This kind of blocking is inefficient; server programs are generally not written in a blocking style because of the low efficiency;

2. Non-blocking IO: never gets stuck, makes full use of the time slice, and has high execution efficiency

(1) Repeatedly call accept() and recvfrom() to check whether data has arrived. If not, the function returns a special error flag to tell you so; this flag may be EWOULDBLOCK or EAGAIN. While the data has not arrived there is a chance to run other code, but accept() and recvfrom() must be called again and again to check whether the data has arrived, which is very tiresome;

(2) Once the data arrives, the call does get stuck here while copying the data from the kernel buffer to the user buffer, so the copy stage is completed while blocked;
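
A minimal sketch of this pattern: put the socket into non-blocking mode with fcntl, then poll recv and treat EAGAIN/EWOULDBLOCK as "no data yet". set_nonblocking and try_read are illustrative names:

#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>

void set_nonblocking(int fd)
{
    int old = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, old | O_NONBLOCK);
}

//Returns bytes read, 0 on peer close, or -1 when no data is ready yet
ssize_t try_read(int fd, char *buf, size_t len)
{
    ssize_t n = recv(fd, buf, len, 0);
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return -1;                      //nothing to read yet: go do other work
    return n;                           //data arrived (the copy stage still blocks)
}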

The difference between non-blocking IO and asynchronous IO

Both are means of improving IO efficiency and performance. Non-blocking IO means that during an IO operation, if there is no data to read or write, the call does not wait but returns immediately; the caller, however, has to poll at intervals, which wastes CPU resources.

Asynchronous IO means that an IO operation need not be waited on: other work can continue, and the application is notified once the IO operation completes. This makes full use of the CPU and improves system throughput and concurrency. However, asynchronous IO requires operating system support and is more complex to program.

To sum up, non-blocking IO suits scenarios with relatively simple IO and low concurrency, while asynchronous IO suits scenarios with complex IO and high concurrency.

Synchronous IO and asynchronous IO

1. Asynchronous IO: when calling an asynchronous IO function, we supply it with a receive buffer and a callback function. The call returns immediately, and the rest is handed to the operating system: the OS watches for the data to arrive, copies it into the buffer we provided, and then invokes the specified callback to notify us.

2. Synchronous IO: call select() to check whether there is data; if there is, move on, otherwise stay stuck there. After select() returns, use recvfrom() to fetch the data; fetching the data will, of course, also block;

In general, synchronous IO is suitable for small amounts of data or single requests, while asynchronous IO is suitable for large volumes of data or highly concurrent requests, where it improves the program's performance and throughput.
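
For concreteness, here is a sketch of the POSIX asynchronous IO interface described above (aio_read; link with -lrt on Linux). A real program would request completion notification through cb.aio_sigevent instead of the busy-wait shown here:

#include <aio.h>
#include <errno.h>
#include <string.h>

char buf[4096];

int start_async_read(int fd)
{
    static struct aiocb cb;             //control block must outlive the operation
    memset(&cb, 0, sizeof(cb));
    cb.aio_fildes = fd;                 //descriptor to read from
    cb.aio_buf = buf;                   //buffer the kernel will fill
    cb.aio_nbytes = sizeof(buf);        //buffer size
    cb.aio_offset = 0;                  //file offset

    if (aio_read(&cb) < 0)              //submits the request and returns at once
        return -1;

    while (aio_error(&cb) == EINPROGRESS)
        ;                               //other work could be done here
    return (int)aio_return(&cb);        //bytes read once the kernel is done
}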

The difference between the Reactor and Proactor event handling modes

  • In Reactor mode, the main thread (the I/O processing unit) is only responsible for monitoring whether events occur on the file descriptors: when a socket becomes readable, it puts the readable event into the request queue and immediately notifies the worker threads (the logic units). The main thread does nothing else; reading and writing data, accepting new connections, and processing client requests are all done in the worker threads. Reactor is usually implemented with synchronous I/O.

  • In Proactor mode, the main thread and the kernel take care of the I/O operations such as reading and writing data and accepting new connections, while the worker threads are only responsible for business logic, such as processing client requests. Proactor is usually implemented with asynchronous I/O.

Simulating the Proactor mode with synchronous IO

Since asynchronous IO is still immature and rarely used in practice, here we use synchronous IO to simulate and realize the Proactor mode.

The workflow of the simulated Proactor, taking epoll_wait as an example (a sketch of the main loop follows the list):

  • The main thread registers the read-ready event on the socket in the epoll kernel event table;

  • The main thread calls epoll_wait to wait for data to become readable on the socket;

  • When there is data readable on the socket, epoll_wait notifies the main thread. The main thread reads the data from the socket in a loop until no more data is available, then encapsulates the data into a request object and inserts it into the request queue;

  • A worker thread sleeping on the request queue is awakened; it obtains the request object, processes the client request, and then registers the write-ready event on the socket in the epoll kernel event table;

  • The main thread calls epoll_wait to wait until the socket is writable;

  • When the socket is writable, epoll_wait notifies the main thread, and the main thread writes the result of the server's processing of the client request to the socket.
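
A condensed sketch of that main loop; addfd, users and pool stand in for the project's real helpers and are named here only for illustration:

#include <sys/epoll.h>

const int MAX_EVENTS = 10000;

void event_loop(int epollfd, int listenfd /*, threadpool<http_conn> *pool */)
{
    epoll_event events[MAX_EVENTS];
    while (true)
    {
        int n = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; ++i)
        {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd)
            {
                //new connection: accept it and register it with epoll (addfd)
            }
            else if (events[i].events & EPOLLIN)
            {
                //simulated Proactor: the main thread does the read itself,
                //then appends the request object to the queue, e.g.
                //if (users[sockfd].read_once()) pool->append(users + sockfd);
            }
            else if (events[i].events & EPOLLOUT)
            {
                //the main thread writes the prepared response back, e.g.
                //users[sockfd].write();
            }
        }
    }
}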

Are more threads always better?

No.

  1. If there are 8 CPUs and 8 threads, each thread occupies one CPU and all eight advance at the same time, which is the most efficient arrangement
  2. With 9 threads, running them at the same time involves thread switching
  3. As the number of threads increases, efficiency rises at first, but past a peak more threads cause problems: too many threads must be switched back and forth, and the time spent switching may exceed the time spent executing, which is not worthwhile

How much memory each thread occupies

The memory usage of each thread depends on the stack space allocated by the thread and the heap space used by the thread. Generally, the memory usage of each thread is relatively small, ranging from several hundred KB to several MB. However, if a thread needs to manipulate large amounts of data or perform complex calculations, its memory footprint may increase.

If a client request needs to occupy a thread for a long time, will it affect the next client request? What is a good strategy?

Yes. Because the number of threads in the thread pool is limited, a client request that takes too long reduces request-processing efficiency: when processing is too slow, subsequently accepted requests can only wait in the request queue, which delays the next client request.

We can set a processing timeout for the thread handling a request object: send a signal to notify the thread when the timeout is about to be exceeded, then check again after a set interval; if the request is still occupying the thread at that point, disconnect it directly.

If 1000 clients make access requests at the same time, and the number of threads is not many, how can they respond to each one in time?

This project copes with high concurrency by having the sub-threads run in a loop.

First, pthread_detach is called as each thread is created to detach it, so its resources are reclaimed automatically and the worker threads need not be joined individually.

The run function executed by each sub-thread is a while loop, so the threads in the pool never stop. Each access request is encapsulated and placed into the request queue (a list). When there are no tasks, the threads block and wait; when tasks arrive, they compete to process them until the request queue is empty, meaning all tasks have been handled.

What is the state of the thread pool worker thread after processing a task

(1) If the request queue is empty after processing the task, the thread returns to the blocked waiting state

(2) If the request queue is not empty after processing the task, then this thread will be in a state of competing for resources with other threads, and whoever gets the lock will be qualified to process the event.

Are the worker threads in the thread pool always waiting?

The worker threads in the thread pool are always in a blocked waiting state. When the thread pool is first created, pthread_create is called in a loop to create 8 threads. Each worker thread's entry point is the custom worker function passed through the third parameter of pthread_create; the child thread then calls the thread pool's member function run, and run executes the process function of each task.

worker must be a static function. Since a static member function can only access static member variables directly, the only way to reach the class's non-static members is for worker to call the non-static run function through the object pointer it receives.

In the run function, to handle high concurrency, the worker threads are set to block and wait on the condition that the request queue is non-empty, so the worker threads in this project stay in an always-blocking-and-waiting mode.

Thread pool code analysis

The main thread is the asynchronous thread: it monitors the file descriptors (accepting new socket connections, listening for read/write events on the sockets, encapsulating the data into request objects, and inserting them into the request queue).

The worker threads take tasks out of the request queue and complete the reading, writing and processing of the data.

Select the number of threads in the thread pool:

8, because the machine has an eight-core CPU.

The main purpose of adjusting the number of threads in this thread pool is to fully and reasonably use CPU and memory resources, thereby maximizing the performance of the program.

If the tasks are CPU-intensive, squeeze the CPU as much as possible and set the count to Ncpu + 1 (the +1 ensures that when a thread is suspended by a page fault or for some other reason, the extra thread can step in so that no CPU clock cycles are wasted).

If the tasks are IO-intensive, a reference value is 2 * Ncpu. IO processing is generally slow; having more threads than cores keeps the CPU supplied with work, so it does not sit idle while some threads are handling IO, which would waste resources. (A small helper sketch follows.)
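
These two rules of thumb can be written down directly; hardware_concurrency comes from the standard library, and pick_thread_count is an illustrative name:

#include <thread>

unsigned pick_thread_count(bool io_bound)
{
    unsigned ncpu = std::thread::hardware_concurrency();
    if (ncpu == 0) ncpu = 8;            //fall back if detection fails
    return io_bound ? 2 * ncpu          //IO-bound: overlap waiting with work
                    : ncpu + 1;         //CPU-bound: +1 covers page-fault stalls
}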

Static member variable

Declaring a class member variable as static means it is a static member variable. Unlike ordinary member variables, no matter how many objects are created, there is only one copy of the static member variable. Static member variables belong to a class and are shared by all objects.

Static member variables are allocated space at compile time, before any object is created, and are placed in the global/static storage area.
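
A small example of the sharing behavior (Counter is a made-up class for illustration):

#include <iostream>

class Counter
{
public:
    Counter() { ++count; }
    static int count;                   //declaration inside the class
};

int Counter::count = 0;                 //definition outside: one copy in total

int main()
{
    Counter a, b, c;
    std::cout << Counter::count << "\n"; //prints 3: all objects share the variable
}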

Static member function

Declare a class member function as static, then it is a static member function.

  • Static member functions can directly access static member variables, but cannot directly access ordinary member variables; those can still be reached by passing an object in as a parameter.

  • Ordinary member functions can access ordinary member variables as well as static member variables.

  • Static member functions have no this pointer. Non-static data members are maintained per object, while a static member function is shared by all objects and cannot tell which object it was called for, so it cannot directly access ordinary member variables and carries no this pointer.
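
This is exactly why worker receives the object pointer: a static member function reaches non-static members only through an object handed to it. A stripped-down illustration (Pool is a made-up class):

class Pool
{
public:
    static void *worker(void *arg)
    {
        Pool *self = static_cast<Pool *>(arg);  //recover the object
        self->run();                            //now non-static members are reachable
        return self;
    }
private:
    void run() { ++m_jobs; }                    //touches instance state
    int m_jobs = 0;
};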

Definition of thread pool class:

The thread processing function and the run function are declared private, which ensures that only the thread pool class itself can access and modify them, shields them from interference by external code, and improves the reliability and security of the code.

//Thread pool class definition
template <typename T>
class threadpool
{
public:
    /*connPool is the database connection pool pointer
    thread_number is the number of threads in the thread pool,
    max_requests is the maximum number of requests waiting to be processed in the request queue*/
    threadpool(connection_pool *connPool, int thread_number = 8, int max_requests = 10000);
    ~threadpool();
    bool append(T *request);
private:
    /*The function run by the worker thread, which continuously takes out tasks from the work queue and executes them*/
    static void *worker(void *arg);
    void run();
private:
    int m_thread_number; //The number of threads in the thread pool
    int m_max_requests; //The maximum number of requests allowed in the request queue
    pthread_t *m_threads; //Array describing the thread pool, its size is m_thread_number
    std::list<T *> m_workqueue; //request queue
    locker m_queuelocker; //protect the mutex of the request queue
    sem m_queuestat; //Whether there is a task to be processed
    bool m_stop; //whether to end the thread
    connection_pool *m_connPool; //Database connection pool
};

Thread pool creation and recycling:

The constructor creates the thread pool. In the pthread_create call, the class object (this) is passed as the argument to the static worker function; inside worker the pointer is cast back to the thread pool object, and run is called.

//thread processing function
template <typename T>
void *threadpool<T>::worker(void *arg)
{
    //Forcibly convert the parameter to the thread pool class and call the member method
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}
//execute task
template <typename T>
void threadpool<T>::run()
{
    while (!m_stop)
    {
        // semaphore wait
        m_queuestat.wait();

        m_queuelocker.lock();
        if (m_workqueue.empty())
        {
            m_queuelocker.unlock();
            continue;
        }

        //Take the first task off the front of the request queue and remove it
        T *request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();
        if (!request)
            continue;

        connectionRAII mysqlcon(&request->mysql, m_connPool);
        
        request->process();
    }
}
//thread pool creation and recycling

template <typename T>
threadpool<T>::threadpool(connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL), m_connPool(connPool)
{
    if (thread_number <= 0 || max_requests <= 0)
        throw std::exception();

    //thread id initialization
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
        throw std::exception();
    for (int i = 0; i < thread_number; ++i)
    {
        //Create threads in a loop and run the worker threads as required
        //printf("create the %dth thread\\
",i);
        if (pthread_create(m_threads + i, NULL, worker, this) != 0)
        {
            delete[] m_threads;
            throw std::exception();
        }
        //Detach the thread so its resources are reclaimed automatically, with no need to join worker threads individually
        if (pthread_detach(m_threads[i]))
        {
            delete[] m_threads;
            throw std::exception();
        }
    }
}

Add tasks to the request queue

The request queue is created with a list container. A mutex ensures thread safety when appending to the queue; after the append completes, a semaphore signals that there is a task to be processed. Throughout, attention must be paid to thread synchronization.

The request queue could be built from an array, linked list, queue, stack or other container; a linked list is used here because it can add and delete elements dynamically.

//Add tasks to the request queue

template <typename T>
bool threadpool<T>::append(T *request)
{
    m_queuelocker.lock();

    //According to the hardware, the maximum size of the request queue is preset to 10000
    if (m_workqueue.size() >= m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    //add the task
    m_workqueue.push_back(request);
    m_queuelocker.unlock();

    //The semaphore signals that there is a task to be processed
    m_queuestat.post();
    return true;
}