C language thread solution pool interpretation and implementation 01

C language thread depooling interpretation and implementation 01

When talking about concurrency and pooled components, the first thing that comes to mind is definitely the thread pool. So what is the principle of the thread pool and how does it work? This article tells you the answer.

Knowledge sorting

  1. What is thread pool
    A thread pool is a pool component that maintains and manages a certain number of threads. It has the effect of improving CPU work efficiency
  2. Why do you need a thread pool?
    In layman’s terms, if we have an IO that is very time-consuming but we are single-threaded, then our thread will block and wait for the IO to complete before continuing. This will be very time consuming.
  3. What are the components of the thread pool?
    • task queue
    • a certain number of threads
    • Lock (guaranteed thread safety)
  4. The thread pool is used by Ruge to manage threads.
    • Have a task: perform a task
    • No tasks: CPU sleeps

Header file interpretation

Let’s take a look at the header file first:

#ifndef THRDPOOL_H_
#define THRDPOOL_H_

typedef struct thrdpool_s thrdpool_t;
typedef void (*handler_pt)(void*);

#ifdef __cplusplus
extern "C"
{<!-- -->
#endif

thrdpool_t *thrdpool_create(int thrd_count);

void thrdpool_treminater(thrdpool_t *pool);

int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);

void thrdpool_waitdone(thrdpool_t *pool);

#ifdef __cplusplus
}
#endif
#endif

Code interpretation:
As a component, or a library, we don’t want others to see how we implement it internally, so we just tell users how to use it. thrdpool_create is used to initialize a thread pool, and the parameter is the number of threads. thrdpool_terminer is used to stop the thread pool. thrdpool_post is used to throw a task, that is, in which thread pool, which function is executed, and what are the parameters. thrdpool_waitdone checks whether all threads have been executed.
Note that although we don’t want users to see our implementation, we do want to tell users the usage specifications of our library, which are the first two typedef. He tells the user the object type of our thread pool and the specifications of the incoming tasks.
Since we support C++ using our library, we add #ifdef __cplusplus extern "C". That is to say, if it is C++, then we will use C rules to compile this file.

Data structure interpretation

Queue

typedef struct task_s
{<!-- -->
    void *next;
    handler_pt func;//corresponding function
    void *arg;//parameters
} task_t;

typedef struct task_queue_s
{<!-- -->
    void *head;//Head pointer
    void **tail;//tail pointer
    int block;//flag
    spinlock_t lock;//spin lock
    pthread_mutex_t mutex;//mutex lock
    pthread_cond_t cond; //Condition
} task_queue_t;

Code interpretation:
Here we first look at the schematic diagram of our queue structure:

We use a chain structure. Connect all tasks together, and then have a manager to manage these tasks. After looking at this picture and the above code, you can clearly understand the structure of our queue.

Pool

typedef struct thrdpool_s
{<!-- -->
    task_queue_t *task_queue;//task queue
    atomic_int quit;//flag
    uint32_t thrd_count;//The number of threads in the pool
    pthread_t *threads;//thread array
} thrdpool_t;

Code interpretation:
There’s not much to say here. The meaning of the flag is: if it is 0, it runs normally, if it is 1, it blocks. It is an atomic variable, so it is thread-safe. Since there is no complicated operation designed for this variable, there is no need to use a lock. We can solve it with atomic variables.

Code implementation

Initialize queue

static task_queue_t* __taskqueue_create()
{<!-- -->
    int ret;
    task_queue_t *queue = (task_queue_t*)malloc(sizeof(*queue));
    if(queue)
    {<!-- -->
        ret = pthread_mutex_init( & amp;queue->mutex);
        if(ret == 0)
        {<!-- -->
            ret = pthread_cond_init( & amp;queue->cond);
            if(ret == 0)
            {<!-- -->
                spinlock_init( & amp;queue->lock);
                queue->head = NULL;
                queue->tail = & amp;queue->head;
                queue->block = 1;
                return queue;
            }
            pthread_mutex_destroy( & amp;queue->mutex);
        }
        free(queue);
    }

    return NULL;
}

Code interpretation:
There is nothing special to say. Here we just apply for a piece of memory and then initialize the structure members in it. If all initialization is successful, the pointer is returned. The main thing to note is that our block should be initialized to 0, which means it is in the blocking state by default. Because it has just been initialized, there are no tasks in the queue.

Remove blocking

static int __nonblock(task_queue_t *queue)
{<!-- -->
    pthread_mutex_lock( & amp;queue->mutex);
    queue->block = 0;
    pthread_mutex_unlock( & amp;queue->mutex);
    pthread_cond_broadcast( & amp;queue->cond);
}

Code interpretation:
When operating on task_queue internal variables, locks must be used to ensure thread safety. Set the block marked as blocked to 0 to unblock it. Then broadcast, waking up other threads.

Insert task

static inline __add_task(task_queue_t *queue, void *task)
{<!-- -->
    void **link = (void**)task;
    *link = NULL;

    spinlock_lock( & amp;queue->lock);
    *queue->tail = link;
    queue->tail = link;
    spinlock_unlock( & amp;queue->lock);
    pthread_cond_signal( & amp;queue->cond);
}

Code interpretation:

First, we let task->next point to NULL. Then, in the case of locking, tail insert the queue. Finally wake up a thread to get the task. Here we need to explain our writing method and why we can write it this way. In fact, in the kernel, queues are written in this way: first-level pointer head + second-level pointer tail. Please see the picture for the principle:

