[Linux]Thread synchronization

Article directory

  • [Linux]Thread synchronization
    • Thread synchronization
      • Thread starvation problem
      • Concept
    • Thread synchronization control – condition variables
      • pthread_cond_init function
      • pthread_cond_destroy function
      • pthread_cond_wait function
      • pthread_cond_signal function
      • pthread_cond_broadcast function
      • Examples of using functions related to condition variables
      • Producer-consumer model
      • Producer-consumer model based on BlockingQueue
    • Thread synchronization control – POSIX semaphore
      • Concept
      • sem_init function
      • sem_wait function
      • sem_post function
      • sem_destroy function
      • Producer-consumer model based on ring queue

Thread synchronization

Thread starvation problem

Thread starvation refers to the situation where one or more threads cannot obtain the resources or CPU time slices they need, and therefore remain blocked or waiting for a long time, unable to make progress.

A simple example of thread starvation: multiple threads use a lock for mutual exclusion, but the thread that first acquired the lock re-acquires it immediately after releasing it, so the other threads can never acquire the lock.

Concept

Thread synchronization: on the premise of keeping data safe, letting threads access critical resources in a specific order, thereby effectively avoiding the starvation problem, is called synchronization.

Using the starvation example above to understand thread synchronization: the threads still use a lock for mutual exclusion, but after the thread that first acquired the lock releases it, the other threads acquire the lock in turn, which solves the starvation problem.

Thread synchronization control – condition variables

Condition variables are the pthread_cond_t data type provided by the native thread library on Linux. Through condition variables, thread synchronization control can be implemented.

A condition variable internally maintains a queue of waiting threads. After a thread hands itself over to the condition variable, this queue structure lets the condition variable wake the threads up in order.

pthread_cond_init function

The pthread_cond_init function is provided under the Linux operating system to initialize condition variables.

//Header file and function declaration where pthread_cond_init is located
#include <pthread.h>

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
  • cond parameter: The condition variable to be initialized.
  • attr parameter: The attribute set for the condition variable, a null pointer is passed in by default.
  • A global condition variable can be initialized statically with pthread_cond_t cond = PTHREAD_COND_INITIALIZER.

pthread_cond_destroy function

The pthread_cond_destroy function is provided under the Linux operating system to destroy condition variables.

//Header file and function declaration where pthread_cond_destroy is located
#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);
  • The pthread_cond_destroy function is used to destroy local condition variables.
  • cond parameter: The condition variable to be destroyed.

pthread_cond_wait function

//Header file and function declaration where pthread_cond_wait is located
#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • The pthread_cond_wait function makes the calling thread wait on the condition variable; the thread continues to run only after the condition variable becomes ready.
  • cond parameter: The condition variable to use.
  • mutex parameter: The lock to use.
  • The pthread_cond_wait function first releases the passed-in lock, so that other threads can enter the critical section, and then blocks waiting on the condition variable. When the condition variable becomes ready, it re-acquires the lock before returning, which keeps the thread safe.

pthread_cond_signal function

//Header file and function declaration where pthread_cond_signal is located
#include <pthread.h>

int pthread_cond_signal(pthread_cond_t *cond);
  • The pthread_cond_signal function wakes up the first thread waiting on the condition variable: it tells one waiting thread that the condition variable is ready, and that thread continues running once it is awakened.
  • cond parameter: The condition variable to use.

pthread_cond_broadcast function

//Header file and function declaration where pthread_cond_broadcast is located
#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cond);
  • The pthread_cond_broadcast function wakes up all threads waiting on the condition variable: it tells every waiting thread that the condition variable is ready, and after being woken up the threads continue to run in turn.
  • cond parameter: The condition variable to use.

Examples of using functions related to condition variables

To verify that a condition variable lets threads execute in order, we can write the following test code:

#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>

#define NUM 5

using namespace std;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // initialize the lock

pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // initialize the condition variable

void *active(void *args)
{
    const char *tname = static_cast<const char *>(args);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex); // the lock is released automatically inside the wait
        cout << tname << " active" << endl;
        pthread_mutex_unlock(&mutex);
    }
    return nullptr;
}

int main()
{
    pthread_t tids[NUM];
    for (int i = 0; i < NUM; i++) // create threads
    {
        char *name = new char[64];
        snprintf(name, 64, "thread-%d", i + 1);
        pthread_create(tids + i, nullptr, active, name);
    }

    sleep(2);

    while (true) // wake up one thread per second
    {
        pthread_cond_signal(&cond);
        sleep(1);
    }

    for (int i = 0; i < NUM; i++) // reclaim threads (not reached, since the loop above runs forever)
    {
        pthread_join(tids[i], nullptr);
    }
    return 0;
}

Compile the code, run it and see the results:

(figure: program output)

