[Linux 1++] Threads (3) including producer-consumer model

Author’s homepage: Attack 1++
Column link: [1++ Linux]

Article directory

  • 1. Reentrancy and thread safety
  • 2. Deadlock
  • 3. Thread synchronization
    • What is thread synchronization?
    • How to achieve thread synchronization
    • condition variable
  • 4. Producer and consumer model
    • 1. Basic components and concepts of the producer and consumer models

One, reentrancy and thread safety

Thread safety: Multiple execution streams will not produce different results when executing the same piece of code. This problem often occurs when global variables or static variables are operated without lock protection.
Reentrancy: The same function is called by different execution flows. Before the current process has finished executing, other execution flows will enter again. We call it reentrancy. If a function is reentrant and the running results will not be any different or have any problems, then the function is called a reentrant function. Otherwise, it is a non-reentrant function.

Common thread unsafe situations:

  1. Functions that do not protect shared variables
  2. Function status changes as the function is called
  3. Function that returns a pointer to a static variable
  4. Functions that call thread-unsafe functions

Common thread safety situations:

  1. Each thread only has read permission for global variables or static variables, but no write permission. Generally speaking, these threads are safe.
  2. Classes or interfaces are atomic operations for threads.
  3. Switching between multiple threads will not cause ambiguity in the execution results of the interface

Common inaccessible situations:

  1. The malloc/free function is called, because the malloc function uses a global linked list to manage the heap.
  2. Standard I/O library functions are called. Many implementations of the standard I/O library use global data structures in a non-reentrant manner.
  3. Reentrant function body uses static data structure

Common access situations:

  1. Do not use global or static variables
  2. Do not use the space opened with malloc or new
  3. Do not call non-reentrant functions
  4. No static or global data is returned, all data is provided by the caller of the function
  5. Use local data, or protect global data by making local copies of global data

The connection and difference between reentrancy and thread safety:

Contact:
A function is reentrant, that is, it is thread-safe.
The function is not reentrant, so it cannot be used by multiple threads, which may cause thread safety issues.
If there are global variables in a function, then the function is neither thread-safe nor reentrant.

Difference:
Reentrant functions are a type of thread-safe function
Thread safety does not necessarily mean reentrancy, but reentrant functions must be thread safe.
If the access to critical resources is locked, this function is thread-safe, but if the reentrant function has not released the lock, a deadlock will occur, so it is not reentrant.

Thread safety describes a state or possibility of mutual influence between threads. Reentrancy describes whether a function can be entered repeatedly.

This insert is locked, so it is safe when accessed by multiple threads. For example, when the insert in the main function reaches the second sentence, a signal comes, causing it to process the signal, but the execution flow of the main function is to apply for a lock. , it holds the lock, executes the signal capture method when the signal is delivered, and executes the handler. There is also an insert in the handler, so the insert will re-enter. However, when the insert function comes in, the signal capture execution flow must apply for the lock. It appears at this time that the main thread successfully applied for the lock and was accessing critical resources. Then the signal came and the signal processing function was executed. At this time, it applied for the lock again. In other words, the same process applied for the lock twice. The first time I successfully applied for the lock, and the second time I applied for the lock, but the lock was gone (actually you applied for it yourself), and you were suspended at this time. But the most embarrassing thing is that you are hanging while holding the lock. You are waiting for someone to release the lock and wake you up, but you are holding the lock. If no one releases it, no one will wake you up. So your process is suspended forever. This is called a thread that is safe, but not necessarily reentrant.

Two, deadlock

What is a deadlock:

Deadlock refers to a permanent waiting state in which each execution flow in a group of execution flows occupies resources that will not be released, but is in a permanent waiting state due to mutual application for resources that are used by other execution flows and will not be released.

Deadlock conditions:

Mutually exclusive condition: A resource can only be used by one execution flow at a time.
Request and hold conditions: When an execution flow is blocked due to a request for resources, and the obtained resources are held on.
Non-deprivation condition: The resources obtained by an execution flow cannot be forcibly deprived before they are used up.
Loop waiting condition: Several execution flows form a relationship of cyclic waiting for resources, starting from end to end.

So how to avoid deadlock?

  1. Four necessary conditions to break deadlock
  2. The locking sequence is consistent
  3. Avoid scenarios where locks are not released
  4. One-time allocation of resources