From the perspective of correcting the management length, void* manages the entire node, but **void manages the last 8 bytes pointing to the memory (64-bit operating system) That is *next;
So *queue->tail is queue->tail->next; similarly, *link is *next.

Delete tasks

static task_t* __pop_task(task_queue_t *queue)
{<!-- -->
    spinlock_lock( & amp;queue->lock);
    if(queue->head == NULL)
        return NULL;
    task_t *task;
   
    void **link = (void**)queue->head;
    queue->head = *link;

    if(queue->head == NULL)
        queue->tail = & amp;queue->head;

    spinlock_unlock( & amp;queue->lock);
}

Code explanation:
We take out the frontmost task, and then the head pointer points to the next one.

Get tasks

static void* __get_task(task_queue_t *queue)
{<!-- -->
    task_t *task;
    pthread_mutex_lock( & amp;queue->mutex);
    while(task = __pop_task(queue) == NULL)
    {<!-- -->
        if(queue->block == 0)
        {<!-- -->
            pthread_mutex_unlock( & amp;queue->mutex);
            return NULL;
        }

        pthread_cond_wait( & amp;queue->cond, & amp;queue->mutex);
        pthread_mutex_unlock( & amp;queue->mutex);
    }

    return task;
}

Code interpretation:
We call the __pop_task written above to get the task at the front of the queue. If the queue is non-blocking, return directly. If the queue is blocked (just initialized), then we have to go to sleep and wait for wake-up in __add_task. Then return the obtained task.

Queue destruction

static void __destroy_task_queue(task_queue_t *queue)
{<!-- -->
    task_t *task;
    while(task = __pop_task(queue))
    {<!-- -->
        free(task);
    }

    pthread_mutex_destroy( & amp;queue->mutex);
    pthread_cond_destroy( & amp;queue->cond);
    spinlock_destroy( & amp;queue->lock);

    free(queue);
}

Code interpretation:
Release all resources and destroy them.

Thread pool work

static void* __thrdpool_work(void *arg)
{<!-- -->
    thrdpool_t *thrdpool = (thrdpool_t*)arg;
    task_t *task;
    void *cxt;

    if(atomic_load( & amp;thrdpool->quit) == 0)
    {<!-- -->
        task = (task_t*)__get_task(thrdpool->task_queue);
        handler_pt func = task->func;
        cxt = task->arg;
        free(task);
        func(cxt);
    }

    return NULL;
}

Code interpretation:

Thread stopped

static void __thrdpool_terminer(thrdpool_t *pool)
{<!-- -->
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
    int i = 0;
    for(i; i < pool->thrd_count; i + + )
    {<!-- -->
        pthread_join(pool->threads[i], NULL);
    }

}

Code interpretation:
First let the thread pool block, and then complete the tasks of all current threads.

Create a thread pool

static int __thrdpool_create(thrdpool_t *pool, int thrd_num)
{<!-- -->
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init( & amp;attr);

    if(ret == 0)
    {<!-- -->
        pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_num);

        if(pool->threads)
        {<!-- -->
            int i = 0;
            for(i; i <thrd_num; i + + )
            {<!-- -->
                if(pthread_create( & amp;pool->threads[i], & amp;attr, __thrdpool_work, pool) != 0)
                {<!-- -->
                    break;
                }
                
            }
            pool->thrd_count = i;
            if(thrd_num == i)
                return 0;
            __thrdpool_terminater(pool);
            free(pool->threads);
        }
        ret = -1;
    }

    return ret;
}

Code interpretation:
The main thing is to open up space on the heap, and then use a loop to create threads in batches. Just pay attention to it step by step. If the resource creation fails, you need to destroy the resource in time and return.

Interface

void thrdpool_treminater(thrdpool_t *pool)
{<!-- -->
    atomic_store( &pool->quit, 1);
    __nonblock(pool->task_queue);
}

thrdpool_t *thrdpool_create(int thrd_count)
{<!-- -->
    thrdpool_t *pool = (thrdpool_t*)malloc(sizeof(thrdpool_t));

    if(pool)
    {<!-- -->
        task_queue_t *task = __taskqueue_create();
        if(task)
        {<!-- -->
            pool->task_queue = task;
            int ret = __thrdpool_create(pool, thrd_count);
            if(ret == 0)
            {<!-- -->
                return pool;
            }
            __destroy_task_queue(pool->task_queue);
        }

        free(pool);
    }

    return NULL;
}

int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg)
{<!-- -->
    task_t *task = (task_t*)malloc(sizeof(task_t));
    if(atomic_load( &pool->quit) == 1)
    {<!-- -->
        return -1;
    }
    task->arg = arg;
    task->func = func;
    __add_task(pool->task_queue, task);
    
    return 0;

}


void thrdpool_waitdone(thrdpool_t *pool)
{<!-- -->
    int i = 0;
    for(i; i < pool->thrd_count; i + + )
    {<!-- -->
        pthread_join(pool->threads[i], NULL);
    }
    __destroy_task_queue(pool->task_queue);
    free(pool->threads);
    free(pool);
}

Code interpretation:
The interface here is provided for customers to use. Here you need to pay attention to the difference between waitdone and terminer. One is to completely stop the thread pool (with destruction function), and the other is not to destroy, but just pause.

This article triggers thinking records through the course of : C/C++ backend server teaching