Teach you step by step how to implement the producer-consumer model

Yin's Blog (yinkai.cc). Welcome to visit my blog and read more.

This article will introduce you to the definition, characteristics, and processes of the producer-consumer model, and take you step by step to implement the producer-consumer model.

1. Introduction

The producer-consumer pattern is a concurrency design pattern used to solve the collaboration and data sharing issues between producers and consumers in a multi-threaded environment. In this model, there are two different roles: producer and consumer, which jointly operate a shared buffer to achieve safe communication between threads.
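As a quick preview, the whole pattern can be sketched in a few lines of Go, using a buffered channel as the shared buffer. This is only a minimal illustration; the rest of this article builds the model up properly:

```go
package main

import "fmt"

func main() {
	buf := make(chan int, 3) // the shared buffer

	// Producer: generates data and puts it into the buffer.
	go func() {
		for i := 1; i <= 3; i++ {
			buf <- i
		}
		close(buf)
	}()

	// Consumer: takes data out of the buffer and processes it.
	sum := 0
	for v := range buf {
		sum += v
	}
	fmt.Println(sum) // 1+2+3 = 6
}
```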

Its application scenarios and advantages are as follows:

  1. Asynchronous task processing: In asynchronous programming, producers can generate asynchronous tasks, and consumers are responsible for processing these tasks. The producer-consumer pattern can effectively coordinate the generation and processing of asynchronous tasks and improve the response speed of the system.

  2. Buffer processing: When the processing speed of the producer and the consumer is inconsistent, the speed difference between the two can be balanced by introducing a buffer. The producer puts data into the buffer, and the consumer obtains data from the buffer, allowing the two to run independently and improve the efficiency of the system.

  3. Task Scheduling: In a task scheduling system, producers can generate tasks that need to be executed, and consumers are responsible for executing these tasks. Through the producer-consumer model, tasks can be flexibly managed and scheduled, and task distribution and execution can be decoupled.

  4. Message Queue: The producer-consumer pattern is often used in the implementation of message queues. Producers send messages to the queue, and consumers get messages from the queue for processing. This model enables message generation and processing to be performed asynchronously, improving the scalability and maintainability of the system.

  5. Decoupling producers and consumers: Decoupling between producers and consumers makes the system more flexible and maintainable. The producer and consumer implementations can be modified and extended independently without affecting the stability of the overall system.

2. Roles

There are three roles in the producer-consumer model, namely the producer, the consumer, and the buffer. The following introduces the responsibilities and characteristics of each role:

Producer

The main responsibility of the producer is to generate data and put the data into the shared buffer and wait when the buffer is full.

It is characterized by:

  1. Run independently: Data can be generated at its own pace without waiting for processing by consumers.

  2. Data generation: Mainly focuses on the generation and placement of data, does not involve specific data processing logic, and does not care about the final use of the data.

  3. May block: When the buffer is full, the producer may be blocked to ensure synchronization between the producer and the consumer.

  4. Task allocation: Producers can allocate and schedule the generated data according to needs, and distribute the data to different consumers for processing.

Consumer

The main responsibility of the consumer is to obtain data from the shared buffer, process it, and wait when the buffer is empty.

It is characterized by:

  1. Running independently: It can get data from the buffer at its own speed without having to wait for the producer to produce it.

  2. Data processing: Mainly focuses on the processing of the acquired data, but does not involve the data generation process.

  3. Possibly blocking: The consumer may be blocked when the buffer is empty, which ensures synchronization between the producer and consumer.

  4. Task Execution: Consumers may be responsible for the actual logic of executing tasks, such as processing messages, performing calculations, etc., depending on the specific application scenario.

Buffer

The buffer plays a key role in the producer-consumer model. It acts as an intermediary between producers and consumers, storing the data generated by producers so that consumers can obtain it safely and in order.

The buffer provides a synchronization point that lets producers and consumers coordinate their operations. The producer generates data and puts it into the buffer; the consumer takes data out of it. The two communicate indirectly through the buffer without depending directly on each other's state, so they can run asynchronously without causing data loss or inconsistency.

Its importance is as follows:

  1. Prevent race conditions: Avoid race conditions between producers and consumers, ensuring the correctness of data access in a multi-threaded environment.

  2. Improving system throughput: The use of buffers can improve the throughput of the system, allowing producers and consumers to operate at their own speeds without blocking each other.

  3. Reduce resource competition: As a shared data structure, the buffer reduces competition for shared resources and improves system efficiency through appropriate synchronization mechanisms.

  4. Increase system flexibility: The introduction of buffers makes the system more flexible. The buffer size can be adjusted to the needs of different scenarios, and the buffer provides an intermediary layer so that different parts of the system can evolve independently without affecting the overall structure.

3. Basic process

Steps for producers to put data into buffers

The process of the producer putting data into the buffer includes acquiring the mutex lock, checking the buffer status, putting the generated data into the buffer, and then releasing the mutex lock.

Steps for consumers to obtain data from the buffer

The process of the consumer obtaining data from the buffer includes obtaining the mutex lock, checking the buffer status, obtaining the data for processing, and then releasing the mutex lock.
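Both sequences above (acquire the lock, check the buffer state, wait if necessary, operate, release the lock) can be sketched with a mutex and two condition variables. This is a hypothetical illustration of the locking steps; the implementations later in this article use channels instead, and the `BoundedBuffer` name is invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// BoundedBuffer is a fixed-size buffer guarded by a mutex and two
// condition variables, mirroring the steps described above.
type BoundedBuffer struct {
	mu       sync.Mutex
	notFull  *sync.Cond
	notEmpty *sync.Cond
	items    []int
	capacity int
}

func NewBoundedBuffer(capacity int) *BoundedBuffer {
	b := &BoundedBuffer{capacity: capacity}
	b.notFull = sync.NewCond(&b.mu)
	b.notEmpty = sync.NewCond(&b.mu)
	return b
}

// Put acquires the lock, waits while the buffer is full, appends the
// item, signals a waiting consumer, then releases the lock.
func (b *BoundedBuffer) Put(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.items) == b.capacity {
		b.notFull.Wait()
	}
	b.items = append(b.items, v)
	b.notEmpty.Signal()
}

// Get acquires the lock, waits while the buffer is empty, removes the
// oldest item, signals a waiting producer, then releases the lock.
func (b *BoundedBuffer) Get() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.items) == 0 {
		b.notEmpty.Wait()
	}
	v := b.items[0]
	b.items = b.items[1:]
	b.notFull.Signal()
	return v
}

func main() {
	buf := NewBoundedBuffer(2)
	done := make(chan struct{})
	go func() {
		for i := 1; i <= 5; i++ {
			buf.Put(i) // blocks whenever the buffer holds 2 items
		}
		close(done)
	}()
	sum := 0
	for i := 0; i < 5; i++ {
		sum += buf.Get()
	}
	<-done
	fmt.Println(sum) // 1+2+3+4+5 = 15
}
```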

Someone may ask, “Why use a buffer instead of having producers and consumers communicate directly?”

The main reasons for using buffers are decoupling and synchronization. The buffer acts as an intermediary and provides an independent data storage space so that producers and consumers can run independently without immediate communication. This decoupling increases the flexibility of the system, allowing producers and consumers to operate on data at their own speed without having to wait for each other. At the same time, the buffer ensures thread safety through the synchronization mechanism and prevents data competition and inconsistency issues, thereby improving the stability and maintainability of the system.

4. Step-by-step implementation

First, note that there are four variants of the producer-consumer model: one-to-one, one-to-many, many-to-one, and many-to-many. There are subtle differences between them, so let's walk through them one by one.

Before we start writing the model itself, let's write a small output package to make the later printing easier. Create an out directory in the project root, then create an out.go file inside it. The code is as follows:

package out

import "fmt"

// Out collects values to print
type Out struct {
    data chan interface{}
}

// singleton instance
var out *Out

// NewOut initializes the singleton (call it once at startup; the nil
// check below is not goroutine-safe)
func NewOut() *Out {
    if out == nil {
        out = &Out{
            // The channel must be buffered, or Println would block
            // until OutPut reads the value
            data: make(chan interface{}, 65535),
        }
    }
    return out
}

// Println queues a value for output (NewOut must be called first)
func Println(i interface{}) {
    out.data <- i
}

// OutPut prints every value queued in out
func (o *Out) OutPut() {
    for i := range o.data {
        fmt.Println(i)
    }
}
One-to-one

First, we define a task structure. The task generated by the producer is a task with ID, and then the logic of consuming the task is to print out the ID of the task:

// Task is the unit of work
type Task struct {
    ID int64
}

// run consumes the task
func (t *Task) run() {
    out.Println(t.ID)
}

Then define a buffer to store the tasks produced by the producer. Here, a cached channel is used as the buffer, and the number of tasks that the producer needs to produce is assigned a value:

// Buffer pool
var taskCh = make(chan Task, 10)

// The number of tasks the producer needs to produce
const taskNum int64 = 10000

Next, the producer logic. Since this is one-to-one, there is only one producer, so it can close the channel itself once it has produced all its tasks. Note that if the channel is not closed here, the consumer may keep waiting for tasks that will never come, resulting in a deadlock:

func producer(wo chan<- Task) {
    var i int64
    for i = 1; i <= taskNum; i++ {
        t := Task{
            ID: i,
        }
        wo <- t
    }
    // With a single producer, the channel can be closed directly.
    // The consumer can still drain the remaining buffered tasks after close.
    close(wo)
}

Now the consumer logic. The consumer simply uses for-range to block and wait for tasks from the producer. Once the producer finishes and closes the channel, the consumer drains the remaining tasks and the for-range loop ends:

func consumer(ro <-chan Task) {
    for t := range ro {
        if t.ID != 0 {
            t.run()
        }
    }
}

Finally, the execution function. Since we don't know when the producer and consumer finish their work, we use sync.WaitGroup to wait until both goroutines are done:

func Exec() {
    wg := &sync.WaitGroup{}
    wg.Add(2)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        producer(taskCh)
    }(wg)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        consumer(taskCh)
    }(wg)

    wg.Wait()
    out.Println("Execution successful")
}

Note how sync.WaitGroup is passed as a function parameter. Go passes arguments by value, so if you pass the WaitGroup itself, the Done() call inside the function decrements the counter of a copy and has no effect on the caller's counter. To share the counter, pass a pointer.

Complete code
package one_one

import (
    "main/out"
    "sync"
)

// Task is the unit of work
type Task struct {
    ID int64
}

// run consumes the task
func (t *Task) run() {
    out.Println(t.ID)
}

/* Potential pitfalls
1. If the producer channel is never closed, the consumer never finishes,
   wg.Wait() never returns, and the program deadlocks.
2. If wg is passed by value, Done() decrements a copy and wg.Wait()
   may never reach zero.
*/

// Buffer pool
var taskCh = make(chan Task, 10)

// The number of tasks the producer needs to produce
const taskNum int64 = 10000

// The single producer
func producer(wo chan<- Task) {
    var i int64
    for i = 1; i <= taskNum; i++ {
        t := Task{
            ID: i,
        }
        wo <- t
    }
    // With a single producer, the channel can be closed directly.
    // The consumer can still drain the remaining buffered tasks after close.
    close(wo)
}

// The single consumer
func consumer(ro <-chan Task) {
    for t := range ro {
        if t.ID != 0 {
            t.run()
        }
    }
}

func Exec() {
    wg := &sync.WaitGroup{}
    wg.Add(2)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        producer(taskCh)
    }(wg)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        consumer(taskCh)
    }(wg)

    wg.Wait()
    out.Println("Execution successful")
}
One-to-many

In the one-to-many case there is still one producer, but multiple consumers consume the tasks. We only need to modify the Exec() function from the one-to-one version to start several consumers; the consumption logic itself does not change. One thing worth emphasizing: since channels in Go are thread-safe, multiple consumers competing for tasks cause no data races, so no extra locking is needed.

Here we start a new consumer for every 100 tasks produced:

func Exec() {
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        producer(taskCh)
    }(wg)
    var i int64
    for i = 0; i < taskNum; i++ {
        if i%100 == 0 { // start a new consumer for every 100 tasks
            wg.Add(1)
            go func(wg *sync.WaitGroup) {
                defer wg.Done()
                consumer(taskCh)
            }(wg)
        }
    }
    wg.Wait()
    out.Println("Execution successful")
}

A new consumer is started for every 100 tasks. Note that those 100 tasks are not assigned to a specific consumer; all consumers compete for the tasks in the buffer.

Complete code
package one_many

import (
    "main/out"
    "sync"
)

// Task is the unit of work
type Task struct {
    ID int64
}

// run consumes the task
func (t *Task) run() {
    out.Println(t.ID)
}

/* Potential pitfalls
1. Channels are thread-safe, so multiple consumers can consume
   concurrently without data races.
2. If wg is passed by value, Done() decrements a copy and wg.Wait()
   may never reach zero.
*/

// Buffer pool
var taskCh = make(chan Task, 10)

// The number of tasks the producer needs to produce
const taskNum int64 = 10000

// The single producer
func producer(wo chan<- Task) {
    var i int64
    for i = 1; i <= taskNum; i++ {
        t := Task{
            ID: i,
        }
        wo <- t
    }
    // With a single producer, the channel can be closed directly.
    // Consumers can still drain the remaining buffered tasks after close.
    close(wo)
}

// One of multiple consumers
func consumer(ro <-chan Task) {
    for t := range ro {
        if t.ID != 0 {
            t.run()
        }
    }
}

func Exec() {
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func(wg *sync.WaitGroup) {
        defer wg.Done()
        producer(taskCh)
    }(wg)
    var i int64
    for i = 0; i < taskNum; i++ {
        if i%100 == 0 { // start a new consumer for every 100 tasks
            wg.Add(1)
            go func(wg *sync.WaitGroup) {
                defer wg.Done()
                consumer(taskCh)
            }(wg)
        }
    }
    wg.Wait()
    out.Println("Execution successful")
}
Many-to-one

In the many-to-one case, multiple producers produce tasks while a single consumer consumes them, so the consumer logic is unchanged. For the producers, we fix the number of tasks each one produces at nums: each producer starts from its assigned task number, produces nums tasks, and then stops:

// One of multiple producers
func producer(wo chan<- Task, startNum int64, nums int64) {
    var i int64
    for i = startNum; i < startNum+nums; i++ {
        t := Task{
            ID: i,
        }
        wo <- t
    }
}

Similarly, in Exec() we wire up the many-to-one logic. Each producer is limited to nums tasks, i.e. a new producer is started for every nums tasks. To know when all producers have finished (so that the channel can be closed safely), we use a second WaitGroup, pwg: Add(1) increments its counter before each producer starts, and Done() decrements it when that producer finishes.

As before, to make sure both the producers and the consumer have completed their work, wg.Add(1) increments the shared counter before each goroutine starts, and the deferred Done() decrements it when the goroutine completes.

func Exec() {
    // wg waits for all producers and the consumer
    wg := &sync.WaitGroup{}
    // pwg waits for the producers only, so the channel can be closed
    // once they have all finished
    pwg := &sync.WaitGroup{}
    var i int64
    wg.Add(1) // for the consumer
    for i = 0; i < taskNum; i += nums {
        // Each producer produces nums (100) tasks
        wg.Add(1)
        pwg.Add(1)
        // Pass i as an argument so each goroutine gets its own copy
        go func(i int64) {
            defer wg.Done()
            defer pwg.Done()
            producer(taskCh, i, nums)
        }(i)
    }

    go func() {
        defer wg.Done()
        consumer(taskCh)
    }()
    pwg.Wait()
    // All producers have returned, so closing the channel is safe here
    go close(taskCh)
    wg.Wait()

    out.Println("Execution successful")
}

Two issues deserve attention here. First, the loop variable i is shared across iterations: a goroutine started when i was 100 may observe a later value (say 200) by the time producer() runs, so i must be passed to the goroutine as an argument. (Go 1.22 gives each iteration its own i, but passing it explicitly is still the clearest and works on every version.) Second, the channel may only be closed after every producer has stopped writing to it; pwg.Wait() guarantees exactly that, after which close(taskCh) is safe.

Complete code
package many_one

import (
    "main/out"
    "sync"
)

// Task is the unit of work
type Task struct {
    ID int64
}

// run consumes the task
func (t *Task) run() {
    out.Println(t.ID)
}

/* Potential pitfalls
1. The channel may only be closed after every producer has finished
   writing; pwg.Wait() guarantees this.
2. The loop variable i must be passed to each producer goroutine as an
   argument, or later iterations could change the value it observes.
*/

// Buffer pool
var taskCh = make(chan Task, 10)

// The total number of tasks to produce
const taskNum int64 = 10000

// The number of tasks each producer produces
const nums int64 = 100

// One of multiple producers
func producer(wo chan<- Task, startNum int64, nums int64) {
    var i int64
    for i = startNum; i < startNum+nums; i++ {
        t := Task{
            ID: i,
        }
        wo <- t
    }
}

// The single consumer
func consumer(ro <-chan Task) {
    for t := range ro {
        if t.ID != 0 {
            t.run()
        }
    }
}

func Exec() {
    // wg waits for all producers and the consumer
    wg := &sync.WaitGroup{}
    // pwg waits for the producers only, so the channel can be closed
    // once they have all finished
    pwg := &sync.WaitGroup{}
    var i int64
    wg.Add(1) // for the consumer
    for i = 0; i < taskNum; i += nums {
        // Each producer produces nums (100) tasks
        wg.Add(1)
        pwg.Add(1)
        // Pass i as an argument so each goroutine gets its own copy
        go func(i int64) {
            defer wg.Done()
            defer pwg.Done()
            producer(taskCh, i, nums)
        }(i)
    }

    go func() {
        defer wg.Done()
        consumer(taskCh)
    }()
    pwg.Wait()
    // All producers have returned, so closing the channel is safe here
    go close(taskCh)
    wg.Wait()

    out.Println("Execution successful")
}
Many-to-many

The many-to-many case is closest to real-world scenarios: a steady stream of producers produce tasks and consumers continuously consume them. Neither side exits on its own; the goroutines rely on an explicit signal to stop. So we first define a global stop signal:

//Signal to stop running
var done = make(chan struct{})

Since the producer produces endlessly, its logic naturally lives in an infinite for loop. To make sure a producer blocked on a full buffer can still receive the done signal, we use select:

func producer(wo chan<- Task, done chan struct{}) {
    var i int64
    for {
        if i >= TaskNum { // produce indefinitely: wrap the counter
            i = 0
        }
        i++
        t := Task{
            ID: i,
        }
        // select lets a producer blocked on a full buffer still receive done
        select {
        case wo <- t:
        case <-done:
            out.Println("Producer exits")
            return
        }
    }
}

Likewise, the consumer logic lives in an infinite for loop and also uses select to receive the stop signal:

func consumer(ro <-chan Task, done chan struct{}) {
    for {
        select {
        case t := <-ro:
            if t.ID != 0 {
                t.run()
            }
        case <-done: // exiting immediately could leave unconsumed values in the buffered channel
            for t := range ro { // producers have stopped; drain the remaining tasks before exiting
                if t.ID != 0 {
                    t.run()
                }
            }
            out.Println("Consumer exits")
            return
        }
    }
}

There is a small pitfall after receiving the done signal: the buffer may still hold unconsumed tasks. So before exiting, the consumer runs the consumption loop once more to drain the buffer, ensuring no tasks are left behind.

The execution function is simple: start several producers and consumers and let them run concurrently. Note that the done signal must be closed first and the channel only afterwards; in the reverse order, producers would write to a closed channel and panic. (Strictly speaking, a producer that committed to a send just before done was closed can still race with the close; a production implementation would wait for all producers to return before closing the channel.)

package many_many

import (
    "main/out"
    "time"
)

// Task is the unit of work
type Task struct {
    ID int64
}

// run consumes the task
func (t *Task) run() {
    out.Println(t.ID)
}

/* Potential pitfalls
1. Neither producers nor consumers exit on their own; both rely on the
   done signal to stop.
2. Production and consumption both run continuously.
*/

// Buffer pool
var taskCh = make(chan Task, 10)

// Signal to stop running
var done = make(chan struct{})

// TaskNum is the point at which a producer's counter wraps around
const TaskNum int64 = 10000

func producer(wo chan<- Task, done chan struct{}) {
    var i int64
    for {
        if i >= TaskNum { // produce indefinitely: wrap the counter
            i = 0
        }
        i++
        t := Task{
            ID: i,
        }
        // select lets a producer blocked on a full buffer still receive done
        select {
        case wo <- t:
        case <-done:
            out.Println("Producer exits")
            return
        }
    }
}

func consumer(ro <-chan Task, done chan struct{}) {
    for {
        select {
        case t := <-ro:
            if t.ID != 0 {
                t.run()
            }
        case <-done: // exiting immediately could leave unconsumed values in the buffered channel
            for t := range ro { // producers have stopped; drain the remaining tasks before exiting
                if t.ID != 0 {
                    t.run()
                }
            }
            out.Println("Consumer exits")
            return
        }
    }
}

func Exec() {
    // Multiple producers
    for i := 0; i < 8; i++ {
        go producer(taskCh, done)
    }

    // Multiple consumers
    for i := 0; i < 7; i++ {
        go consumer(taskCh, done)
    }

    time.Sleep(time.Second * 5)
    // Close done first, then the channel, so producers stop sending
    // before the channel is closed
    close(done)
    close(taskCh)
    out.Println("Execution successful")
}

5. Summary

The core idea of the producer-consumer model is to decouple producers from consumers through a shared buffer: producers generate data and put it into the buffer, and consumers take data out of the buffer and process it.

Key implementation points include synchronization mechanisms, blocking and wake-up mechanisms, and decoupling direct dependencies between producers and consumers. This mode improves the flexibility and efficiency of the system by balancing the speed of data generation and processing, and is suitable for asynchronous data exchange in a multi-threaded environment.