Algorithm to avoid deadlock:
Deadlock detection algorithm
banker’s algorithm

Three, thread synchronization

What is thread synchronization?

Under the premise of ensuring data security, allowing threads to access critical resources in a specific order, thereby effectively avoiding the starvation problem, is called synchronization.

How to achieve thread synchronization

Thread synchronization means how to enable threads to access critical resources in a specific order?
We do this using condition variables. Condition variables: When a thread accesses a variable mutually, it may find that it can’t do anything until other threads change state.

It’s like when we go to a mobile phone store to buy a mobile phone and find that the mobile phone we want is not in stock, so we don’t buy it, go back, and ask again the next day, but it’s still not available, the third day, and the fourth day. . . If you ask every day for a month if there is any goods, is this a waste of your time? **(This is correct, but it is unreasonable)** But if you add the shopping guide’s WeChat account, when the goods are available, He will notify you on WeChat and you can go buy it. Wouldn’t it be much more convenient this way?

Generally speaking, because of the lock, it is difficult for us to understand the situation of the resource (judging whether the resource is satisfied is also the process of accessing the resource). In this way, the scenario where one party notifies the other party that the resource is ready is a condition variable.

Race conditions: Program exceptions caused by timing issues are called race conditions.

Why is there thread synchronization?
Mainly to solve the problem of rationality of accessing critical resources. Do not cause hunger problems for others and waste your own resources.

Conditional variable

When we apply for critical resources, we must first judge whether the resource exists. Then the judgment of the resource is also a kind of access to the resource, so the judgment of the resource must also be between locking and unlocking. The conventional detection method is doomed to us to frequently apply for and release locks. Is there any way to prevent our thread from frequently performing self-checks when it detects that the resource is not ready, but to wait for notifications and wait for the conditions to be ready? To wake up? -This method is our condition variable.

Usage of condition variables:

The use of condition variables is similar to the use of mutexes. They can be initialized directly with macros or by calling the initialization function.


The pthread_cond_wait function is, when it is called because the conditions are not met, the execution flow will wait in a blocking manner, and the lock will be opened until a wake-up signal is received. Continue executing backwards.

The difference between the pthread_cond_timedwait function and the above function is that it has one more time parameter than pthread_cond_wait, which indicates how long it will take to unblock even if it is awakened.

The main difference between this function and pthread_cond_wait is the third parameter, abstime, from the description of the function
Look, this parameter is not unlocked after abstime as described in the red letter, but after it reaches abstime. So when we use parameters here, we cannot directly write a time interval, such as 5S, but The arrival time should be written, so the initialization process is:

struct timespec timeout;
//Define time point
timeout.tv_ sec= time(0) + 1; //time(0) represents the current time and //tv_ sec refers to seconds
timeout.tv_nsec=0;
//tv_nsec represents nanosecond time

pthread_cond_signal
The function of the function is to send a signal to another thread that is in a blocking waiting state to get it out of the blocking state and continue execution. If no thread is in a blocking waiting state, pthread_cond_signal will also return successfully.
When using pthread_cond_signal, there is generally no “thundering herd phenomenon”. It only sends a signal to one thread at most. If there are multiple threads blocking and waiting for this condition variable, then it is determined which thread receives the signal and continues execution based on the priority of each waiting thread. **If the priorities of each thread are the same, which thread gets the signal is determined based on the length of the waiting time. But in any case, a pthread_cond_signal call can send a message at most once. (The essence is to change the state of the received signal from S state to R state)
And pthread_cond_broadcast will signal all threads blocked in this condition variable.

Below we show a piece of relevant code to have a deeper understanding of the above conclusion:

//Define global locks and condition variables
pthread_mutex_t mtx=PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;
pthread_cond_t ct=PTHREAD_COND_INITIALIZER;

int i=0;
int j=0;
void* ctr(void* argv)
{<!-- -->
    while(true)
    {<!-- -->
        cout<<( + + j)<<"Wake up----"<<endl;
        pthread_cond_signal( & amp;ct);
        sleep(1);

    }
}