In the verification code above, after a thread acquires the lock and enters the critical section, it first waits for the condition variable to become ready, queuing up inside the condition variable in the order the threads called wait. The condition variable wakes threads up in queue order, so the threads run in a fixed order.

Producer-consumer model

In order to understand the producer-consumer model, let’s take a real-life example, the producer-consumer model formed with a supermarket as a carrier:

(figure: supermarket as producer-consumer model)

Suppliers, as producers, produce goods in bulk and deliver them to the supermarket; customers, as consumers, buy goods from the supermarket. This improves both production efficiency and consumption efficiency. Because the supermarket sits in the middle, a supplier only needs to deliver goods to it and a customer only needs to care about what is on its shelves. Even if consumers are temporarily not buying, suppliers can keep producing; even if suppliers temporarily stop producing, consumers can keep buying. Production and consumption can therefore proceed at different paces and with uneven busyness, without either side holding up the other.

Corresponding to the thread concept:

  • A supermarket is a buffer that stores data in some structure.
  • The supplier is the producer thread, which generates some kind of structured data and sends it to the buffer.
  • The customer is the consumer thread, which fetches data from the buffer and processes the data accordingly.

The buffer serves as the trading place, so producers and consumers do not have to care about each other. Advantages of the producer-consumer model:

  • Decoupling

  • Supports concurrency

  • Supports uneven busyness

The producer-consumer model can also be seen as a form of thread-to-thread communication, similar to the pipe used for inter-process communication. The buffer in this model is the place where threads communicate and must be visible to both producer and consumer threads, so it is a shared resource. As a shared resource it must be protected, to avoid thread-safety problems in the producer-consumer model.

Summary of the characteristics of the producer-consumer model:

  • Three relationships
    • Producer and producer: mutually exclusive relationship (the buffer space is limited and can only hold a limited amount of data from producers)
    • Producer and consumer: synchronization relationship (when the buffer is full, consumers should be given priority to consume; when the buffer is empty, producers should be given priority to produce; without this synchronization, producers and consumers would frequently and needlessly poll the buffer's state, which is inefficient) and mutually exclusive relationship (producers and consumers cannot access the buffer at the same time)
    • Consumer and consumer: mutually exclusive relationship (the data in the buffer is limited, and consumers can only take a limited amount of data from it)
  • Two roles
    • producer
    • consumer
  • A trading venue
    • buffer

Producer-consumer model based on BlockingQueue

In multi-threaded programming, the blocking queue (BlockingQueue) is a data structure commonly used to implement the producer-consumer model. Unlike an ordinary queue, when the queue is empty, an operation that takes an element from it blocks until an element is put in; when the queue is full, an operation that puts an element in blocks until an element is taken out. (The two operations run in different threads, and a thread blocks while operating on the blocking queue.)

(figure: blocking queue between producer and consumer threads)

Write code to experience the use of producer-consumer model and condition variables based on BlockingQueue:

blockqueue.hpp: File that implements blocking queue

#include <queue>
#include <pthread.h>

const int gcap = 5;

template <class T>
class blockqueue
{
private:
    bool isFull() { return _q.size() == (size_t)_cap; }
    bool isEmpty() { return _q.empty(); }

public:
    blockqueue(int cap = gcap) : _cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consumercond, nullptr);
        pthread_cond_init(&_productorcond, nullptr);
    }
    void push(const T &in)
    {
        pthread_mutex_lock(&_mutex);
        while (isFull()) // loop, not if: with several producers woken at once, only some may find space left
        {
            pthread_cond_wait(&_productorcond, &_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_consumercond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        while (isEmpty()) // same re-check for consumers
        {
            pthread_cond_wait(&_consumercond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_productorcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~blockqueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumercond);
        pthread_cond_destroy(&_productorcond);
    }

private:
    std::queue<T> _q;
    int _cap;                      // capacity
    pthread_mutex_t _mutex;        // controls thread mutual exclusion
    pthread_cond_t _consumercond;  // controls consumer threads
    pthread_cond_t _productorcond; // controls producer threads
};

Task.hpp: class that implements the data processed by producers and consumers.

#include <cstdlib>
#include <iostream>
#include <string>

class Task
{
public:
    Task()
    {}

    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitcode(0)
    {}

    void operator()() // operate on the stored data
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
            if (_y == 0) _exitcode = -1;
            else _result = _x / _y;
            break;
        case '%':
            if (_y == 0) _exitcode = -2;
            else _result = _x % _y;
            break;
        default:
            break;
        }
    }

    const std::string operatorArgs() // format the data to be processed
    {
        return "(" + std::to_string(_x) + _op + std::to_string(_y) + ")" + "(" + std::to_string(_exitcode) + ")";
    }

    const std::string operatorRes() // format the processing result
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_result);
    }

