< Linux > Multithreading (semaphore)

Directory

1. POSIX semaphore

The principle of semaphore

The concept of semaphore

semaphore function

2. Production and consumption model based on ring queue

Requesting and releasing resources for producers and consumers

Code implementation (single producer and single consumer)

Code implementation (multiple producers and multiple consumers)

1. POSIX semaphore

The principle of semaphore

  • We call resources that may be accessed by multiple execution streams at the same time critical resources. Critical resources need to be protected, otherwise problems such as data inconsistency will occur.
  • When we only use a mutex to protect a critical resource, it means that we regard this critical resource as a whole, and only one execution flow is allowed to access this critical resource at the same time.
  • In fact, we can divide this critical resource into multiple regions. When several execution flows need the critical resource but each accesses a different region, we can let them access it at the same time. Because they operate on different regions, problems such as data inconsistency do not arise.

The concept of semaphore

  • POSIX semaphores and System V semaphores serve the same purpose: both are used for synchronization so that shared resources can be accessed without conflict. POSIX semaphores can additionally be used for synchronization between threads.
  • The essence of a semaphore is a counter, which describes the number of resources in critical resources. The semaphore can manage critical resources in a finer granularity. The essence of applying for a semaphore is to reserve some kind of resource.

Each execution flow should apply for a semaphore before entering the critical section. If the application is successful, it will have the permission to operate the critical resource. After the operation is completed, the semaphore should be released:

PV operation of semaphore:

  • P operation: requesting a semaphore is called the P operation (atomic). Requesting a semaphore means requesting the right to use one resource within the critical resource. When the request succeeds, the number of available resources decreases by one, so the essence of the P operation is to decrement the counter (--).
  • V operation: releasing a semaphore is called the V operation (atomic). Releasing a semaphore means returning the right to use one resource within the critical resource. When the release succeeds, the number of available resources increases by one, so the essence of the V operation is to increment the counter (++).

PV operations must be atomic:

  • Multiple execution flows will compete to apply for semaphores in order to access critical resources, so semaphores will be accessed by multiple execution flows at the same time, which means that semaphores are essentially critical resources.
  • But the essence of semaphore is to protect critical resources. We can no longer use semaphore to protect semaphore, so the PV operation of semaphore must be an atomic operation.
  • Note: the ++ and -- operations on a variable in memory are not atomic, so a semaphore cannot be implemented by simply applying ++ and -- to a global variable.
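
To make the atomicity requirement concrete, here is a minimal sketch in which two threads update one shared counter. It uses std::atomic from C++11 as a stand-in for an atomic update; this is an illustration only, not how sem_t is implemented, and the names gCount, addMany and runAtomicDemo are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <pthread.h>

// Hypothetical shared counter standing in for a semaphore's value.
static std::atomic<int> gCount{0};
static const int kLoops = 100000;

// Each thread performs kLoops atomic increments.
static void *addMany(void *)
{
    for (int i = 0; i < kLoops; ++i)
        gCount.fetch_add(1); // one indivisible read-modify-write
    return nullptr;
}

// Runs two incrementing threads and returns the final counter value.
int runAtomicDemo()
{
    gCount.store(0);
    pthread_t a, b;
    int rc = pthread_create(&a, nullptr, addMany, nullptr);
    assert(rc == 0);
    rc = pthread_create(&b, nullptr, addMany, nullptr);
    assert(rc == 0);
    (void)rc;
    pthread_join(a, nullptr);
    pthread_join(b, nullptr);
    return gCount.load();
}
```

With atomic increments no update is lost, so runAtomicDemo() returns 2 * kLoops. If gCount were a plain int updated with ++, the two threads would race and some increments could be lost.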

Suspending when a semaphore request fails:

  • When an execution flow requests the semaphore, its value may already be 0, meaning that every resource described by the semaphore has been taken. The execution flow is then placed in the semaphore's waiting queue and is woken up only after another flow releases the semaphore.
  • Note: the essence of the semaphore is a counter, but a semaphore is more than a counter. It also includes a waiting queue.
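
The "counter plus waiting queue" idea can be sketched with a mutex and a condition variable. This is a teaching model only, under the assumption that the condition variable plays the role of the waiting queue; it is not the actual implementation behind sem_t, and CountingSem, waiter and demoWake are hypothetical names.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical teaching class: a counter plus a place for threads to wait.
class CountingSem
{
public:
    explicit CountingSem(unsigned int value) : count_(value)
    {
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ~CountingSem()
    {
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mtx_);
    }
    // P operation: wait until the counter is positive, then take one resource.
    void P()
    {
        pthread_mutex_lock(&mtx_);
        while (count_ == 0)
            pthread_cond_wait(&cond_, &mtx_); // sleep in the waiting queue
        --count_;
        pthread_mutex_unlock(&mtx_);
    }
    // V operation: return one resource and wake one waiter, if any.
    void V()
    {
        pthread_mutex_lock(&mtx_);
        ++count_;
        pthread_cond_signal(&cond_);
        pthread_mutex_unlock(&mtx_);
    }
    // Reads the current counter value under the lock.
    unsigned int value()
    {
        pthread_mutex_lock(&mtx_);
        unsigned int v = count_;
        pthread_mutex_unlock(&mtx_);
        return v;
    }

private:
    unsigned int count_;
    pthread_mutex_t mtx_;
    pthread_cond_t cond_;
};

// Thread body: performs one P operation on the semaphore passed in.
static void *waiter(void *arg)
{
    static_cast<CountingSem *>(arg)->P();
    return nullptr;
}

// Starts a waiter on an empty semaphore, releases it with V, returns the final value.
unsigned int demoWake()
{
    CountingSem s(0);
    pthread_t t;
    int rc = pthread_create(&t, nullptr, waiter, &s);
    assert(rc == 0);
    (void)rc;
    s.V();
    pthread_join(t, nullptr);
    return s.value();
}
```

In demoWake the waiter either finds the counter already at 1 or sleeps until V signals it. Either way exactly one resource is handed over and the counter ends at 0.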

Question 1: If the semaphore request succeeds, is the execution flow guaranteed to own part of the critical resource?

  • Yes. As long as the request succeeds, the corresponding resource is reserved for that flow. This is like taking a mutex: once you hold the lock, the critical resource is yours even if the thread is switched out. This is the resource reservation mechanism. If we set the semaphore to 1, the P operation (--) brings it to 0, which corresponds to locking, and the V operation (++) brings it back to 1, which corresponds to unlocking. Such a semaphore is called a binary semaphore.
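
As a rough illustration of a binary semaphore used like a lock, the sketch below protects a plain shared variable with a semaphore initialized to 1. The names gBinSem, gTotal, worker and runBinarySemDemo are hypothetical, and error handling (for example sem_wait being interrupted by a signal) is left out for brevity.

```cpp
#include <cassert>
#include <pthread.h>
#include <semaphore.h>

static sem_t gBinSem;   // binary semaphore, initial value 1
static long gTotal = 0; // plain shared variable protected by gBinSem

// Each worker adds 50000 to gTotal, one increment per critical section.
static void *worker(void *)
{
    for (int i = 0; i < 50000; ++i)
    {
        sem_wait(&gBinSem); // P: 1 -> 0, plays the role of lock
        ++gTotal;
        sem_post(&gBinSem); // V: 0 -> 1, plays the role of unlock
    }
    return nullptr;
}

// Runs four workers and returns the final total.
long runBinarySemDemo()
{
    gTotal = 0;
    int rc = sem_init(&gBinSem, 0, 1);
    assert(rc == 0);
    (void)rc;
    pthread_t t[4];
    for (pthread_t &tid : t)
        pthread_create(&tid, nullptr, worker, nullptr);
    for (pthread_t &tid : t)
        pthread_join(tid, nullptr);
    sem_destroy(&gBinSem);
    return gTotal;
}
```

Because only one thread at a time can hold the single resource, the four workers never interleave inside the increment, and the total comes out as 4 * 50000.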

Question 2: A critical resource can be treated as a whole, but can it also be treated as several small parts?

  • Generally yes, but it depends on the specific application scenario.

semaphore function

sem_init Initialize the semaphore:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

Parameter Description:

  • sem: The semaphore that needs to be initialized.
  • pshared: Pass in a value of 0 to indicate sharing between threads, and pass in a non-zero value to indicate sharing between processes.
  • value: The initial value of the semaphore (the initial value of the counter).

Return value description:

  • Returns 0 if the semaphore is initialized successfully, and -1 on failure.

sem_destroy destroys the semaphore:

int sem_destroy(sem_t *sem);

Parameter Description:

  • sem: The semaphore that needs to be destroyed.

Return value description:

  • Returns 0 if the semaphore is destroyed successfully, and -1 on failure.

sem_wait wait for semaphore (apply for semaphore):

int sem_wait(sem_t *sem);

Parameter Description:

  • sem: The semaphore to wait for.

Return value description:

  • On success, returns 0 and the value of the semaphore is decremented by one.
  • On failure, returns -1 and the value of the semaphore is unchanged.

sem_post post semaphore (release semaphore):

int sem_post(sem_t *sem);

Parameter Description:

  • sem: The semaphore that needs to be released.

Return value description:

  • On success, returns 0 and the value of the semaphore is incremented by one.
  • On failure, returns -1 and the value of the semaphore is unchanged.
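
The four calls can be tried together in a small sketch that records the semaphore value after each step. It additionally uses sem_getvalue, a POSIX function not covered above, purely for observation. SemTrace and traceSemaphore are hypothetical names.

```cpp
#include <cassert>
#include <cstdio>
#include <semaphore.h>

// Hypothetical record of the semaphore value at three points in time.
struct SemTrace
{
    int afterInit;
    int afterWait;
    int afterPost;
};

// Precondition: initial >= 1, otherwise sem_wait below would block forever.
SemTrace traceSemaphore(unsigned int initial)
{
    assert(initial >= 1);
    SemTrace t = {-1, -1, -1};
    sem_t sem;
    if (sem_init(&sem, 0, initial) != 0) // pshared = 0: shared between threads
    {
        perror("sem_init");
        return t;
    }
    sem_getvalue(&sem, &t.afterInit);
    if (sem_wait(&sem) != 0) // P operation: decrement
        perror("sem_wait");
    sem_getvalue(&sem, &t.afterWait);
    if (sem_post(&sem) != 0) // V operation: increment
        perror("sem_post");
    sem_getvalue(&sem, &t.afterPost);
    sem_destroy(&sem);
    return t;
}
```

For an initial value of 3, the recorded values are 3 after initialization, 2 after sem_wait, and 3 again after sem_post.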

2. Production and consumption model based on ring queue

The producer-consumer example in the previous blog post was based on a queue whose space can be allocated dynamically. Here the program is rewritten on top of a fixed-size ring queue. The ring queue is simulated with an array (recommended because of its high cache hit rate), and the modulo operation provides the ring behavior. In this producer-consumer model, producer threads put data into the ring queue and consumer threads take data out.

We can let the consumer take data from the head while the producer is writing data at the tail. As long as the two do not access the same position, they can run concurrently, but sometimes they may access the same position. When we studied ring queues before, we had to decide whether the queue was empty or full.

  • When the ring queue is empty, the producer and the consumer point to the same position; the producer then produces an item, and the consumer consumes it afterwards. One earlier approach was to reserve one empty slot: the queue is empty when the two indexes point to the same position, and full when the position after the producer's index is the consumer's index. The other approach is to keep a counter.
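
For comparison, the counter approach to telling full from empty can be sketched as a single-threaded ring buffer. This is a hypothetical illustration of the classic technique (CountedRing is not part of the article's code) and contains no synchronization at all.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical single-threaded ring buffer that uses a counter to tell full from empty.
template <class T>
class CountedRing
{
public:
    explicit CountedRing(size_t cap) : buf_(cap), head_(0), tail_(0), count_(0)
    {
        assert(cap > 0);
    }
    bool empty() const { return count_ == 0; }
    bool full() const { return count_ == buf_.size(); }
    // Writes at the tail; returns false when the buffer is full.
    bool push(const T &in)
    {
        if (full())
            return false;
        buf_[tail_] = in;
        tail_ = (tail_ + 1) % buf_.size(); // wrap around with modulo
        ++count_;
        return true;
    }
    // Reads from the head; returns false when the buffer is empty.
    bool pop(T *out)
    {
        if (empty())
            return false;
        *out = buf_[head_];
        head_ = (head_ + 1) % buf_.size();
        --count_;
        return true;
    }

private:
    std::vector<T> buf_;
    size_t head_;  // next position to read
    size_t tail_;  // next position to write
    size_t count_; // number of stored items
};
```

When head_ equals tail_, the buffer is either empty or full, and only count_ tells the two cases apart. In the semaphore version this bookkeeping is taken over by the two semaphores.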

We no longer need to check whether the ring queue is full or empty ourselves, because the semaphores do it for us. Without a reserved empty slot, the two execution flows access the same slot only when the ring queue is full or empty (mutual exclusion & synchronization). At all other times they point to different positions.

  • When the queue is empty, the consumer cannot consume and the producer must produce first. When it is full, the producer cannot produce and the consumer must consume first. This is mutual exclusion.
  • The two cannot operate on the same position at the same time; they must run in a definite order, which is synchronization.
  • When the two point to different positions, they can run at the same time, which is concurrency.

If the producer and the consumer point to the same position, which of them runs first? The answer depends on the case (empty & full):

  • Empty: the consumer must not overtake the producer, because it cannot read data that has not been produced yet -> the producer runs first
  • Full: the producer must not lap the consumer and keep writing, because that would overwrite data that has been produced but not yet consumed -> the consumer runs first

Note: The above principles are guaranteed by semaphores. A semaphore is a counter used to describe the number of critical resources. We guarantee the serialization process of producers and consumers through semaphores. Except for these two cases, the producer and the consumer do not point to the same location, so there is no possibility of data inconsistency in the ring queue. And in most cases, the producers and consumers do not point to the same location, so in most cases, the ring queue can allow producers and consumers to execute concurrently.

Apply and release resources for producers and consumers

For producers and consumers, the resources they focus on are different: (we assume that the space of the ring queue is N)

  • The producer cares whether there is free space in the ring queue; as long as there is space, it can produce. As it produces, the free space goes N -> 0
  • The consumer cares whether there is data in the ring queue; as long as there is data, it can consume. As the producer produces, the amount of data goes 0 -> N

Now we use semaphores to describe the space resources (roomSem) and data resources (dataSem) in the ring queue. The initial values set for them are different when we initialize the semaphore:

  • The initial value of roomSem should be set to the capacity of the ring queue, because the ring queue is entirely free space at the beginning
  • The initial value of dataSem should be set to 0, because there is no data in the ring queue at the beginning

Producers apply for space resources and release data resources:

For producers, the space resource roomSem must be requested each time before producing data

  • If the value of roomSem is not 0, the queue is not full and there is still room for data, so the semaphore request succeeds. The producer can then produce and put data into the queue
  • If the value of roomSem is 0, the queue is full and there is no room for data, so the request cannot succeed immediately. The producer blocks in the waiting queue of roomSem until new space appears in the ring queue (when a consumer consumes data), and is then woken up.

When the producer finishes producing the data, the data resource dataSem should be released

  • Although the producer performs the P operation on roomSem before production, after producing data it should perform the V operation on dataSem rather than roomSem, because producing data adds one data resource.
  • The producer applies for the room location before producing data. After the producer finishes producing the data, the location stores the data produced by the producer. Before the data is consumed by the consumer, the location is no longer the room location. Instead, it should be the data location.
  • When the producer finishes producing data, it means that there is an additional data position in the ring queue, so we should perform V operation on dataSem.

Consumers apply for data resources and release space resources:

For consumers, the data resource dataSem must be applied for each time before consumption

  • If the value of dataSem is not 0, there is data, and the data can be consumed, the semaphore application is successful, and the consumer can perform the consumption operation at this time.
  • If the value of dataSem is 0, there is no data to consume, so the request cannot succeed immediately. The consumer blocks in the waiting queue of dataSem until new data appears in the ring queue (when a producer produces data), and is then woken up.

When the consumer finishes consuming the data, roomSem should be released:

  • Although the consumer performs the P operation on dataSem before consumption, after consuming data it should perform the V operation on roomSem rather than dataSem, because consuming data frees one more slot
  • The consumer requests a data position before consuming. Once the data at that position has been consumed, consuming it again is meaningless. So that the producer can later write new data there, the position should now be counted as a room position, not a data position.
  • When the consumer finishes consuming data, there is one more room position in the ring queue, so we perform the V operation on roomSem.
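
The bookkeeping described above can be checked with a single-threaded sketch that replays the P and V operations of a few productions and consumptions and then reads both semaphore values with sem_getvalue. SemPair and traceRing are hypothetical names, and no real data is moved.

```cpp
#include <cassert>
#include <semaphore.h>

// Hypothetical snapshot of both semaphore values.
struct SemPair
{
    int room;
    int data;
};

// Preconditions: consumed <= produced <= capacity, so no call below blocks.
SemPair traceRing(unsigned int capacity, unsigned int produced, unsigned int consumed)
{
    assert(consumed <= produced && produced <= capacity);
    sem_t roomSem, dataSem;
    sem_init(&roomSem, 0, capacity); // every slot is free at the start
    sem_init(&dataSem, 0, 0);        // no data at the start
    for (unsigned int i = 0; i < produced; ++i)
    {
        sem_wait(&roomSem); // producer: P(roomSem)
        sem_post(&dataSem); // producer: V(dataSem)
    }
    for (unsigned int i = 0; i < consumed; ++i)
    {
        sem_wait(&dataSem); // consumer: P(dataSem)
        sem_post(&roomSem); // consumer: V(roomSem)
    }
    SemPair s = {0, 0};
    sem_getvalue(&roomSem, &s.room);
    sem_getvalue(&dataSem, &s.data);
    sem_destroy(&roomSem);
    sem_destroy(&dataSem);
    return s;
}
```

With a capacity of 5, three productions and one consumption leave roomSem at 3 and dataSem at 2. The two values always add up to the capacity, because each operation moves exactly one slot from one semaphore to the other.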

Code implementation (single producer, single consumer)

RingQueue.hpp file:

  • The ring queue RingQueue described above is the trading place in the producer-consumer model, and we can implement it with the vector from the C++ STL. We design RingQueue as a template class.

The private member variables of RingQueue are as follows:

  • ringqueue_: the ring queue is implemented with an array, so we define a vector named ringqueue_
  • roomSem_ & dataSem_: the semaphore that counts free space for the producer and the semaphore that counts data for the consumer, respectively
  • pIndex_ & cIndex_: the subscripts that record the position the producer writes next and the position the consumer reads next, respectively

The public member functions of RingQueue are as follows:

RingQueue constructor:

  1. Initialize the capacity of the array ringqueue_ to 5 (the global constant gCap), and initialize the producer and consumer subscripts to 0
  2. Call sem_init to initialize roomSem_ and dataSem_

~RingQueue destructor:

  1. Call sem_destroy to release the space semaphore and the data semaphore

push production function:

  1. Call sem_wait to request the roomSem_ space resource
  2. Put the produced data into the ringqueue_ array and increment pIndex_; to keep pIndex_ from going out of bounds, take it modulo ringqueue_.size()
  3. After the data is produced, call sem_post to perform the V operation on dataSem_, because the data semaphore must be incremented once data has been produced

pop consumption function:

  1. Call sem_wait to request the dataSem_ data resource
  2. Define a temporary variable to hold the data being taken out; the slot does not need to be cleared afterwards, because the producer will overwrite it later
  3. After the data is taken, one more slot is free, so call sem_post to perform the V operation on roomSem_
  4. Update cIndex_; to keep cIndex_ from going out of bounds, take it modulo ringqueue_.size()
  5. Return the data saved in temp

#pragma once
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
using namespace std;

// Define the default capacity size
const int gCap = 5;

template <class T>
class RingQueue
{
public:
    // Constructor
    RingQueue(int cap = gCap)
        : ringqueue_(cap), pIndex_(0), cIndex_(0)
    {
        // Initialize the space counter semaphore to the queue capacity
        sem_init(&roomSem_, 0, ringqueue_.size());
        // Initialize the data counter semaphore to 0
        sem_init(&dataSem_, 0, 0);
    }
    // Destructor
    ~RingQueue()
    {
        sem_destroy(&roomSem_);
        sem_destroy(&dataSem_);
    }
    // Production
    void push(const T &in)
    {
        sem_wait(&roomSem_);          // P operation: request a free slot
        ringqueue_[pIndex_] = in;     // put the produced data into the array
        sem_post(&dataSem_);          // V operation: one more data item
        pIndex_++;                    // update the write position
        pIndex_ %= ringqueue_.size(); // keep pIndex_ within bounds
    }
    // Consumption
    T pop()
    {
        sem_wait(&dataSem_);          // P operation: request a data item
        T temp = ringqueue_[cIndex_]; // save the fetched data
        sem_post(&roomSem_);          // V operation: one more free slot
        cIndex_++;                    // update the read position
        cIndex_ %= ringqueue_.size(); // keep cIndex_ within bounds
        return temp;
    }

private:
    vector<T> ringqueue_; // ring queue
    sem_t roomSem_;       // counts free slots, used by the producer
    sem_t dataSem_;       // counts data items, used by the consumer
    uint32_t pIndex_;     // position the producer writes next
    uint32_t cIndex_;     // position the consumer reads next
};

RingQueueTest.cc file:

  • The circular queue needs to allow producers to continuously produce data, and consumers to continuously consume data. We use the rand function to generate a random number, and let the producer push this data into the ring queue, while the consumer is constantly popping data from the ring queue. For the convenience of observation, we can print out the data produced by the producer and the data consumed by the consumer.

#include "RingQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <unistd.h>
// The producer keeps producing data
void *producer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rand() % 10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "]" << " produced a data: " << data << endl;
        sleep(1);
    }
}
// The consumer keeps consuming data
void *consumer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() << "]" << " consumes a data: " << data << endl;
    }
}
int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    RingQueue<int> rq;
    pthread_t c, p;
    pthread_create(&p, nullptr, producer, &rq);
    pthread_create(&c, nullptr, consumer, &rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    return 0;
}

Test results:

Producer-consumer in sync

  • We make the producer produce one item every 1s and the consumer consume one item every 1s. The producer and the consumer then proceed in step.

Producers produce fast, consumers consume slowly

  • We make the consumer consume once every 1s while the producer produces continuously. The producer quickly produces 5 items, fills the queue, and blocks. After the consumer consumes one item, the producer produces one more, and from then on each consumption is followed by one production.

Producers produce slowly, consumers consume quickly

  • We make the producer produce once every 1s while the consumer consumes continuously. Nothing happens during the first 1s because no data has been produced yet. After 1s one item is produced and the consumer consumes it, and from then on each production is followed by one consumption.

Code implementation (multiple producers and consumers)

The code above uses a single producer and a single consumer. Now we implement multiple producers and multiple consumers. In that case three relationships must hold: mutual exclusion between producers, mutual exclusion between consumers, and mutual exclusion plus synchronization between producers and consumers. The logic changes as follows:

Lock:

  • Both push and pop first request a semaphore. With multiple producers and multiple consumers, several threads may perform the P operation at the same time, which is safe because the P operation is atomic. Afterwards, however, several threads would put data at pIndex_ or take data at cIndex_, so pIndex_ and cIndex_ become critical resources. Access to them must be mutually exclusive among producers and among consumers: at most one producer may write at pIndex_ and at most one consumer may read at cIndex_ at a time. So each critical section is protected by a lock.

Push and pop locked positions:

  • Requesting a semaphore is a reservation mechanism, so all threads can request their semaphores first. It is best to take the lock after the semaphore request; which thread gets the lock is decided by competition among them. It is as if every thread first gets a ticket (requests the semaphore) and then the threads enter one at a time (take the lock and access the critical section). The pop function is locked in the same way.

RingQueue.hpp file:

#pragma once
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;

// Define the default capacity size
const int gCap = 5;

template <class T>
class RingQueue
{
public:
    // Constructor
    RingQueue(int cap = gCap)
        : ringqueue_(cap), pIndex_(0), cIndex_(0)
    {
        // Initialize the space resource and data resource semaphores
        sem_init(&roomSem_, 0, ringqueue_.size());
        sem_init(&dataSem_, 0, 0);

        // Initialize the producer and consumer locks
        pthread_mutex_init(&pmutex_, nullptr);
        pthread_mutex_init(&cmutex_, nullptr);
    }
    // Destructor
    ~RingQueue()
    {
        // Release the semaphores
        sem_destroy(&roomSem_);
        sem_destroy(&dataSem_);
        // Release the locks
        pthread_mutex_destroy(&pmutex_);
        pthread_mutex_destroy(&cmutex_);
    }
    // Production
    void push(const T &in)
    {
        sem_wait(&roomSem_); // P operation: request a free slot
        pthread_mutex_lock(&pmutex_);

        ringqueue_[pIndex_] = in;     // put the produced data into the array
        pIndex_++;                    // update the write position
        pIndex_ %= ringqueue_.size(); // keep pIndex_ within bounds

        pthread_mutex_unlock(&pmutex_);
        sem_post(&dataSem_); // V operation: one more data item
    }
    // Consumption
    T pop()
    {
        sem_wait(&dataSem_); // P operation: request a data item
        pthread_mutex_lock(&cmutex_);

        T temp = ringqueue_[cIndex_]; // save the fetched data
        cIndex_++;                    // update the read position
        cIndex_ %= ringqueue_.size(); // keep cIndex_ within bounds

        pthread_mutex_unlock(&cmutex_);
        sem_post(&roomSem_); // V operation: one more free slot
        return temp;
    }

private:
    vector<T> ringqueue_; // ring queue
    sem_t roomSem_;       // counts free slots, used by producers
    sem_t dataSem_;       // counts data items, used by consumers
    uint32_t pIndex_;     // position producers write next; a critical resource with multiple producers
    uint32_t cIndex_;     // position consumers read next; a critical resource with multiple consumers

    pthread_mutex_t pmutex_; // producers' lock
    pthread_mutex_t cmutex_; // consumers' lock
};

RingQueueTest.cc file:

#include "RingQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <unistd.h>
// The producer keeps producing data
void *producer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rand() % 10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "]" << " produced a data: " << data << endl;
        sleep(1);
    }
}
// The consumer keeps consuming data
void *consumer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() << "]" << " consumes a data: " << data << endl;
    }
}
int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    RingQueue<int> rq;
    pthread_t c1, c2, c3, p1, p2, p3;
    pthread_create(&p1, nullptr, producer, &rq);
    pthread_create(&p2, nullptr, producer, &rq);
    pthread_create(&p3, nullptr, producer, &rq);
    pthread_create(&c1, nullptr, consumer, &rq);
    pthread_create(&c2, nullptr, consumer, &rq);
    pthread_create(&c3, nullptr, consumer, &rq);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    return 0;
}

The above is a multi-producer and multi-consumer model based on ring queues.
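
Since the test program above runs forever, a bounded variant is one way to check that no item is lost or duplicated. The sketch below assumes a compact int-only copy of the locked ring queue (IntRing, a hypothetical name) so that it is self-contained; three producers and three consumers each handle a fixed number of items, and the totals are compared at the end.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <semaphore.h>
#include <vector>

// Compact int-only copy of the locked ring queue, for a self-contained check.
class IntRing
{
public:
    explicit IntRing(size_t cap) : q_(cap), p_(0), c_(0)
    {
        assert(cap > 0);
        sem_init(&room_, 0, static_cast<unsigned int>(cap));
        sem_init(&data_, 0, 0);
        pthread_mutex_init(&pm_, nullptr);
        pthread_mutex_init(&cm_, nullptr);
    }
    ~IntRing()
    {
        sem_destroy(&room_);
        sem_destroy(&data_);
        pthread_mutex_destroy(&pm_);
        pthread_mutex_destroy(&cm_);
    }
    void push(int in)
    {
        sem_wait(&room_);         // reserve a free slot
        pthread_mutex_lock(&pm_); // one producer at a time touches p_
        q_[p_] = in;
        p_ = (p_ + 1) % q_.size();
        pthread_mutex_unlock(&pm_);
        sem_post(&data_);         // publish one item
    }
    int pop()
    {
        sem_wait(&data_);         // reserve one item
        pthread_mutex_lock(&cm_); // one consumer at a time touches c_
        int out = q_[c_];
        c_ = (c_ + 1) % q_.size();
        pthread_mutex_unlock(&cm_);
        sem_post(&room_);         // give the slot back
        return out;
    }

private:
    std::vector<int> q_;
    size_t p_, c_;
    sem_t room_, data_;
    pthread_mutex_t pm_, cm_;
};

static const int kThreads = 3;      // number of producers, and of consumers
static const int kPerThread = 1000; // items handled by each thread

struct Ctx
{
    IntRing *rq;
    long sum; // private to one thread, so it needs no lock
};

static void *produce(void *arg)
{
    Ctx *ctx = static_cast<Ctx *>(arg);
    for (int i = 1; i <= kPerThread; ++i)
    {
        ctx->rq->push(i);
        ctx->sum += i;
    }
    return nullptr;
}

static void *consume(void *arg)
{
    Ctx *ctx = static_cast<Ctx *>(arg);
    for (int i = 0; i < kPerThread; ++i)
        ctx->sum += ctx->rq->pop();
    return nullptr;
}

// Returns the consumed total, or -1 if it differs from the produced total.
long runBoundedTest()
{
    IntRing rq(5);
    Ctx pc[kThreads], cc[kThreads];
    pthread_t p[kThreads], c[kThreads];
    for (int i = 0; i < kThreads; ++i)
    {
        pc[i].rq = cc[i].rq = &rq;
        pc[i].sum = cc[i].sum = 0;
        pthread_create(&p[i], nullptr, produce, &pc[i]);
        pthread_create(&c[i], nullptr, consume, &cc[i]);
    }
    long produced = 0, consumed = 0;
    for (int i = 0; i < kThreads; ++i)
    {
        pthread_join(p[i], nullptr);
        pthread_join(c[i], nullptr);
        produced += pc[i].sum;
        consumed += cc[i].sum;
    }
    return produced == consumed ? consumed : -1;
}
```

Each producer pushes 1 through 1000, so the expected total is 3 * 500500. A matching total does not prove the queue correct, but a mismatch would point to a lost or duplicated item.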
