C language thread pool: interpretation and implementation 01
When it comes to concurrency and pooled components, the first thing that comes to mind is the thread pool. So what is the principle behind a thread pool, and how does it work? This article answers both questions.
Key concepts
- What is a thread pool?
  A thread pool is a pooled component that maintains and manages a fixed number of threads. It improves CPU utilization.
- Why do you need a thread pool?
  In layman's terms, suppose a task performs very time-consuming IO but we only have a single thread: that thread blocks until the IO completes, and nothing else can run in the meantime. With a pool, such tasks are handed to worker threads while the rest of the program keeps going.
- What are the components of a thread pool?
  - A task queue
  - A fixed number of threads
  - Locks (to guarantee thread safety)
- How does the thread pool manage its threads?
  - When there is a task: a thread takes it and runs it
  - When there is no task: the thread sleeps instead of using the CPU
Header file interpretation
Let’s take a look at the header file first:
```c
#ifndef THRDPOOL_H_
#define THRDPOOL_H_

typedef struct thrdpool_s thrdpool_t;
typedef void (*handler_pt)(void*);

#ifdef __cplusplus
extern "C" {
#endif

thrdpool_t *thrdpool_create(int thrd_count);
void thrdpool_treminater(thrdpool_t *pool);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);

#ifdef __cplusplus
}
#endif

#endif
```
Code interpretation:
As a component, or a library, we don't want users to depend on how it is implemented internally, so the header only tells them how to use it:
- thrdpool_create initializes a thread pool; its parameter is the number of threads.
- thrdpool_treminater tells the thread pool to shut down.
- thrdpool_post posts a task: which pool to use, which function to run, and with what argument.
- thrdpool_waitdone waits for all threads to finish and then releases the pool.
Note that although we hide the implementation, users still need to know how to use the library, and that is what the two typedefs are for. They declare the pool's object type (thrdpool_t, an opaque struct whose members are only defined in the .c file) and the signature every task function must have (handler_pt, a function that takes one void* argument and returns nothing).
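Since we want the library to be usable from C++, the declarations are wrapped in #ifdef __cplusplus extern "C" { ... }. When a C++ file includes the header, the functions are declared with C linkage, so the C++ compiler does not mangle their names and the linker can find the symbols produced by the C compiler. A C compiler never defines __cplusplus, so it simply skips those lines.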
Data structure interpretation
Queue
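```c
typedef struct task_s {
    void *next;         // must be the first member: the queue treats (void**)task as &task->next
    handler_pt func;    // function to run
    void *arg;          // argument passed to func
} task_t;

typedef struct task_queue_s {
    void *head;              // first task, or NULL when the queue is empty
    void **tail;             // address of the last task's next field (&head when empty)
    int block;               // 1: consumers wait for tasks; 0: non-blocking (shutting down)
    spinlock_t lock;         // spinlock protecting head and tail
    pthread_mutex_t mutex;   // mutex used together with cond
    pthread_cond_t cond;     // condition variable for waiting on tasks
} task_queue_t;
```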
Code interpretation:
Here we first look at the schematic diagram of our queue structure:
We use a linked list: all tasks are chained together through next, and task_queue_t manages the list by holding a pointer to its head and a pointer to its tail. With the diagram and the code above, the structure of the queue should be clear.
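The queue declares a spinlock_t lock, but the article does not show spinlock_t or the spinlock_init/spinlock_lock/spinlock_unlock/spinlock_destroy functions used later, and neither the C standard nor pthreads provides them under those names. Here is a minimal sketch built on C11 atomic_flag, assuming those names; the two-thread counter (demo_worker, demo_run) is hypothetical and only exercises the lock.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

/* Hypothetical spinlock using the names the article assumes. */
typedef struct {
    atomic_flag flag;
} spinlock_t;

static void spinlock_init(spinlock_t *l) {
    atomic_flag_clear(&l->flag);                 /* start unlocked */
}

static void spinlock_lock(spinlock_t *l) {
    /* test_and_set returns the previous value: spin while it was already set */
    while (atomic_flag_test_and_set_explicit(&l->flag, memory_order_acquire)) {
        /* busy-wait */
    }
}

static void spinlock_unlock(spinlock_t *l) {
    atomic_flag_clear_explicit(&l->flag, memory_order_release);
}

static void spinlock_destroy(spinlock_t *l) {
    (void)l;                                     /* nothing to release */
}

/* Demo: two threads increment a shared counter under the lock. */
static spinlock_t demo_lock;
static long demo_counter = 0;

static void *demo_worker(void *arg) {
    (void)arg;
    for (int i = 0; i < 100000; i++) {
        spinlock_lock(&demo_lock);
        demo_counter++;
        spinlock_unlock(&demo_lock);
    }
    return NULL;
}

static long demo_run(void) {
    pthread_t a, b;
    spinlock_init(&demo_lock);
    demo_counter = 0;
    int rc1 = pthread_create(&a, NULL, demo_worker, NULL);
    int rc2 = pthread_create(&b, NULL, demo_worker, NULL);
    assert(rc1 == 0 && rc2 == 0);
    (void)rc1; (void)rc2;
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    spinlock_destroy(&demo_lock);
    return demo_counter;
}
```

A spinlock burns CPU while it waits, so it only pays off for very short critical sections such as the few pointer assignments in the queue; waiting for a task happens on the mutex and condition variable instead.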
Pool
```c
typedef struct thrdpool_s {
    task_queue_t *task_queue;   // task queue
    atomic_int quit;            // 0: running normally; 1: shutting down
    uint32_t thrd_count;        // number of threads in the pool
    pthread_t *threads;         // array of thread handles
} thrdpool_t;
```
Code interpretation:
There is not much to explain here. quit is 0 while the pool runs normally and is set to 1 to tell the workers to stop. It is an atomic variable, so reading and writing it from several threads is safe. The flag is only ever set with a single store and checked with plain loads; no operation on it has to be done as a unit together with anything else, so we do not need a lock, and an atomic variable is enough.
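A small sketch of why an atomic_int is enough for quit: one thread only stores to it and the others only load it. The names quit_flag, loops, poll_quit and run_quit_demo are hypothetical stand-ins, not part of the pool.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>

/* Hypothetical stand-in for thrdpool_t::quit: 0 = keep running, 1 = quit. */
static atomic_int quit_flag;
static atomic_int loops;

/* Worker that keeps "working" until it observes quit_flag == 1. */
static void *poll_quit(void *arg) {
    (void)arg;
    while (atomic_load(&quit_flag) == 0) {   /* race-free read of the flag */
        atomic_fetch_add(&loops, 1);
        sched_yield();
    }
    return NULL;
}

/* Starts one worker, waits until it has looped at least once, then sets the flag.
 * Returns how many iterations the worker completed. */
static int run_quit_demo(void) {
    pthread_t t;
    atomic_store(&quit_flag, 0);
    atomic_store(&loops, 0);
    int rc = pthread_create(&t, NULL, poll_quit, NULL);
    assert(rc == 0);
    (void)rc;
    while (atomic_load(&loops) == 0)
        sched_yield();                       /* make sure the worker is running */
    atomic_store(&quit_flag, 1);             /* one store, no mutex needed */
    pthread_join(t, NULL);                   /* returns once the worker sees 1 */
    return atomic_load(&loops);
}
```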
Code implementation
Initialize queue
```c
static task_queue_t* __taskqueue_create(void) {
    int ret;
    task_queue_t *queue = (task_queue_t*)malloc(sizeof(*queue));
    if (queue) {
        ret = pthread_mutex_init(&queue->mutex, NULL);
        if (ret == 0) {
            ret = pthread_cond_init(&queue->cond, NULL);
            if (ret == 0) {
                spinlock_init(&queue->lock);
                queue->head = NULL;
                queue->tail = &queue->head;   // empty queue: tail points at head
                queue->block = 1;             // blocking mode by default
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex);
        }
        free(queue);
    }
    return NULL;
}
```
Code interpretation:
Nothing special here: we allocate memory for the queue and initialize each member. If every initialization succeeds, we return the pointer; otherwise we undo what was already initialized and return NULL. The main thing to note is that block is initialized to 1, meaning the queue is in blocking mode by default: a newly created queue holds no tasks, so threads that ask for one should sleep until a task arrives.
Remove blocking
```c
static void __nonblock(task_queue_t *queue) {
    pthread_mutex_lock(&queue->mutex);
    queue->block = 0;                       // switch to non-blocking mode
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_broadcast(&queue->cond);   // wake up every waiting thread
}
```
Code interpretation:
Whenever we modify task_queue's internal variables, we must hold the lock to ensure thread safety. Here we set block to 0 to switch the queue to non-blocking mode, then broadcast on the condition variable to wake up all waiting threads.
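The unblocking step can be tried on its own. This sketch keeps only the state __nonblock touches; q_mutex, q_cond, q_block, waiter, nonblock and run_broadcast_demo are hypothetical names. Several threads wait while q_block is 1, and a single broadcast releases all of them.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical minimal queue state: just what __nonblock touches. */
static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;
static int q_block = 1;        /* 1 = waiters sleep, 0 = non-blocking */
static int returned = 0;       /* waiters that got past the wait loop */

static void *waiter(void *arg) {
    (void)arg;
    pthread_mutex_lock(&q_mutex);
    while (q_block == 1)                      /* re-check after every wakeup */
        pthread_cond_wait(&q_cond, &q_mutex);
    returned++;                               /* protected by q_mutex */
    pthread_mutex_unlock(&q_mutex);
    return NULL;
}

/* Same steps as __nonblock: clear the flag under the mutex, then wake everyone. */
static void nonblock(void) {
    pthread_mutex_lock(&q_mutex);
    q_block = 0;
    pthread_mutex_unlock(&q_mutex);
    pthread_cond_broadcast(&q_cond);
}

/* Starts n (1..8) waiters, unblocks them, joins them, returns how many returned. */
static int run_broadcast_demo(int n) {
    pthread_t t[8];
    assert(n > 0 && n <= 8);
    for (int i = 0; i < n; i++) {
        int rc = pthread_create(&t[i], NULL, waiter, NULL);
        assert(rc == 0);
        (void)rc;
    }
    nonblock();
    for (int i = 0; i < n; i++)
        pthread_join(t[i], NULL);
    return returned;
}
```

With pthread_cond_signal instead of pthread_cond_broadcast, only one sleeping waiter is guaranteed to wake, so joining the others could hang.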
Insert task
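```c
static inline void __add_task(task_queue_t *queue, void *task) {
    void **link = (void**)task;   // link == &task->next, since next is the first member
    *link = NULL;                 // task->next = NULL
    spinlock_lock(&queue->lock);
    *queue->tail = link;          // head (if empty) or the old last node's next now points to task
    queue->tail = link;           // task's next field becomes the new tail
    spinlock_unlock(&queue->lock);
    // Signal while holding the mutex: __get_task checks the queue and calls
    // pthread_cond_wait under this mutex, so the wakeup cannot be lost.
    pthread_mutex_lock(&queue->mutex);
    pthread_cond_signal(&queue->cond);
    pthread_mutex_unlock(&queue->mutex);
}
```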
Code interpretation:
First, we set task->next to NULL. Then, with the lock held, we append the task to the tail of the queue. Finally, we wake up one thread to take the task. The way the queue is written deserves an explanation: it uses a plain pointer for head plus a pointer-to-pointer for tail, an idiom that also appears in kernel code. Please see the picture for the principle:
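To see why, think about how much memory each pointer type addresses. A void* simply refers to the whole node, but dereferencing a void** reads or writes one pointer-sized value (8 bytes on a 64-bit system) at the start of the memory it points to. Because next is the first member of task_t, that value is exactly next. So *link is task->next, and since queue->tail always points at the next field of the last node (or at head when the queue is empty), *queue->tail is the last node's next.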
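The pointer-to-pointer trick can be checked in isolation. This single-threaded sketch uses a hypothetical node_t/queue_t pair that mirrors task_t/task_queue_t without the locks; the key assumption, as in the article, is that next is the first member, so (void**)node addresses node->next.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node: like task_t, `next` must be the FIRST member. */
typedef struct node_s {
    void *next;
    int value;
} node_t;

/* Hypothetical queue header, mirroring task_queue_t without the locks. */
typedef struct {
    void *head;    /* first node, or NULL */
    void **tail;   /* address of the last node's next, or &head when empty */
} queue_t;

static void queue_init(queue_t *q) {
    q->head = NULL;
    q->tail = &q->head;
}

/* Same pointer steps as __add_task. */
static void queue_push(queue_t *q, node_t *node) {
    assert(node != NULL);
    void **link = (void**)node;   /* == &node->next because next is first */
    *link = NULL;                 /* node->next = NULL */
    *q->tail = link;              /* head (if empty) or old last->next = node */
    q->tail = link;               /* new tail slot: node->next */
}
```

Because tail starts out pointing at head itself, the empty queue needs no special branch: the first push writes the node into head, and every later push writes it into the previous node's next.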
Delete tasks
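```c
static task_t* __pop_task(task_queue_t *queue) {
    spinlock_lock(&queue->lock);
    if (queue->head == NULL) {
        spinlock_unlock(&queue->lock);   // release the lock on the early return too
        return NULL;
    }
    task_t *task = (task_t*)queue->head;
    void **link = (void**)queue->head;   // *link is task->next
    queue->head = *link;
    if (queue->head == NULL) {
        queue->tail = &queue->head;      // queue became empty: reset tail
    }
    spinlock_unlock(&queue->lock);
    return task;
}
```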
Code explanation:
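We take out the task at the front of the queue and move head to the next one. If that empties the queue, tail must be reset to &queue->head; otherwise it would keep pointing into the removed task, and the next insertion would write through a dangling pointer. The spinlock must also be released on every path, including the early return for an empty queue.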
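Why the tail reset matters is easiest to see by draining the queue and pushing again. This sketch reuses the hypothetical node_t/queue_t layout (next first, pointer-to-pointer tail) and drops the spinlock; queue_pop follows the same steps as __pop_task.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node and queue with the same layout rules as task_t / task_queue_t. */
typedef struct node_s { void *next; int value; } node_t;
typedef struct { void *head; void **tail; } queue_t;

static void queue_init(queue_t *q) { q->head = NULL; q->tail = &q->head; }

static void queue_push(queue_t *q, node_t *node) {
    assert(node != NULL);
    void **link = (void**)node;   /* &node->next */
    *link = NULL;
    *q->tail = link;
    q->tail = link;
}

/* Same steps as __pop_task, minus the spinlock. */
static node_t *queue_pop(queue_t *q) {
    if (q->head == NULL)
        return NULL;                 /* empty queue */
    node_t *node = (node_t*)q->head;
    void **link = (void**)q->head;   /* *link is node->next */
    q->head = *link;                 /* advance head to the second node */
    if (q->head == NULL)
        q->tail = &q->head;          /* emptied: tail must stop pointing into node */
    return node;
}
```

Without the reset, the push after the queue drains would write through q.tail into the next field of a node that was already handed out (and, in the pool, freed), and head would stay NULL.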
Get tasks
```c
static void* __get_task(task_queue_t *queue) {
    task_t *task;
    pthread_mutex_lock(&queue->mutex);
    while ((task = __pop_task(queue)) == NULL) {   // parentheses: assign, then compare
        if (queue->block == 0) {
            pthread_mutex_unlock(&queue->mutex);
            return NULL;                           // empty and non-blocking
        }
        pthread_cond_wait(&queue->cond, &queue->mutex);
    }
    pthread_mutex_unlock(&queue->mutex);
    return task;
}
```
Code interpretation:
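We call __pop_task, defined above, to get the task at the front of the queue. If the queue is empty and in non-blocking mode, we return NULL right away. If it is in blocking mode (the default after initialization), the thread sleeps on the condition variable until __add_task or __nonblock wakes it, then tries again. pthread_cond_wait releases the mutex while sleeping and re-acquires it before returning, and the while loop re-checks the queue after every wakeup, which also covers spurious wakeups. Once a task is obtained, we release the mutex and return it.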
Queue destruction
```c
static void __destroy_task_queue(task_queue_t *queue) {
    task_t *task;
    while ((task = __pop_task(queue)) != NULL) {
        free(task);
    }
    pthread_mutex_destroy(&queue->mutex);
    pthread_cond_destroy(&queue->cond);
    spinlock_destroy(&queue->lock);
    free(queue);
}
```
Code interpretation:
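Free any tasks that are still in the queue, then destroy the mutex, the condition variable, and the spinlock, and finally free the queue itself.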
Thread pool work
```c
static void* __thrdpool_work(void *arg) {
    thrdpool_t *thrdpool = (thrdpool_t*)arg;
    task_t *task;
    void *ctx;
    while (atomic_load(&thrdpool->quit) == 0) {   // keep serving tasks until asked to quit
        task = (task_t*)__get_task(thrdpool->task_queue);
        if (task == NULL) {
            break;                                // queue unblocked and empty: exit
        }
        handler_pt func = task->func;
        ctx = task->arg;
        free(task);                               // the node is no longer needed
        func(ctx);
    }
    return NULL;
}
```
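Code interpretation:
Each worker thread keeps looping while quit is 0 (a while loop, not an if, or each thread would run only one task and exit). In every iteration it takes a task from the queue, sleeping inside __get_task while the queue is empty, saves the function and its argument, frees the task node, and then calls the function. If __get_task returns NULL, the queue has been switched to non-blocking mode and is empty, so the worker exits.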
Thread stopped
```c
static void __thrdpool_terminer(thrdpool_t *pool) {
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
    for (uint32_t i = 0; i < pool->thrd_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
}
```
Code interpretation:
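We set quit to 1 so workers stop picking up new tasks, call __nonblock so that workers sleeping on an empty queue wake up and exit, and then pthread_join each thread, waiting for it to finish its current task and exit.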
Create a thread pool
```c
static int __thrdpool_create(thrdpool_t *pool, int thrd_num) {
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
    if (ret == 0) {
        pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_num);
        if (pool->threads) {
            atomic_init(&pool->quit, 0);   // workers read quit as soon as they start
            int i = 0;
            for (; i < thrd_num; i++) {
                if (pthread_create(&pool->threads[i], &attr, __thrdpool_work, pool) != 0) {
                    break;
                }
            }
            pool->thrd_count = i;
            if (thrd_num == i) {
                pthread_attr_destroy(&attr);
                return 0;
            }
            // Partial failure: stop and join the threads that did start.
            __thrdpool_terminer(pool);
            free(pool->threads);
        }
        pthread_attr_destroy(&attr);
        ret = -1;
    }
    return ret;
}
```
Code interpretation:
We allocate the thread array on the heap, then create the threads one by one in a loop. If creating a thread fails, we record how many threads were actually started in thrd_count, stop and join those threads, free the array, and return -1, so no resources leak.
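The key detail is that after a partial failure only the first i handles are valid, which is why the code stores i in thrd_count before cleaning up. The sketch below isolates that rollback; create_threads, idle_worker and the fail_at parameter (a simulated failure) are hypothetical. Unlike a real pool, it also joins on success, because its idle workers exit immediately.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

static void *idle_worker(void *arg) {
    (void)arg;
    return NULL;   /* a real worker would loop on the task queue */
}

/* Hypothetical helper: creates n threads like __thrdpool_create, except that
 * creation "fails" at index fail_at (pass -1 for no failure).
 * *started receives how many threads were actually created.
 * Returns 0 if all n started, -1 otherwise; the array is always freed. */
static int create_threads(int n, int fail_at, int *started) {
    assert(n > 0 && started != NULL);
    pthread_t *threads = malloc(sizeof(pthread_t) * (size_t)n);
    if (threads == NULL)
        return -1;
    int i = 0;
    for (; i < n; i++) {
        if (i == fail_at || pthread_create(&threads[i], NULL, idle_worker, NULL) != 0)
            break;                          /* simulated or real failure */
    }
    *started = i;
    /* Only threads[0..i-1] are valid handles, so only those are joined. */
    for (int j = 0; j < i; j++)
        pthread_join(threads[j], NULL);
    free(threads);
    return i == n ? 0 : -1;
}
```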
Interface
```c
void thrdpool_treminater(thrdpool_t *pool) {
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
}

thrdpool_t *thrdpool_create(int thrd_count) {
    thrdpool_t *pool = (thrdpool_t*)malloc(sizeof(thrdpool_t));
    if (pool) {
        task_queue_t *queue = __taskqueue_create();
        if (queue) {
            pool->task_queue = queue;
            int ret = __thrdpool_create(pool, thrd_count);
            if (ret == 0) {
                return pool;
            }
            __destroy_task_queue(pool->task_queue);
        }
        free(pool);
    }
    return NULL;
}

int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) {
    if (atomic_load(&pool->quit) == 1) {   // check before allocating, so nothing leaks
        return -1;
    }
    task_t *task = (task_t*)malloc(sizeof(task_t));
    if (task == NULL) {
        return -1;
    }
    task->func = func;
    task->arg = arg;
    __add_task(pool->task_queue, task);
    return 0;
}

void thrdpool_waitdone(thrdpool_t *pool) {
    for (uint32_t i = 0; i < pool->thrd_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    __destroy_task_queue(pool->task_queue);
    free(pool->threads);
    free(pool);
}
```
Code interpretation:
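These functions are the interface offered to users. Note the difference between thrdpool_waitdone and thrdpool_treminater. thrdpool_treminater only signals shutdown: it sets quit and unblocks the queue so idle workers wake up and exit, and it returns without waiting for them or freeing anything. thrdpool_waitdone joins every worker thread and then destroys the queue and frees the pool. Idle workers sleep on an empty queue until it is unblocked, so call thrdpool_treminater before thrdpool_waitdone; otherwise the joins wait forever. Tasks still in the queue at that point may never run, and __destroy_task_queue frees them.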
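To see the pieces working together, here is a compact, simplified sketch of the whole pool behind the same four function names. It is not the article's code: as a simplification it guards the queue with a single mutex instead of a spinlock plus a mutex, and count_task and run_pool_demo are hypothetical. It keeps the division of labor described above, where thrdpool_treminater only signals and thrdpool_waitdone joins and frees.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

typedef void (*handler_pt)(void*);
typedef struct task_s { void *next; handler_pt func; void *arg; } task_t;
typedef struct {                 /* simplification: one mutex, no spinlock */
    void *head, **tail;          /* head pointer + pointer-to-pointer tail */
    int block;                   /* 1 = idle workers sleep, 0 = shutting down */
    pthread_mutex_t mutex; pthread_cond_t cond;
} task_queue_t;
typedef struct thrdpool_s {
    task_queue_t queue; atomic_int quit; uint32_t thrd_count; pthread_t *threads;
} thrdpool_t;

/* Caller holds q->mutex, or is the only thread left. */
static task_t *pop_task(task_queue_t *q) {
    task_t *t = (task_t*)q->head;
    if (t != NULL && (q->head = t->next) == NULL)
        q->tail = &q->head;                        /* emptied: reset tail */
    return t;
}
static void *worker(void *arg) {
    thrdpool_t *pool = (thrdpool_t*)arg;
    while (atomic_load(&pool->quit) == 0) {        /* while, not if: keep serving */
        task_t *t;
        pthread_mutex_lock(&pool->queue.mutex);
        while ((t = pop_task(&pool->queue)) == NULL && pool->queue.block)
            pthread_cond_wait(&pool->queue.cond, &pool->queue.mutex);
        pthread_mutex_unlock(&pool->queue.mutex);
        if (t == NULL) break;                      /* unblocked and empty: exit */
        handler_pt func = t->func; void *ctx = t->arg;
        free(t);
        func(ctx);
    }
    return NULL;
}
void thrdpool_treminater(thrdpool_t *pool) {       /* signals shutdown, does not wait */
    atomic_store(&pool->quit, 1);
    pthread_mutex_lock(&pool->queue.mutex);
    pool->queue.block = 0;
    pthread_mutex_unlock(&pool->queue.mutex);
    pthread_cond_broadcast(&pool->queue.cond);
}
void thrdpool_waitdone(thrdpool_t *pool) {         /* joins workers, then frees everything */
    for (uint32_t i = 0; i < pool->thrd_count; i++)
        pthread_join(pool->threads[i], NULL);
    task_t *t;
    while ((t = pop_task(&pool->queue)) != NULL) free(t);   /* tasks that never ran */
    pthread_mutex_destroy(&pool->queue.mutex); pthread_cond_destroy(&pool->queue.cond);
    free(pool->threads); free(pool);
}
thrdpool_t *thrdpool_create(int thrd_count) {
    assert(thrd_count > 0);
    thrdpool_t *pool = malloc(sizeof(*pool));
    if (pool == NULL) return NULL;
    pool->queue.head = NULL; pool->queue.tail = &pool->queue.head; pool->queue.block = 1;
    pthread_mutex_init(&pool->queue.mutex, NULL); pthread_cond_init(&pool->queue.cond, NULL);
    atomic_init(&pool->quit, 0);                   /* must be 0 before any worker starts */
    pool->threads = malloc(sizeof(pthread_t) * (size_t)thrd_count);
    int i = 0;
    while (pool->threads != NULL && i < thrd_count &&
           pthread_create(&pool->threads[i], NULL, worker, pool) == 0)
        i++;
    pool->thrd_count = (uint32_t)i;
    if (i == thrd_count) return pool;
    thrdpool_treminater(pool);                     /* partial failure: roll back */
    thrdpool_waitdone(pool);
    return NULL;
}
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) {
    if (atomic_load(&pool->quit) == 1) return -1;
    task_t *t = malloc(sizeof(*t));
    if (t == NULL) return -1;
    t->next = NULL; t->func = func; t->arg = arg;
    pthread_mutex_lock(&pool->queue.mutex);
    *pool->queue.tail = t; pool->queue.tail = &t->next;   /* append at the tail */
    pthread_mutex_unlock(&pool->queue.mutex);
    pthread_cond_signal(&pool->queue.cond);
    return 0;
}
static atomic_int done;                            /* hypothetical demo counter */
static void count_task(void *arg) { (void)arg; atomic_fetch_add(&done, 1); }
/* Posts n counting tasks, waits until all of them ran, then shuts the pool down. */
static int run_pool_demo(int n) {
    int posted = 0;
    atomic_store(&done, 0);
    thrdpool_t *pool = thrdpool_create(4);
    if (pool == NULL) return -1;
    for (int i = 0; i < n; i++)
        posted += (thrdpool_post(pool, count_task, NULL) == 0);
    while (atomic_load(&done) < posted)
        sched_yield();                             /* shutdown may skip queued tasks */
    thrdpool_treminater(pool);
    thrdpool_waitdone(pool);
    return atomic_load(&done);
}
```

Because shutdown lets workers skip tasks still in the queue, run_pool_demo waits until every posted task has run before it calls thrdpool_treminater.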
This article is a record of thoughts prompted by the course of