32. Use WaitGroup + channel to collect and aggregate goroutine results

Article directory

  • 1. Introduction to using WaitGroup
    • 1. Simple usage
    • 2. Common mistakes
      • 1. Add called in the wrong place
      • 2. Done called in the wrong place
      • 3. Closure in the child goroutine captures an outer variable
  • 2. WaitGroup + channel for data aggregation
    • 1. Version 1 (defective)
    • 2. Version 2 (correct, but not elegant)
    • 3. Version 3 (elegant implementation)
  • 3. Extending to other scenarios

Code address: https://gitee.com/lymgoforIT/golang-trick/tree/master/23-waitgroup + channel

1. Introduction to using WaitGroup

1. Simple usage

sync.WaitGroup is essentially a concurrency counter that exposes three core methods:

  • WaitGroup.Add(n): registers work by adding n to the counter. In practice, WaitGroup.Add(n) announces that n child goroutines are about to be started.

  • WaitGroup.Done(): reports completion by decrementing the counter by 1. In practice, each child goroutine calls WaitGroup.Done once before it exits.

  • WaitGroup.Wait(): waits for all registered work to finish. It is usually called by the main goroutine, which blocks until every child goroutine has finished and the counter has dropped to zero, and then continues.

The following example shows how sync.WaitGroup is used:

  • First declare a WaitGroup, wg.

  • Start a for loop that will launch 10 child goroutines.

  • Before starting each child goroutine, call WaitGroup.Add in the main goroutine to register it (note that WaitGroup.Add must be called in the main goroutine, not in the child goroutine).

  • Start the child goroutines one by one. Each of them uses defer to guarantee that WaitGroup.Done is called, which reports completion and decrements the counter by 1.

  • After starting all child goroutines, the main goroutine calls WaitGroup.Wait and blocks until every child goroutine has called WaitGroup.Done and the counter is back to zero; only then does it continue.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1) // Error-prone point 1
		go func(i int) {
			defer func() {
				wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					fmt.Println("panic occurred")
				}
			}()

			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business work
		}(i) // Error-prone point 3
	}

	wg.Wait()
	fmt.Println(time.Since(start))
}


Output results:

The output shows that 0-9 are not printed in order: starting a goroutine takes some time, and the goroutine that is started first is not necessarily the one that runs first. In addition, although each of the 10 goroutines simulates one second of work, the whole program finishes in only slightly more than one second. This is the benefit of goroutines: run serially, the 10 tasks would take at least ten seconds.

2. Common mistakes

1. Add called in the wrong place

When using sync.WaitGroup, make sure that every WaitGroup.Add call that increments the counter happens before WaitGroup.Wait; otherwise the program may behave incorrectly or even panic.

Here is a counter-example. Pay attention to the location of error-prone point 1:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		// wg.Add(1) // Error-prone point 1
		go func(i int) {
			wg.Add(1) // Error-prone point 1
			defer func() {
				wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					fmt.Println("panic occurred")
				}
			}()

			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business work
		}(i) // Error-prone point 3
	}

	wg.Wait()
	fmt.Println(time.Since(start))
}

In the code above, WaitGroup.Add and WaitGroup.Done are both called inside the child goroutine, while WaitGroup.Wait is called in the main goroutine. At first glance Add and Done are still paired, so nothing seems wrong. But there is a problem: child goroutines start asynchronously, so Wait may run before any Add has run. The counter is then still 0, Wait returns immediately, and the main goroutine continues (and here exits) before the children have finished.
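When the number of goroutines is known in advance, one option is to register all of them with a single Add call before the loop. The sketch below is only an illustration of that idea; runAll and the finished counter are names made up for this example and are not part of the original code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll (hypothetical helper) starts n goroutines and returns how many
// of them finished. The counter is registered once, in the calling
// goroutine, before any child is started, so Wait can never observe a
// zero counter too early.
func runAll(n int) int64 {
	var wg sync.WaitGroup
	var finished int64

	wg.Add(n) // register all n children up front
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&finished, 1)
		}()
	}

	wg.Wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runAll(10)) // prints 10
}
```

Whether Add is called once before the loop or once per iteration, the important point is the same: it runs in the goroutine that later calls Wait, before the corresponding child is started.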

2. Done called in the wrong place

When starting a child goroutine, it is a well-established convention to capture panics with recover inside that goroutine. A panic that is not recovered in the goroutine where it occurs terminates the whole program.

Here is a counter-example. Pay attention to the location of error-prone point 2:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1) // Error-prone point 1
		go func(i int) {
			defer func() {
				// wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					wg.Done() // Error-prone point 2
					fmt.Println("panic occurred")
				}
			}()

			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business work
		}(i) // Error-prone point 3
	}

	wg.Wait()
	fmt.Println(time.Since(start))
}

Here wg.Done() has been moved inside the panic-handling branch. If the child goroutine does not panic, wg.Done is never executed, the counter never returns to zero, and wg.Wait blocks forever (in this program the runtime eventually aborts with a deadlock error).

Note: the condition of the if statement below is true only when recover actually captured a panic:

if err := recover(); err != nil {
	wg.Done() // Error-prone point 2
	fmt.Println("panic occurred")
}
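One way to make this mistake harder to commit is to defer wg.Done in a statement of its own, separate from the recover logic, so that it runs whether or not the goroutine panics. The following is a minimal sketch under that assumption; runTasks and the sample tasks are illustrative names only.

```go
package main

import (
	"fmt"
	"sync"
)

// runTasks (hypothetical helper) runs every task in its own goroutine and
// returns the number of tasks that panicked. wg.Done is deferred on its
// own, so it runs whether the task returns normally or panics.
func runTasks(tasks []func()) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		panicked int
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done() // always executed, panic or not
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panicked++
					mu.Unlock()
				}
			}()
			task()
		}(task)
	}
	wg.Wait()
	return panicked
}

func main() {
	tasks := []func(){
		func() {},
		func() { panic("boom") },
		func() {},
	}
	fmt.Println(runTasks(tasks)) // prints 1
}
```

Deferred calls run in last-in, first-out order, so the recover handler runs first and wg.Done runs last, after the panic has been handled.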

3. Closure in the child goroutine captures an outer variable

The function run by a child goroutine can be a closure, and a closure can use variables from the enclosing scope directly.

Here is a counter-example. Pay attention to the location of error-prone point 3:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1) // Error-prone point 1
		go func() {
			defer func() {
				wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					fmt.Println("panic occurred")
				}
			}()

			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business work
		}() // Error-prone point 3
	}

	wg.Wait()
	fmt.Println(time.Since(start))
}

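Output results:

When i is not passed to the child goroutine as an argument, the closure reads the outer variable i itself. In Go versions before 1.22 all iterations share a single i, so the output is no longer 0-9 and is nondeterministic: each goroutine prints whatever value i happens to hold at the moment it runs. Since Go 1.22 each loop iteration has its own i, so this particular pitfall no longer occurs in modules that declare go 1.22 or later, but passing the value explicitly still makes the intent clear.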
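Besides passing i as an argument, another common way to give each goroutine its own value is to shadow the loop variable inside the loop body. The sketch below assumes nothing beyond the standard library; collectIndexes is a made-up name, and the results are sorted only so that the output is stable.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectIndexes (hypothetical helper) starts n goroutines and returns the
// loop index each one saw, sorted in ascending order.
func collectIndexes(n int) []int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make([]int, 0, n)
	)
	for i := 0; i < n; i++ {
		i := i // per-iteration copy; redundant since Go 1.22 but harmless
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock() // a slice is not safe for concurrent appends
			seen = append(seen, i)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Ints(seen)
	return seen
}

func main() {
	fmt.Println(collectIndexes(10)) // prints [0 1 2 3 4 5 6 7 8 9]
}
```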

2. WaitGroup + channel for data aggregation

A scenario that comes up often at work: the main goroutine needs to collect the results after all child goroutines have finished. Knowing that the children are done is not enough; the main goroutine must actually receive the data they return. This involves passing data between goroutines, and the common approach is to combine WaitGroup with a channel.

1. Version 1 (defective)

Let's start with a slightly flawed implementation, version 1.0:

  • Create a channel for data transfer: dataCh. dataCh can be unbuffered, because a reader goroutine that keeps receiving will be started, so the writer goroutines will not block indefinitely.

  • Create a slice, resp, in the main goroutine to hold the aggregated results. Note that a slice is not a concurrency-safe data structure, so writes to it must be serialized.

  • Asynchronously start a reader goroutine that keeps receiving data from dataCh and appends it to resp. Note that the reader iterates over the channel with for range, so the loop ends only after the channel has been closed and all data in it has been read.

  • Following the WaitGroup usage pattern, start several child goroutines that simulate task execution and send the processed data into dataCh for the reader goroutine to receive and aggregate.

  • The main goroutine calls WaitGroup.Wait to make sure all child goroutines have finished, and then closes dataCh.

  • The main goroutine takes the resp slice filled by the reader goroutine and continues processing.

The corresponding implementation source code is as follows:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	tasksNum := 10

	dataCh := make(chan interface{}) // Unbuffered channel
	resp := make([]interface{}, 0, tasksNum)

	// Start the reader goroutine
	go func() {
		defer func() {
			if err := recover(); err != nil {
				// ...
			}
		}()
		for data := range dataCh {
			resp = append(resp, data)
		}
	}()

	// Start the workers; each one passes its result to the reader goroutine through the channel
	var wg sync.WaitGroup
	for i := 0; i < tasksNum; i++ {
		wg.Add(1)
		// The send-only channel parameter shows that the worker only writes to the channel; reading happens in the reader goroutine.
		go func(ch chan<- interface{}) {
			defer func() {
				wg.Done()
				if err := recover(); err != nil {
					// ...
				}
			}()

			// Write the simulated result to the channel. In real work this is usually the result of an RPC call.
			ch <- time.Now().UnixNano()
		}(dataCh)
	}

	// Close the channel only after all worker goroutines have finished
	wg.Wait()
	close(dataCh)

	fmt.Printf("resp:%+v\nlen(resp):%v", resp, len(resp))
}

Run results:

We clearly started 10 goroutines, so why were only 9 results collected?

The code above has a concurrency problem. After WaitGroup.Wait confirms that all child goroutines have finished, the main goroutine closes dataCh and immediately reads resp to print it. Although dataCh is closed at that point, the reader goroutine runs asynchronously and may not yet have appended the last value to resp. The resp slice that the main goroutine reads right after close(dataCh) may therefore be missing data.

2. Version 2 (correct, but not elegant)

Version 2.0 fixes the problem in version 1.0.

The problem was that the main goroutine could use resp before the reader goroutine had finished aggregating. To fix it, we add a second channel, stopCh, that signals whether the reader goroutine has finished. The steps are:

  • After closing dataCh, the main goroutine does not access resp immediately. It first receives from stopCh and continues only once that receive succeeds.

  • Just before the reader goroutine exits, it sends a signal on stopCh so that the main goroutine learns that the reader has finished.

With this change the logic is sound, and the main goroutine is guaranteed to see complete data in resp.

The code is as follows. Compared with version 1, three lines involving stopCh have been added:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	tasksNum := 10

	dataCh := make(chan interface{}) // Unbuffered channel
	resp := make([]interface{}, 0, tasksNum)
	stopCh := make(chan struct{}, 1)

	// Start the reader goroutine
	go func() {
		defer func() {
			if err := recover(); err != nil {
				// ...
			}
		}()
		for data := range dataCh {
			resp = append(resp, data)
		}
		stopCh <- struct{}{}
	}()

	// Start the workers; each one passes its result to the reader goroutine through the channel
	var wg sync.WaitGroup
	for i := 0; i < tasksNum; i++ {
		wg.Add(1)
		// The send-only channel parameter shows that the worker only writes to the channel; reading happens in the reader goroutine.
		go func(ch chan<- interface{}) {
			defer func() {
				wg.Done()
				if err := recover(); err != nil {
					// ...
				}
			}()

			// Write the simulated result to the channel. In real work this is usually the result of an RPC call.
			ch <- time.Now().UnixNano()
		}(dataCh)
	}

	// Close the channel only after all worker goroutines have finished
	wg.Wait()
	close(dataCh)

	// Wait until the reader goroutine has finished processing
	<-stopCh

	fmt.Printf("resp:%+v\nlen(resp):%v", resp, len(resp))
}

Run results: all 10 results are collected this time.
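A common variation of version 2 is to have the reader signal completion by closing its channel rather than sending a value on it. The sketch below is one possible shape of that variation, not the article's code: aggregate, work, and done are illustrative names, and int is used instead of interface{} only to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// aggregate (hypothetical helper) runs n workers, each producing work(i),
// and returns all results in arrival order.
func aggregate(n int, work func(i int) int) []int {
	dataCh := make(chan int)
	done := make(chan struct{}) // closed by the reader when it has finished
	resp := make([]int, 0, n)

	// Reader goroutine: the only writer of resp.
	go func() {
		defer close(done)
		for v := range dataCh {
			resp = append(resp, v)
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			dataCh <- work(i)
		}(i)
	}

	wg.Wait()     // all workers have sent their result
	close(dataCh) // lets the reader's range loop end
	<-done        // resp is complete and safe to read from here on
	return resp
}

func main() {
	resp := aggregate(10, func(i int) int { return i * i })
	fmt.Println(len(resp)) // prints 10
}
```

Receiving from a closed channel never blocks, so any number of goroutines could wait on done; with a single waiter, as here, the behavior matches the send-based stopCh.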

3. Version 3 (elegant implementation)

Version 2.0 has to introduce an extra stopCh for communication between the main goroutine and the reader goroutine, which never looks quite elegant. Let's go one step further and see how version 3.0 gets rid of this small channel.

Here is a more elegant implementation:

  • As before, create an unbuffered dataCh for transferring the data to be aggregated.

  • Asynchronously start a writer goroutine that drives the whole write process. Inside it, following the WaitGroup usage pattern, start the child goroutines; each of them sends its data to dataCh after finishing its work.

  • The writer goroutine calls WaitGroup.Wait and closes dataCh once all child goroutines have finished.

  • The main goroutine itself now plays the role of the reader: it iterates over dataCh with for range and appends the received data to resp.

  • When the writer goroutine closes dataCh, the main goroutine's loop ends, which guarantees that resp is complete.

The code is as follows:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	tasksNum := 10
	dataCh := make(chan interface{}) // Unbuffered channel

	// Start the writer goroutine: it drives the concurrent data fetching and funnels the results into the channel
	go func() {
		defer func() {
			if err := recover(); err != nil {
				// ...
			}
		}()

		// Start the workers; each one passes its result to the reader through the channel
		var wg sync.WaitGroup
		for i := 0; i < tasksNum; i++ {
			wg.Add(1)
			// The send-only channel parameter shows that the worker only writes to the channel; reading happens in the main goroutine.
			go func(ch chan<- interface{}) {
				// recover only catches panics raised in the goroutine that calls it,
				// so each worker still needs its own deferred recover. wg.Done is
				// deferred so that it runs only after the send below has completed;
				// otherwise the channel could be closed while a worker is still sending.
				defer func() {
					wg.Done()
					if err := recover(); err != nil {
						// ...
					}
				}()

				// Write the simulated result to the channel. In real work this is usually the result of an RPC call.
				ch <- time.Now().UnixNano()
			}(dataCh)
		}

		// Close the channel only after all worker goroutines have finished
		wg.Wait()
		close(dataCh)
	}()

	resp := make([]interface{}, 0, tasksNum)
	// The main goroutine acts as the reader: it keeps receiving until all workers are done and the channel is closed, then leaves the loop.
	for data := range dataCh {
		resp = append(resp, data)
	}

	fmt.Printf("resp:%+v\nlen(resp):%v", resp, len(resp))
}

Run results: all 10 results are collected, without an extra stopCh.
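The version 3 pattern can also be wrapped in a small reusable function. The sketch below is one way to do that, under the assumption that each task returns a plain int; gather and work are made-up names. Here the workers are started directly and only the wait-then-close step runs in a helper goroutine, which is a slight rearrangement of the writer goroutine described above.

```go
package main

import (
	"fmt"
	"sync"
)

// gather (hypothetical helper) runs n workers concurrently and returns
// their results in arrival order. The calling goroutine is the reader; a
// helper goroutine waits for the workers and then closes the channel.
func gather(n int, work func(i int) int) []int {
	dataCh := make(chan int)

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done() // runs after the send below has completed
			dataCh <- work(i)
		}(i)
	}

	// Closer goroutine: close dataCh once every worker has sent its value.
	go func() {
		wg.Wait()
		close(dataCh)
	}()

	resp := make([]int, 0, n)
	for v := range dataCh { // ends when dataCh is closed and drained
		resp = append(resp, v)
	}
	return resp
}

func main() {
	resp := gather(10, func(i int) int { return i + 1 })
	fmt.Println(len(resp)) // prints 10
}
```

Because resp is written only by the goroutine that also returns it, no extra signal channel or lock is needed.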

3. Extending to other scenarios

In daily work there are many scenarios where we not only start multiple goroutines to improve performance through concurrency, but also need to collect the result of each child goroutine: whether it succeeded or failed, and what data it returned.

The goroutines in the demonstrations above all do the same thing, which is why they are started in a for loop. In real work you may need to start several different goroutines, each making a different RPC call, collect their results, and stop any further processing if one of the RPC calls fails.

For example: given a user's ID, fetch the number of games the user played in the last month, the friend with whom the user has the highest win rate and that win rate, and the friend with whom the user has the lowest win rate (the "worst teammate" when queuing together) and that win rate, and then send the user an email with the results.
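A scenario like this could be sketched as follows, using only the standard library. Everything here is an assumption made for illustration: the call and result types, fetchAll, demo, and the three call names stand in for real RPC clients, and the stub functions return fixed values. Each call reports a result or an error on one channel; on the first error the shared context is cancelled so that calls which honor it can stop early.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// result carries the outcome of one call; name says which call it was.
type result struct {
	name  string
	value string
	err   error
}

// call is a stand-in for one RPC (hypothetical type).
type call struct {
	name string
	fn   func(ctx context.Context, userID int64) (string, error)
}

// fetchAll runs all calls concurrently and returns their values keyed by
// call name, or the first error received.
func fetchAll(ctx context.Context, userID int64, calls []call) (map[string]string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	resCh := make(chan result)
	var wg sync.WaitGroup
	for _, c := range calls {
		wg.Add(1)
		go func(c call) {
			defer wg.Done()
			v, err := c.fn(ctx, userID)
			resCh <- result{name: c.name, value: v, err: err}
		}(c)
	}
	go func() {
		wg.Wait()
		close(resCh)
	}()

	out := make(map[string]string, len(calls))
	var firstErr error
	for r := range resCh { // keep draining so that no sender blocks forever
		if r.err != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("%s: %w", r.name, r.err)
				cancel() // tell the remaining calls to stop
			}
			continue
		}
		out[r.name] = r.value
	}
	if firstErr != nil {
		return nil, firstErr
	}
	return out, nil
}

// demo wires up three stub calls; when fail is true the last one errors.
func demo(fail bool) (map[string]string, error) {
	ok := func(v string) func(context.Context, int64) (string, error) {
		return func(context.Context, int64) (string, error) { return v, nil }
	}
	worst := ok("bob 31%")
	if fail {
		worst = func(context.Context, int64) (string, error) {
			return "", errors.New("rpc unavailable")
		}
	}
	calls := []call{
		{name: "gameCount", fn: ok("42")},
		{name: "bestFriend", fn: ok("alice 73%")},
		{name: "worstFriend", fn: worst},
	}
	return fetchAll(context.Background(), 1, calls)
}

func main() {
	_, err := demo(true)
	fmt.Println(err) // prints "worstFriend: rpc unavailable"
}
```

Only when fetchAll returns without an error would the caller go on to the next step, such as composing and sending the email.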