void* work(void* argv)
{<!-- -->
   while(true)
   {<!-- -->
     pthread_mutex_lock( & amp;mtx);
    pthread_cond_wait( & amp;ct, & amp;mtx);
    cout<< + + i<<"doing------"<<endl;
    pthread_mutex_unlock( & amp;mtx);
   }
    
}
int main()
{<!-- -->
    pthread_t boss;
    pthread_t staff[3];
    pthread_create( & amp;boss,nullptr,ctr,nullptr);
    for(int i=0;i<3;i + + )
    {<!-- -->
        pthread_create(staff + i,nullptr,work,nullptr);
    }

    pthread_join(boss,nullptr);
    for(int i=0;i<3;i + + )
    {<!-- -->
        pthread_join(staff[i],nullptr);
    }
    return 0;
}

4. Producer and consumer model

1. The basic components and concepts of the producer and consumer models

Basic components: Producers, consumers, trading places.

Producer-consumer model based on Blockingqueue: In multi-thread programming, blocking queue (Blocking Queue) is a data structure commonly used to implement producer and consumer models. The difference from an ordinary queue is that when the queue is empty, the operation of obtaining elements from the queue will be blocked until an element is placed in the queue; when the queue is full, the operation of storing elements in the queue will also be blocked. , until an element is taken out of the queue (the above operations are based on different threads, and the thread will be blocked when operating on the blocking queue process).

Let’s take shopping as an example:
We compare producers to factories that produce products, consumers to people who shop, and trading venues to shopping malls. Our data can be compared to commodities.

Why does a supermarket exist? Its essence is to serve as a buffer zone for goods (temporary storage of goods), thereby improving efficiency.

The core idea of the design is to reduce code coupling as much as possible. If code coupling is found, decoupling technology must be adopted. Let the three layers of data model, business logic and view display reduce the coupling with each other and minimize the correlation dependence without affecting the whole body. The existence of supermarkets is also a means of decoupling.

So what is the relationship between the three of them?

Between consumers and consumers: mutually exclusive, such as forcing concert tickets
Producer and Producer: Mutually exclusive, such as business war.
Producers and consumers: There is a mutually exclusive relationship. For example, if a consumer wants to get something from the shelf
goods, and producers also want to put Chinese goods on the shelves, at this time there is a question of who comes first and who comes last; synchronization relationship, customers can purchase after the factory produces the goods, and only when the goods are insufficient after the customers purchase, the factory will in production.

Below we use relevant code to simulate this process:

ypedef std::function<int(int,int)> func_t;

 class Operat
 {<!-- -->

public:
    inline static int Add(int x,int y)
    {<!-- -->
        return x + y;
    }

    inline static int Mult(int x,int y)
    {<!-- -->
        return x*y;
    }

    inline static int Sub(int x,int y)
    {<!-- -->
        return x-y;
    }

 };

pthread_t tid[3];
func_t func[3]={<!-- -->Operat::Add,Operat::Mult,Operat::Sub};

template<class T>
class Blockqueue
{<!-- -->
    bool IsEmpty()
    {<!-- -->
        return dp.size()==0;
    }

    bool IsFill()
    {<!-- -->
        return dp.size()==cap;
    }
public:
    Blockqueue()
    {<!-- -->
        pthread_mutex_init( & amp;mtx,nullptr);
        pthread_cond_init( & amp;EMPTY,nullptr);
        pthread_cond_init( & amp;Fill,nullptr);

    }

    ~Blockqueue()
    {<!-- -->
        pthread_mutex_destroy( & amp;mtx);
        pthread_cond_destroy( & amp;EMPTY);
        pthread_cond_destroy( & amp;Fill);

    }

    void Push(T & in)
    {<!-- -->
        //Create tasks
       int n= pthread_mutex_lock( & amp;mtx);
       assert(n==0);
       while(IsFill())
       {<!-- -->
        std::cout<<"Producer waits"<<std::endl;
        pthread_cond_wait( & amp;Fill, & amp;mtx);
       }

        std::cout<<"Making task"<<std::endl;
        dp.push(in);
        pthread_mutex_unlock( & amp;mtx);
        pthread_cond_signal( & amp;EMPTY);
       
    }