private:
    int _x;        // left operand
    int _y;        // right operand
    char _op;      // operator
    int _result;   // arithmetic result
    int _exitcode; // exit code
};

main.cc: File that implements the producer-consumer model.

#include "blockqueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <cstdint>
#include <ctime>
#include <unistd.h>
#include <pthread.h>

void *consumer(void *args) // consumer routine
{
    blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);
    while (true)
    {
        Task t;
        bq->pop(&t); // take data from the blocking queue
        t();         // process the data
        std::cout << "consumer: " << t.operatorRes() << std::endl;
    }
}

void *productor(void *args) // producer routine
{
    blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);
    std::string ops = "+-*/%";
    while (true)
    {
        int x = rand() % 20; // generate data
        int y = rand() % 10;
        char op = ops[rand() % ops.size()];
        Task t(x, y, op);
        bq->push(t); // hand data to the blocking queue
        std::cout << "productor: " << t.operatorArgs() << "?" << std::endl;
    }
}

int main()
{
    srand((uint32_t)time(nullptr) ^ getpid()); // seed the random number generator
    blockqueue<Task> *q = new blockqueue<Task>; // create the blocking queue
    pthread_t c[2];
    pthread_t p[3];
    pthread_create(&c[0], nullptr, consumer, q);
    pthread_create(&c[1], nullptr, consumer, q);
    pthread_create(&p[0], nullptr, productor, q);
    pthread_create(&p[1], nullptr, productor, q);
    pthread_create(&p[2], nullptr, productor, q);
    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);
    return 0;
}

Compile the code, run it and see the results:

(figure: program output)

The overall logic of the code above: producer threads generate data (an arithmetic expression) and hand it to the blocking queue, and consumer threads take the data out of the queue and evaluate the expression. All threads share a single lock because there is only one queue, so all threads are mutually exclusive with each other; the synchronization between producers and consumers is achieved through the condition variables' wait and wake-up.

Note: The producer-consumer model improves efficiency because the different stages overlap. While a producer is preparing data, consumers can take data out of the buffer; while a consumer is processing data, producers can put data into the buffer; while one consumer is processing, other consumers can take more data out, and while one producer is preparing, other producers can put more data in. The threads run concurrently instead of being serialized, with consumers waiting only for data to arrive and producers waiting only for consumers to make room.

Thread synchronization control – POSIX semaphore

Concept

POSIX semaphores are used for synchronization operations between threads to achieve conflict-free access to shared resources.

POSIX semaphore is the sem_t data type provided in the Linux operating system. Thread synchronization control can be completed by using POSIX semaphore.

The essence of a semaphore is a counter that records the number of available critical resources. Before accessing a critical resource, a thread performs the P operation (apply for a resource); after using the resource, it performs the V operation (release the resource). Because every access must first apply through the semaphore, the semaphore can control the synchronization of threads. P and V are atomic operations.

How a semaphore controls thread synchronization: instead of first acquiring the lock and then checking whether the critical resource is available, a thread applies for the semaphore before acquiring the lock and operating on the critical resource. When multiple threads want to access the same resource, only a thread that has obtained the semaphore may acquire the lock and operate on the resource; the other threads wait on the semaphore. This makes the threads execute in order, and avoids both the starvation of never getting the lock and the waste of acquiring the lock when no resource is available.

sem_init function

//The header file and function declaration where the sem_init function is located
#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
  • The sem_init function is used to initialize the semaphore.
  • sem parameter: The semaphore to be initialized.
  • pshared parameter: 0 means the semaphore is shared between threads, non-zero means it is shared between processes.
  • value parameter: The initial value of the semaphore, that is, the number of resources.

sem_wait function

//The header file and function declaration where the sem_wait function is located
#include <semaphore.h>

int sem_wait(sem_t *sem);
  • The sem_wait function is used to wait for the semaphore application, which is the P operation.
  • sem parameter: The semaphore to be applied for.

sem_post function

//The header file and function declaration where the sem_post function is located
#include <semaphore.h>

int sem_post(sem_t *sem);
  • The sem_post function is used to release the semaphore, which is the V operation.
  • sem parameter: The semaphore to be released.

sem_destroy function

//The header file and function declaration where the sem_destroy function is located
#include <semaphore.h>

int sem_destroy(sem_t *sem);
  • The sem_destroy function is used to destroy the semaphore.
  • sem parameter: The semaphore to be destroyed.

Producer-consumer model based on ring queue

The producer-consumer model that uses semaphores for synchronization control and uses circular queues as buffers has the following characteristics:

  • Producers are concerned with spatial resources, and consumers are concerned with data resources.
  • As long as the semaphore is not 0, it means that the resource is available and the thread can access the buffer resource.
  • In a circular queue, producers and consumers can operate at the same time as long as they do not access the same location.
  • When the ring queue is empty or full, producers and consumers would access the same slot; the semaphores ensure that the side which actually has a resource available operates first.

