Thread pool concept and implementation
Concept:
In layman’s terms, it is a pool of threads, a set of threads that can complete tasks in a loop
necessity:
We usually create a thread, complete a certain task, and wait for the thread to exit. But when a large number of threads need to be created, suppose T1 is the thread creation time, T2 is the execution time of the thread task, and T3 is the thread destruction time. When T1 + T3 > T2, it is not cost-effective at this time. Using the thread pool can reduce the frequency The overhead caused by creating and destroying threads is very significant when the task processing time is relatively short.
The basic structure of the thread pool:
1 task queue, which stores tasks that need to be processed, and these tasks are processed by worker threads
2 thread pool worker thread, which is the consumer of the task queue task, waiting for the signal of the new task
Implementation of thread pool:
Create the basic structure of the thread pool:
Task Queue Linked List
typedef struct Task;
thread pool structure
typedef struct ThreadPool;
Thread pool initialization:
pool_init() { Create a thread pool structure Implement initialization of task queue mutex and condition variable Create n worker threads }
Add task to thread pool
pool_add_task { Determine whether there are idle worker threads Add a node to the task queue Send signal newtask to worker thread }
Implement worker threads
workThread { while(1){ Wait for newtask task signal Remove node from task queue perform tasks } }
destruction of thread pool
pool_destory { Delete all nodes in the task queue linked list to free up space remove all mutex condition variables Delete the thread pool to free up space }
#include <pthread.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #define POOL_NUM 10 //Create the basic structure of the thread pool //The task queue list is a critical resource typedef struct Task { void* (*func)(void* arg);//task function pointer, task style void* arg;//function parameters struct Task* next;//pointer of linked list }Task; // thread pool structure typedef struct ThreadPool { pthread_mutex_t taskLock;//mutex lock for critical resources pthread_cond_t newTask;//Condition variable notifies new task pthread_t tid[POOL_NUM];//thread tid Task* queue_head;//The head of the queue int busywork;//The state of the task }ThreadPool; ThreadPool* pool; // implement worker thread void* workThread(void* arg) { while (1) { //Access critical resources plus thread lock pthread_mutex_lock( &pool->taskLock); //Waiting for tasks, waiting threads must be used with thread locks pthread_cond_wait( &pool->newTask, &pool->taskLock); // get task from queue Task* ptask = pool->queue_head; // point the head of the queue to the next node pool->queue_head = pool->queue_head->next; //unlock pthread_mutex_unlock( & pool->taskLock); //execute task function ptask->func(ptask->arg); //task flag minus one pool->busywork--; } } //actually execute the task function void* realwork(void* arg) { printf("Finish work %d\ ", (int)arg); } // Add tasks to the thread pool void pool_add_task(int arg) { Task* newTask; //Access critical resources plus thread lock pthread_mutex_lock( &pool->taskLock); / / Determine whether the task being executed is full while (pool->busywork >= POOL_NUM) { //Unlock before hibernation pthread_mutex_unlock( & pool->taskLock); usleep(10000); //End sleep and lock pthread_mutex_lock( &pool->taskLock); } pthread_mutex_unlock( & pool->taskLock); //Create a new task newTask = malloc(sizeof(Task)); // task function execution newTask->func = realwork; //Task function pass parameter newTask->arg = arg; // lock pthread_mutex_lock( &pool->taskLock); //Retrieve tasks from the queue Task* member = pool->queue_head; if (member == NULL) { //The new task is given to the head of the queue pool->queue_head = newTask; } else { //find the tail of the linked list while (member->next != NULL) { member = member->next; } // put the new task at the end of the linked list member->next = newTask; } //task ID + 1 pool->busywork + + ; //Send a signal to notify that there is a new task pthread_cond_signal( & pool->newTask); pthread_mutex_unlock( & pool->taskLock); } //Initialization of the thread pool void pool_init() { pool = malloc(sizeof(ThreadPool));//Apply for thread pool space pthread_mutex_init( & amp;pool->taskLock, NULL);//Initialize the parameters in the thread pool pthread_cond_init( & amp;pool->newTask, NULL);//Initialize the parameters in the thread pool pool->queue_head = NULL;//Initialize the parameters in the thread pool pool->busywork = 0;//Initialize the parameters in the thread pool //Create thread for (int i = 0; i < POOL_NUM; i ++ ) { pthread_create( &pool->tid[i], NULL, workThread, NULL); } } //Destroy the thread pool void pool_destory() { Task* head; //delete linked list while (pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } //Destroy the mutex pthread_mutex_destroy( & pool->taskLock); // Destroy the conditional semaphore pthread_cond_destroy( & pool->newTask); //Release thread pool space free(pool); } int main() { // thread pool initialization pool_init(); sleep(20); //Create task for (int i = 1; i <= 20; i ++ ) { pool_add_task(i); } sleep(5); //Destroy the thread pool pool_destory(); }
Compile Error:
error: ‘ThreadPool {aka struct ThreadPool}’ has no member named ‘head’
Significance: The ThreadPool structure does not have a head member.
Solution: Check for typos.
error: too few arguments to function ‘pthread_mutex_init’
Meaning: pthread_mutex_init has fewer parameters
Solution: Check the parameters of the function and add the corresponding parameters
GDB debugging of threads:
When compiling without adding -g, the breakpoint cannot be interrupted by the line number, and gdb debugging will report that no symbol table has been read
gcc -g -o ttest ttest.c -lpthread
show thread
info thread
switch thread
thread id
GDB set breakpoint for specific thread
break location thread id
GDB sets the thread lock,
set scheduler-locking on/off
on: Other threads will be suspended. A single thread can be debugged
#include <pthread.h> #include <stdio.h> // thread implementation void* testThread(void* arg) { // thread name char* threadName = (char*)arg; printf("Current running %s\ ", threadName); printf("aaaaaaaa\ "); printf("bbbbbbbb\ "); //thread exit pthread_exit(0); } int main() { //Define thread structure pthread_t tid1, tid2; //Create thread pthread_create( &tid1, NULL, testThread, "thread1"); pthread_create( &tid2, NULL, testThread, "thread2"); // recycle thread pthread_join(tid1, NULL); pthread_join(tid2, NULL); }