     void Pop(T* out)
    {<!-- -->
        //Consumption
       int n= pthread_mutex_lock( & amp;mtx);
       assert(n==0);
       // // When I wake up, where do I wake up from? ? Wherever we block and suspend, we wake up from there. When we wake up, we are still woken up in the critical section.
        // // When we are awakened, pthread_cond_wait will automatically help our thread acquire the lock.
        // // pthread_cond_wait: But as long as it is a function, the call may fail**
        // // pthread_cond_wait: There may be ** false wake-up situations **
       while(IsEmpty())
       {<!-- -->
        pthread_cond_wait( & amp;EMPTY, & amp;mtx);
        std::cout<<"Consumer waiting"<<std::endl;
       }
        std::cout<<"Get the task"<<std::endl;
        *out=dp.front();
        dp.pop();
        pthread_mutex_unlock( & amp;mtx);
        pthread_cond_signal( & amp;Fill);
       
    }


private:
    std::queue<T> dp;
    int cap=4;
    pthread_mutex_t mtx;
    pthread_cond_t EMPTY;
    pthread_cond_t Fill;

};
void* productor(void* argv)
{<!-- -->
    Blockqueue<func_t>* b=(Blockqueue<func_t>*)argv;//Must be received with a pointer, otherwise copy construction will generate a new object.
    //Causes a new lock to be generated
   while(true)
   {<!-- -->
     int n=rand()%3;
    b->Push(func[n]);
    sleep(1);
   }

   return nullptr;
}


void* consumer(void* argv)
{<!-- -->
    Blockqueue<func_t>* b=(Blockqueue<func_t>*)argv;
   while(true)
   {<!-- -->
     func_t ret;
    b->Pop( & amp;ret);
    int x=rand()%6;
    int y=rand()%7;
    std::cout<<"result"<<x<<"--"<<y<<"="<<ret(x,y)<<std::endl;
    sleep(1);
   }
   return nullptr;
}


int main()
{<!-- -->
    srand((unsigned)time(nullptr));
    Blockqueue<func_t>* p_blockq=new Blockqueue<func_t>;
    pthread_t c[2],p[2];
    pthread_create(c,nullptr,consumer,p_blockq);
    pthread_create(c + 1,nullptr,consumer,p_blockq);
    pthread_create(p,nullptr,productor,p_blockq);
    pthread_create(p + 1,nullptr,productor,p_blockq);


    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);

       

    delete p_blockq;
    return 0;
}


Below we use a picture to vividly display what the code represents:


Please watch the VCR next: When our customers go to consume and find that there is no product to buy, the customer will notify the factory and go home to wait for the notification. The factory will start production after receiving the notification and will After a lot of goods are delivered to the supermarket, customers are notified to come and buy. At this time, the customers can shop. If the factory wants to continue to replenish the goods in the supermarket when the goods are still but not full, if there are customers to shop at this time, they will There needs to be competition (should I buy things first or you replenish the goods first)? If we replenish the goods when there are only out-of-stock items, and the factory produces faster, then during the period when customers are shopping, our factory can concentrate on my goods. Does this increase the efficiency of manufacturing? On the other hand, if I consume too quickly and the factory manager is replenishing the goods, can I use what I bought? Does this also increase the efficiency?

The supermarket is like a critical resource. Our producers want to access it, and consumers also want to access it. In order to avoid data errors due to timing issues, we only allow one execution flow to enter the supermarket, so consumers and consumers, Producers and producers, consumers and producers all have competitive relationships.

Let’s take a look at the relationship between mutual exclusion and synchronization: Our mutual exclusion is to protect the security of shared data, so it allows an execution flow to access critical resources. However, when judging critical resources, since no one Notifications, we can only frequently judge whether the access conditions have been reached again and again, and others cannot access, which is obviously a waste of resources. With synchronization, we can go home and wait for notifications, and other conditions are met. Threads can also enter. Doesn’t this increase efficiency? Therefore, synchronization supplements the shortcomings of mutual exclusion.
In the producer-consumer model, who puts the data in the queue and who gets the data is not the main conflict. The main conflict is how long it takes to process the data and how long it takes to get the data. , the producer-consumer model solves the strong coupling problem between producers and consumers through a container. Producers and consumers do not communicate directly with each other, but communicate through blocking queues. Therefore, after the producers produce the data, they do not need to wait for the consumers to process it, but directly throw it to the blocking queue. The consumers do not ask the producers for data, but Take it directly from the blocking queue, which is equivalent to a buffer, balancing the processing capabilities of producers and consumers. This blocking queue is used to decouple producers and consumers.

The most classic application scenario of blocking queue: pipeline

Advantages of the producer-consumer model:

decoupling
Support concurrency
Improve efficiency
Balance speed differences.