(figure: ring queue with producer and consumer positions)

Write code to experience the ring-queue-based producer-consumer model and the use of semaphores:

ringqueue.hpp: file that implements the ring queue:

#include <vector>
#include <semaphore.h>
#include <pthread.h>

using namespace std;

const int N = 5;

template <class T>
class ringqueue
{
private:
    void P(sem_t &sem) { sem_wait(&sem); }  // P operation
    void V(sem_t &sem) { sem_post(&sem); }  // V operation
    void Lock(pthread_mutex_t &mutex) { pthread_mutex_lock(&mutex); }     // acquire a lock
    void UnLock(pthread_mutex_t &mutex) { pthread_mutex_unlock(&mutex); } // release a lock

public:
    ringqueue(int num = N) : _ring(num), _cap(num), _c_step(0), _p_step(0)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, num);
        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }

    void push(const T &in)
    {
        P(_space_sem);  // apply for the space semaphore
        Lock(_p_mutex); // acquire the producer lock
        _ring[_p_step++] = in;
        _p_step %= _cap;
        UnLock(_p_mutex); // release the producer lock
        V(_data_sem);     // release the data semaphore
    }

    void pop(T *out)
    {
        P(_data_sem);   // apply for the data semaphore
        Lock(_c_mutex); // acquire the consumer lock
        *out = _ring[_c_step++];
        _c_step %= _cap;
        UnLock(_c_mutex); // release the consumer lock
        V(_space_sem);    // release the space semaphore
    }

    ~ringqueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    vector<T> _ring;
    int _cap;                 // queue capacity
    sem_t _data_sem;          // data semaphore -- only consumers care about it
    sem_t _space_sem;         // space semaphore -- only producers care about it
    int _c_step;              // consumer position
    int _p_step;              // producer position
    pthread_mutex_t _c_mutex; // consumer mutual exclusion lock
    pthread_mutex_t _p_mutex; // producer mutual exclusion lock
};

Note: Before a producer or consumer enters the critical section, it must apply for the semaphore first and only then acquire the lock, otherwise a deadlock can occur. For example, when the buffer is full, if a producer acquires the lock first and then blocks waiting for the space semaphore, the consumer cannot take the lock and therefore can never post the space semaphore, so the threads deadlock.

Task.hpp: class that implements the data processed by producers and consumers.

#include <cstdlib>
#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {}

    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitcode(0)
    {}

    void operator()() // operate on the stored data
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
            if (_y == 0) _exitcode = -1;
            else _result = _x / _y;
            break;
        case '%':
            if (_y == 0) _exitcode = -2;
            else _result = _x % _y;
            break;
        default:
            break;
        }
        usleep(100000);
    }

    const std::string operatorArgs() // format the data to be processed
    {
        return "(" + std::to_string(_x) + _op + std::to_string(_y) + ")" + "(" + std::to_string(_exitcode) + ")";
    }

    const std::string operatorRes() // format the processing result
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_result);
    }

private:
    int _x;        // left operand
    int _y;        // right operand
    char _op;      // operator
    int _result;   // arithmetic result
    int _exitcode; // exit code
};

main.cc: file that implements the producer-consumer model.

#include "ringqueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstring>
#include <cstdlib>

const char *ops = "+-*/%";

void *consumerRoutine(void *args) // consumer routine
{
    ringqueue<Task> *rq = static_cast<ringqueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->pop(&t);
        t();
        cout << "consumer done, the completed task is: " << t.operatorRes() << endl;
    }
}

void *productorRoutine(void *args) // producer routine
{
    ringqueue<Task> *rq = static_cast<ringqueue<Task> *>(args);
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[(x + y) % strlen(ops)];
        Task t(x, y, op);
        rq->push(t);
        cout << "productor done, the production task is: " << t.operatorArgs() << endl;
    }
}

int main()
{
    ringqueue<Task> *rq = new ringqueue<Task>();
    pthread_t c[3], p[2];

    for (int i = 0; i < 3; i++)
        pthread_create(c + i, nullptr, consumerRoutine, rq);
    for (int i = 0; i < 2; i++)
        pthread_create(p + i, nullptr, productorRoutine, rq);

    for (int i = 0; i < 3; i++)
        pthread_join(c[i], nullptr);
    for (int i = 0; i < 2; i++)
        pthread_join(p[i], nullptr);

    return 0;
}

Compile the code, run it and see the results:

(figure: program output)

The significance of implementing the producer-consumer model with semaphores: with many producers and consumers, tasks can be built concurrently before the data is put into the queue, and after data is taken out, multiple threads can process the tasks concurrently, because those steps are not done under the lock!