Process, thread and interprocess communication – thread pool concept and implementation

Thread pool concept and implementation

Concept:

In layman’s terms, it is a pool of threads, a set of threads that can complete tasks in a loop

necessity:

We usually create a thread, complete a certain task, and wait for the thread to exit. But when a large number of threads need to be created, suppose T1 is the thread creation time, T2 is the execution time of the thread task, and T3 is the thread destruction time. When T1 + T3 > T2, it is not cost-effective at this time. Using the thread pool can reduce the frequency The overhead caused by creating and destroying threads is very significant when the task processing time is relatively short.

The basic structure of the thread pool:

1 task queue, which stores tasks that need to be processed, and these tasks are processed by worker threads

2 thread pool worker thread, which is the consumer of the task queue task, waiting for the signal of the new task

Implementation of thread pool:

Create the basic structure of the thread pool:

Task Queue Linked List

typedef struct Task;

thread pool structure

typedef struct ThreadPool;

Thread pool initialization:

pool_init()

{

Create a thread pool structure

Implement initialization of task queue mutex and condition variable

Create n worker threads

}

Add task to thread pool

 pool_add_task

{

    Determine whether there are idle worker threads

Add a node to the task queue

    Send signal newtask to worker thread

}

Implement worker threads

 workThread

{
    
while(1){

   Wait for newtask task signal

   Remove node from task queue

   perform tasks

}

}

destruction of thread pool

 pool_destory

{

Delete all nodes in the task queue linked list to free up space

remove all mutex condition variables

Delete the thread pool to free up space

}

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

#define POOL_NUM 10
//Create the basic structure of the thread pool
//The task queue list is a critical resource
typedef struct Task
{
    void* (*func)(void* arg);//task function pointer, task style
    void* arg;//function parameters
    struct Task* next;//pointer of linked list
}Task;
// thread pool structure
typedef struct ThreadPool
{
    pthread_mutex_t taskLock;//mutex lock for critical resources
    pthread_cond_t newTask;//Condition variable notifies new task

    pthread_t tid[POOL_NUM];//thread tid
    Task* queue_head;//The head of the queue
    int busywork;//The state of the task

}ThreadPool;

ThreadPool* pool;
// implement worker thread
void* workThread(void* arg)
{
    while (1)
    {
        //Access critical resources plus thread lock
        pthread_mutex_lock( &pool->taskLock);
        //Waiting for tasks, waiting threads must be used with thread locks
        pthread_cond_wait( &pool->newTask, &pool->taskLock);
        // get task from queue
        Task* ptask = pool->queue_head;
        // point the head of the queue to the next node
        pool->queue_head = pool->queue_head->next;
        //unlock
        pthread_mutex_unlock( & pool->taskLock);
        //execute task function
        ptask->func(ptask->arg);
        //task flag minus one
        pool->busywork--;


    }


}
//actually execute the task function
void* realwork(void* arg)
{
    printf("Finish work %d\
", (int)arg);

}
// Add tasks to the thread pool
void pool_add_task(int arg)
{
    Task* newTask;
    //Access critical resources plus thread lock
    pthread_mutex_lock( &pool->taskLock);
    / / Determine whether the task being executed is full
    while (pool->busywork >= POOL_NUM)
    {
        //Unlock before hibernation
        pthread_mutex_unlock( & pool->taskLock);
        usleep(10000);
        //End sleep and lock
        pthread_mutex_lock( &pool->taskLock);
    }
    pthread_mutex_unlock( & pool->taskLock);

    //Create a new task
    newTask = malloc(sizeof(Task));
    // task function execution
    newTask->func = realwork;
    //Task function pass parameter
    newTask->arg = arg;

    // lock
    pthread_mutex_lock( &pool->taskLock);
    //Retrieve tasks from the queue
    Task* member = pool->queue_head;

    if (member == NULL)
    {
        //The new task is given to the head of the queue
        pool->queue_head = newTask;
    }
    else
    {
        //find the tail of the linked list
        while (member->next != NULL)
        {
            member = member->next;
        }
        // put the new task at the end of the linked list
        member->next = newTask;

    }
    //task ID + 1
    pool->busywork + + ;
    //Send a signal to notify that there is a new task
    pthread_cond_signal( & pool->newTask);

    pthread_mutex_unlock( & pool->taskLock);


}

//Initialization of the thread pool
void pool_init()
{
    pool = malloc(sizeof(ThreadPool));//Apply for thread pool space
    pthread_mutex_init( & amp;pool->taskLock, NULL);//Initialize the parameters in the thread pool
    pthread_cond_init( & amp;pool->newTask, NULL);//Initialize the parameters in the thread pool
    pool->queue_head = NULL;//Initialize the parameters in the thread pool
    pool->busywork = 0;//Initialize the parameters in the thread pool
    //Create thread
    for (int i = 0; i < POOL_NUM; i ++ )
    {
        pthread_create( &pool->tid[i], NULL, workThread, NULL);
    }
}
//Destroy the thread pool
void pool_destory()
{
    Task* head;
    //delete linked list
    while (pool->queue_head != NULL)
    {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);
    }
    //Destroy the mutex
    pthread_mutex_destroy( & pool->taskLock);
    // Destroy the conditional semaphore
    pthread_cond_destroy( & pool->newTask);
    //Release thread pool space
    free(pool);

}
int main()
{
    // thread pool initialization
    pool_init();
    sleep(20);
    //Create task
    for (int i = 1; i <= 20; i ++ )
    {
        pool_add_task(i);

    }

    sleep(5);
    //Destroy the thread pool
    pool_destory();

}

Compile Error:

error: ‘ThreadPool {aka struct ThreadPool}’ has no member named ‘head’

Significance: The ThreadPool structure does not have a head member.

Solution: Check for typos.

error: too few arguments to function ‘pthread_mutex_init’

Meaning: pthread_mutex_init has fewer parameters

Solution: Check the parameters of the function and add the corresponding parameters

GDB debugging of threads:

When compiling without adding -g, the breakpoint cannot be interrupted by the line number, and gdb debugging will report that no symbol table has been read

gcc -g -o ttest ttest.c -lpthread

show thread

info thread

switch thread

thread id

GDB set breakpoint for specific thread

break location thread id

GDB sets the thread lock,

set scheduler-locking on/off

on: Other threads will be suspended. A single thread can be debugged

#include <pthread.h>
#include <stdio.h>

// thread implementation
void* testThread(void* arg)
{
    // thread name
    char* threadName = (char*)arg;

    printf("Current running %s\
", threadName);

    printf("aaaaaaaa\
");
    printf("bbbbbbbb\
");
    //thread exit
    pthread_exit(0);

}


int main()
{
    //Define thread structure
    pthread_t tid1, tid2;
    //Create thread
    pthread_create( &tid1, NULL, testThread, "thread1");
    pthread_create( &tid2, NULL, testThread, "thread2");
    // recycle thread
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);


}