[Common c/c++] Producer-consumer model using mutex/cv/semaphore

Foreword:

The producer-consumer model is a well-worn topic with many possible implementations, and their runtime efficiency differs widely. It is hard to balance code simplicity, data safety, stability, and performance all at once.

Basic model -> Large-granularity lock + busy waiting (high efficiency, high CPU)

Component:

mutex

Overview:

Advantages: The code is concise and easy to understand, easy to read and modify, and the logic is clear.

Disadvantages:

1) Low CPU usage and high throughput cannot both be achieved. Without a sleep, the consumer keeps the CPU busy (this is often completely unacceptable);

2) With a sleep, throughput cannot be guaranteed: a long sleep interval lowers efficiency, and a short one keeps the CPU busy;

3) The lock granularity on the shared data is coarse: the entire list is locked at once. This is not a serious shortcoming, though. It is hard to optimize, and doing so generally falls into the territory of lock-free programming.

4) When there are multiple consumers, they block each other on the lock.

Code:

#include <thread>
#include <mutex>
#include <list>
#include <utility>
#include <unistd.h>
#include <stdio.h>

std::list<long> FIFO;
std::mutex lock;
long consumer_v = -1;
long producer_v = 9999999;

void consumer(){
    static long times = 0;
    while(consumer_v != 0){
        std::unique_lock<std::mutex> ul(lock);
        if(!FIFO.empty()){
            consumer_v = std::move(FIFO.front());
            FIFO.pop_front();
            times++;
        }else{
            //usleep(1); //Reduce the number of polls to save CPU (note: here it would sleep while holding the lock)
            times++; // an idle poll: the lock was taken but the FIFO was empty
        }
    }
    printf("consumer times : %ld\n", times);
}

void producer(){
    static long times = 0;
    while(producer_v-- != 0){
        std::unique_lock<std::mutex> ul(lock);
        FIFO.push_back(producer_v);
        times++;
    }
    printf("producer times : %ld\n", times);
}

int main()
{
    std::thread cons(consumer);
    std::thread prod(producer);
    cons.join();
    prod.join();
}

With the above code, CPU usage typically reaches 200% (two cores fully occupied). The reason is that the consumer must take the lock to check whether the FIFO contains data, and if it does not, it immediately takes the lock and checks again. The consumer is therefore a busy-checking loop, which consumes a lot of CPU.

View via top command:

%CPU
200.0

You can use usleep to reduce the polling frequency and thereby reduce CPU usage, but the disadvantage is that the program takes longer to run.

$ time ./1
producer times: 9999999
consumer times: 10002394

real 0m16.661s
user 0m19.614s
sys 0m13.541s

Each time you run the above code, the value of consumer times in the output fluctuates considerably. Sometimes it is a few hundred larger than producer times, and sometimes it is thousands larger. The difference is the number of useless polls.
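The trade-off between polling frequency and CPU usage can be explored with a small sketch. It is not the listing above: `PollStats`, `run_polling`, and the `back_off` parameter are hypothetical names introduced here, and the sleep is deliberately placed outside the locked scope so that the producer is not blocked while the consumer sleeps (the commented-out usleep in the listing above sits inside the locked scope).

```cpp
#include <chrono>
#include <list>
#include <mutex>
#include <thread>

struct PollStats {
    long consumed = 0;    // items actually taken from the queue
    long idle_polls = 0;  // lock acquisitions that found the queue empty
};

// Hypothetical helper: one producer pushes `items` values, one consumer polls.
// A zero back_off reproduces pure busy waiting.
PollStats run_polling(long items, std::chrono::microseconds back_off) {
    std::list<long> fifo;
    std::mutex mtx;
    PollStats stats;  // written only by the consumer, read after join()

    std::thread consumer([&] {
        while (stats.consumed < items) {
            bool got_item = false;
            {
                std::lock_guard<std::mutex> guard(mtx);
                if (!fifo.empty()) {
                    fifo.pop_front();
                    got_item = true;
                }
            }  // lock released before any sleeping
            if (got_item) {
                ++stats.consumed;
            } else {
                ++stats.idle_polls;
                if (back_off.count() > 0) {
                    std::this_thread::sleep_for(back_off);
                }
            }
        }
    });

    std::thread producer([&] {
        for (long i = 0; i < items; ++i) {
            std::lock_guard<std::mutex> guard(mtx);
            fifo.push_back(i);
        }
    });

    producer.join();
    consumer.join();
    return stats;
}
```

Calling `run_polling(n, std::chrono::microseconds(0))` and then again with a non-zero back-off, and comparing `idle_polls` and the wall-clock time, is one way to observe the effect described above. The exact numbers depend on the machine and the scheduler.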

Improving the basic model's CPU usage -> Large-granularity lock + multiple consumers-multiple producers (low efficiency, low CPU)

Component:

semaphore

Overview:

In addition to the shortcomings of the previous model, there is a further problem: only one consumer at a time can actually leave the blocked state, because multiple consumers block each other on the mutex. If the shared resource could be accessed through a read-write lock, this would not be a good implementation.

Note that in the producer-consumer model the semaphore only acts as a notifier. Protecting the shared resource still requires a mutex-like component. A mutex behaves much like a binary semaphore, and it is exactly this binary nature that provides exclusive access.
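To illustrate the remark about binary semaphores, here is a minimal sketch in which a POSIX semaphore initialised to 1 plays the role of the lock. It assumes a Linux-like system where unnamed semaphores (`sem_init`) are available; the function name `count_with_binary_semaphore` is made up for this example and is not part of the original code.

```cpp
#include <semaphore.h>
#include <thread>

// Hypothetical demo: two threads increment a shared counter, using a
// semaphore initialised to 1 as the lock instead of std::mutex.
// Returns the final counter value, or -1 if the semaphore cannot be created.
long count_with_binary_semaphore(long per_thread) {
    sem_t binary_sem;
    if (sem_init(&binary_sem, 0, 1) != 0) {  // pshared = 0: threads of one process
        return -1;
    }
    long counter = 0;

    auto worker = [&] {
        for (long i = 0; i < per_thread; ++i) {
            sem_wait(&binary_sem);  // "lock": value goes 1 -> 0
            ++counter;              // exclusive access to the shared data
            sem_post(&binary_sem);  // "unlock": value goes 0 -> 1
        }
    };

    std::thread a(worker);
    std::thread b(worker);
    a.join();
    b.join();
    sem_destroy(&binary_sem);
    return counter;
}
```

With two workers and `per_thread = 50000`, the function should return 100000, since every increment happens while the semaphore is held.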

Code 1:

#include <thread>
#include <mutex>
#include <list>
#include <utility>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>

#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <semaphore.h>

const char* sempname = "test"; // portable POSIX names start with '/', e.g. "/test"
#define FLAG O_CREAT
#define DEFAULT_SEMP_CNT 0

std::list<long> FIFO;
std::mutex lock;
sem_t* sem;
long consumer_v = -1;
long producer_v = 99999;

// Drain the named semaphore back to its initial count. A named semaphore
// persists after the process exits, so an earlier run may have left a non-zero value.
void clearsemp(sem_t* sem)
{
  int value=0;
  do{
    sem_trywait(sem);
    sem_getvalue(sem, &value);
  }while(value!=DEFAULT_SEMP_CNT);
}

// Note: times and consumer_v are shared by every consumer thread.
void consumer(){
        static long times=0;
        while(consumer_v!=0){
                sem_wait(sem); // sleep until the producer has posted an item
                std::unique_lock<std::mutex> lk(lock);
                consumer_v = std::move(FIFO.front());
                FIFO.pop_front();
                lk.unlock();
                times++;
                printf("[%d]consumer times : %ld\n", gettid(), times);
        }
}

void producer(){
        static long times=0;
        while(producer_v--!=0){
                std::unique_lock<std::mutex> lk(lock);
                FIFO.push_back(producer_v);
                sem_post(sem); // announce one more available item
                lk.unlock();
                times++;
                printf("producer times : %ld\n", times);
        }
}

int main()
{
        umask(0);
        sem = sem_open(sempname,FLAG,0777,DEFAULT_SEMP_CNT);
        if(SEM_FAILED==sem){
          return 0;
        }
        clearsemp(sem);

        std::thread cons(consumer);
        std::thread cons1(consumer);
        std::thread prod(producer);
        cons.join();
        cons1.join();
        prod.join();

        clearsemp(sem);
}

The above code is no faster than busy waiting, but it does not keep the CPU busy either. Semaphores can work well in certain scenarios, such as the multi-producer-multi-consumer model.

The above code does not solve the problem of the coarse-grained lock. The cons, cons1, and prod threads all block each other when acquiring the lock.

However, consumer threads are held back at sem_wait before they start competing for the lock. This reduces the number of consumer threads contending for the lock at the same time.

Optimizing code 1:

Note: In the above code, the producer is unbounded when it posts to the semaphore. If the production volume is not controlled, system resources will eventually be exhausted.

One method is to check the size of the FIFO. If the FIFO is full, the producer stops producing, sleeps for a certain period while the consumer removes data from the queue, and then checks whether the queue is empty or has dropped below a certain threshold. If so, it continues filling the queue. The problem with this method is that if the consumer suddenly drains all the data in a short period, the producer's sleep hurts efficiency. If the sleep period is shortened, CPU usage goes up.

Another method is to use two semaphores. A single semaphore no longer manages the count of the entire queue. Instead, the upper limit of the queue is fixed in advance, and two semaphores represent the number of empty positions and the number of filled positions in the queue. The sum of the two is the upper limit of the queue.

The logic of the producer is:

1) Wait until an empty position is available (sem_wait(empty_sem)).

2) Once the wait returns, at least one position is marked as empty, that is, there is an unfilled unit. Lock the queue (mutex_lock, or sem_wait on a binary semaphore). Either a mutex or a binary semaphore can be used here. A mutex can also work at both thread and process level (for example, a pthread mutex with the process-shared attribute).

3) Fill the queue.

4) Unlock the queue.

5) Post one more filled position (sem_post(filled_sem)).

The consumer logic is:

1) Wait until a filled position is available (sem_wait(filled_sem)).

2) Once the wait returns, at least one position is marked as filled, that is, there is a unit holding data. Lock the queue (mutex_lock, or sem_wait on a binary semaphore). Either a mutex or a binary semaphore can be used here, as on the producer side.

3) Get data from the queue.

4) Unlock the queue.

5) Post one more empty position (sem_post(empty_sem)).

mutex = 1
full = 0 // Initially, all slots are empty. Thus full slots are 0
empty = n // All slots are empty initially

//Solution for Producer:

do{
    //produce an item

    wait(empty);
    wait(mutex);

    //place in buffer

    signal(mutex);
    signal(full);
}while(true)

//Solution for Consumer:

do{
    wait(full);
    wait(mutex);

    //consume item from buffer

    signal(mutex);
    signal(empty);
}while(true)
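The pseudocode above can be turned into a small runnable sketch. This version assumes a Linux-like system with unnamed POSIX semaphores and uses std::mutex for the queue lock. The helper name `run_bounded_queue` and its parameters are invented for illustration, and a fixed item count replaces the endless loops so that the function can terminate.

```cpp
#include <cassert>
#include <list>
#include <mutex>
#include <semaphore.h>
#include <thread>

// Hypothetical helper: moves the values 1..items through a queue limited to
// `capacity` entries and returns the sum seen by the consumer.
long run_bounded_queue(long items, unsigned capacity) {
    assert(capacity > 0);
    std::list<long> fifo;
    std::mutex mtx;
    sem_t empty_sem;   // number of free positions
    sem_t filled_sem;  // number of occupied positions
    sem_init(&empty_sem, 0, capacity);
    sem_init(&filled_sem, 0, 0);
    long sum = 0;  // written only by the consumer, read after join()

    std::thread producer([&] {
        for (long v = 1; v <= items; ++v) {
            sem_wait(&empty_sem);                        // 1) wait for a free position
            {
                std::lock_guard<std::mutex> guard(mtx);  // 2) lock the queue
                fifo.push_back(v);                       // 3) fill the queue
            }                                            // 4) unlock the queue
            sem_post(&filled_sem);                       // 5) one more filled position
        }
    });

    std::thread consumer([&] {
        for (long n = 0; n < items; ++n) {
            sem_wait(&filled_sem);                       // 1) wait for a filled position
            long v = 0;
            {
                std::lock_guard<std::mutex> guard(mtx);  // 2) lock the queue
                v = fifo.front();                        // 3) get data
                fifo.pop_front();
            }                                            // 4) unlock the queue
            sem_post(&empty_sem);                        // 5) one more empty position
            sum += v;
        }
    });

    producer.join();
    consumer.join();
    sem_destroy(&empty_sem);
    sem_destroy(&filled_sem);
    return sum;
}
```

For example, `run_bounded_queue(1000, 8)` should return 500500 (the sum of 1 through 1000), while the queue never holds more than 8 items because the producer blocks on `empty_sem` once all positions are filled.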

Improving the basic model's CPU usage -> Large-granularity lock + sleep wake-up (low efficiency, low CPU)

To address problems such as a busy CPU, you can use a sleep/wake-up mechanism. Handing the wake-up work to the kernel also improves the responsiveness of the waiting thread, with no busy checking required.

Component:

condition variable

Code 1:

#include <thread>
#include <mutex>
#include <list>
#include <utility>
#include <unistd.h>
#include <stdio.h>
#include <condition_variable>

std::list<long> FIFO;
std::mutex lock;
std::condition_variable cv;
long consumer_v = -1;
long producer_v = 99999;

void consumer(){
        static long times=0;
        while(consumer_v!=0){
                std::unique_lock<std::mutex> lk(lock);
                // Sleep until the FIFO is non-empty; the predicate also guards against spurious wake-ups
                cv.wait(lk,[]{return !FIFO.empty();});
                consumer_v = std::move(FIFO.front());
                FIFO.pop_front();
                lk.unlock();
                times++;
                printf("consumer times : %ld\n", times);
        }
}

void producer(){
        static long times=0;
        while(producer_v--!=0){
                std::unique_lock<std::mutex> lk(lock);
                FIFO.push_back(producer_v);
                cv.notify_one(); // wake one waiting consumer
                lk.unlock();
                times++;
                printf("producer times : %ld\n", times);
        }
}

int main()
{
        std::thread cons(consumer);
        std::thread prod(producer);
        cons.join();
        prod.join();
}

The output and running time for 99999 items:

producer times: 99999
consumer times: 99931

consumer times: 99999

real 0m8.045s
user 0m0.899s
sys 0m2.625s

When the above code runs, CPU usage is very low, and the final value of consumer times is exactly the same as that of producer times, which means that there is no wasted polling.

Code 2:

#include <thread>
#include <mutex>
#include <list>
#include <utility>
#include <unistd.h>
#include <stdio.h>
#include <condition_variable>
#include <sys/types.h>

std::list<long> FIFO;
std::mutex lock;
std::condition_variable cv;
long consumer_v = -1;
long producer_v = 99999;

// Note: times and consumer_v are shared by every consumer thread.
void consumer(){
        static long times=0;
        while(consumer_v!=0){
                std::unique_lock<std::mutex> lk(lock);
                cv.wait(lk,[]{return !FIFO.empty();});
                consumer_v = std::move(FIFO.front());
                FIFO.pop_front();
                lk.unlock();
                times++;
                printf("[%d]consumer times : %ld\n", gettid(), times);
        }
}

void producer(){
        static long times=0;
        while(producer_v--!=0){
                std::unique_lock<std::mutex> lk(lock);
                FIFO.push_back(producer_v);
                cv.notify_all(); // wake every waiting consumer; only one will find the item
                lk.unlock();
                times++;
                printf("producer times : %ld\n", times);
        }
}

int main()
{
        std::thread cons(consumer);
        std::thread cons1(consumer);
        std::thread prod(producer);
        cons.join();
        cons1.join();
        prod.join();
}

The above code starts 2 consumer threads. If each consumer is slow to process an item, this arrangement can effectively improve throughput.
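One caveat with several consumers: the loops above end only when a consumer reads the value 0, so a consumer that is already blocked in cv.wait when the last item is taken may never be woken again, and join() may then hang. A possible way around this, sketched below under the assumption that the producer knows when it has finished, is an explicit `done` flag checked in the wait predicate. The names `run_with_shutdown` and `done` are hypothetical and do not appear in the original code.

```cpp
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: `consumers` threads drain the values 1..items and the
// function returns the total number of items consumed.
long run_with_shutdown(long items, int consumers) {
    std::list<long> fifo;
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;  // assumption: set once by the producer, guarded by mtx
    long consumed = 0;  // guarded by mtx

    auto consumer = [&] {
        for (;;) {
            std::unique_lock<std::mutex> lk(mtx);
            cv.wait(lk, [&] { return done || !fifo.empty(); });
            if (fifo.empty()) {
                return;  // only reachable when done is set and nothing is left
            }
            fifo.pop_front();
            ++consumed;
        }
    };

    std::vector<std::thread> pool;
    for (int i = 0; i < consumers; ++i) {
        pool.emplace_back(consumer);
    }

    // The calling thread acts as the producer.
    for (long v = 1; v <= items; ++v) {
        {
            std::lock_guard<std::mutex> guard(mtx);
            fifo.push_back(v);
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> guard(mtx);
        done = true;
    }
    cv.notify_all();  // wake every consumer so each can observe `done`

    for (auto& t : pool) {
        t.join();
    }
    return consumed;
}
```

With this structure every consumer drains whatever is left in the queue and then returns, so a call such as `run_with_shutdown(5000, 3)` should come back with 5000 regardless of how the items were split between the threads.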

Advantages and disadvantages:

Advantages: The code is concise, easy to read and modify, and the logic is clear. Sleeping replaces busy waiting, which avoids excessive CPU usage. The consumer performs no wasted polls.

Disadvantages: The price of avoiding high CPU load and busy waiting is slower execution. Compared with busy waiting, the time consumed per item is roughly 50 times higher (about 8 s for 99999 items versus about 16.7 s for 9999999 items in the runs above). Note that the condition-variable version also prints on every iteration, which adds to its measured time.

For more on condition variables, see: [modern c++] std::condition_variable Use of condition variables https://blog.csdn.net/ykun089/article/details/114735322

Others:

When locking the list, control carefully when the lock is released. It is recommended to take the data to be processed out of the FIFO with std::move (or by copying it) and then release the lock immediately, so that other threads are not kept waiting for it. Do not perform time-consuming operations while holding the lock unless you have a good reason and know what you are doing.
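As a rough illustration of this advice, the sketch below moves one item out of the queue while holding the lock and only then does the slow work. The names `queue_mutex`, `work_queue`, `process`, and `consume_one` are placeholders chosen for this example, and `process` merely stands in for a time-consuming operation.

```cpp
#include <cstddef>
#include <list>
#include <mutex>
#include <string>
#include <utility>

std::mutex queue_mutex;             // hypothetical names for this sketch
std::list<std::string> work_queue;

// Stand-in for a time-consuming operation.
std::size_t process(const std::string& item) { return item.size(); }

// Takes one item out under the lock, then processes it after the lock has
// been released. Returns false if the queue was empty.
bool consume_one(std::size_t& result) {
    std::string item;
    {
        std::lock_guard<std::mutex> guard(queue_mutex);
        if (work_queue.empty()) {
            return false;
        }
        item = std::move(work_queue.front());  // cheap: takes over the string's buffer
        work_queue.pop_front();
    }  // lock released here
    result = process(item);  // slow work happens without holding the lock
    return true;
}
```

The critical section contains only the emptiness check, the move, and the pop, so other producers and consumers can take the lock while `process` is still running.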