Source code: https://gitee.com/lymgoforIT/golang-trick/tree/master/23-waitgroup + channel
1. Introduction to using WaitGroup
1. Basic usage
The wait group tool sync.WaitGroup is essentially a concurrency counter that exposes three core methods:
- WaitGroup.Add(n): performs a registration, adding n to the WaitGroup's counter. In practice, WaitGroup.Add(n) registers n child goroutines that are about to be started.
- WaitGroup.Done(): performs a report, decreasing the counter by 1. In practice, each child goroutine calls WaitGroup.Done once just before it exits.
- WaitGroup.Wait(): performs the aggregation. It is usually called by the main goroutine, which blocks until all child goroutines have finished, that is, until the WaitGroup counter drops to zero; only then does the main goroutine continue.
The following is an example of using sync.WaitGroup:
- First, declare a wait group, wg.
- Start a for loop that will launch 10 child goroutines.
- Before launching each child goroutine, call WaitGroup.Add in the main goroutine to register it (note: WaitGroup.Add must be called in the main goroutine, not inside the child goroutine).
- Launch the child goroutines one by one. In each child goroutine, use defer to guarantee that WaitGroup.Done is called exactly once before it exits, which reports completion and decreases the WaitGroup counter by 1.
- After launching the child goroutines, the main goroutine calls WaitGroup.Wait and blocks until every child goroutine has called WaitGroup.Done and the counter has dropped to zero; then the main goroutine continues.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1) // Error-prone point 1
		go func(i int) {
			defer func() {
				wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					fmt.Println("panic occurred")
				}
			}()
			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business logic
		}(i) // Error-prone point 3
	}
	wg.Wait()
	fmt.Println(time.Since(start))
}
Output:
From the output it is easy to see that 0-9 are not printed in order: starting a goroutine takes some time, and a goroutine that is started first is not necessarily executed first. In addition, although each of the 10 goroutines simulates one second of work, the whole run takes only a little more than one second once all 10 have finished. This is the benefit of running them concurrently: executing the 10 tasks serially would take at least ten seconds.
2. Common mistakes
1. The Add method is used in the wrong place
When using sync.WaitGroup, you must ensure that WaitGroup.Add, which increments the counter, is executed before WaitGroup.Wait. Otherwise the program may have logic errors, and it may even panic.
Here is a counter-example. Note the position of error-prone point 1:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		// wg.Add(1) // Error-prone point 1
		go func(i int) {
			wg.Add(1) // Error-prone point 1
			defer func() {
				wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					fmt.Println("panic occurred")
				}
			}()
			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business logic
		}(i) // Error-prone point 3
	}
	wg.Wait()
	fmt.Println(time.Since(start))
}
In the code above, WaitGroup.Add and WaitGroup.Done are both called inside the child goroutine, while WaitGroup.Wait is called in the main goroutine. At first glance Add and Done are executed in pairs, so nothing seems wrong, but there is a problem: because the child goroutines start asynchronously, Wait may run before any Add. At that moment the counter is 0, so Wait returns immediately and the main goroutine continues without waiting for the children, which is not the intended execution flow.
2. The Done method is used in the wrong location
When starting a child goroutine, a well-established convention is to use recover inside it to catch panics. An unrecovered panic in any goroutine terminates the whole program, and the main goroutine cannot recover a panic raised in a child goroutine.
Here is a counter-example. Note the position of error-prone point 2:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1) // Error-prone point 1
		go func(i int) {
			defer func() {
				// wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					wg.Done() // Error-prone point 2
					fmt.Println("panic occurred")
				}
			}()
			fmt.Println(i)
			<-time.After(time.Second) // Simulate one second of business logic
		}(i) // Error-prone point 3
	}
	wg.Wait()
	fmt.Println(time.Since(start))
}
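Here wg.Done() has been moved into the branch that handles a panic, so if the child goroutine does not panic, wg.Done is never executed. The counter never drops to zero and wg.Wait blocks forever; in a small program like this one, the Go runtime then typically aborts with "fatal error: all goroutines are asleep - deadlock!".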
Note: the if condition in the following code is true only when recover has caught a panic:
if err := recover(); err != nil {
	wg.Done() // Error-prone point 2
	fmt.Println("panic occurred")
}
3. Child goroutine closure captures an outer variable
A child goroutine started from a function literal is a closure, and a closure can directly use variables from the enclosing scope.
Here is a counter-example. Note the position of error-prone point 3:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1) // Error-prone point 1
		go func() {
			defer func() {
				wg.Done() // Error-prone point 2
				if err := recover(); err != nil {
					fmt.Println("panic occurred")
				}
			}()
			fmt.Println(i) // reads the outer loop variable i
			<-time.After(time.Second) // Simulate one second of business logic
		}() // Error-prone point 3
	}
	wg.Wait()
	fmt.Println(time.Since(start))
}
Output:
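As the output shows, when i is not passed to the child goroutine as a parameter, the goroutine reads the outer loop variable, so the output is no longer 0-9. It is nondeterministic: each goroutine prints whatever value i holds at the moment it runs. This describes Go versions before 1.22. Since Go 1.22 (for modules whose go.mod declares go 1.22 or later), each iteration of a for loop gets its own i, so this particular program prints each of 0-9 once, in some order; passing the value as a parameter still keeps the intent explicit.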
2. Aggregating data with WaitGroup + channel
A common scenario at work: the main goroutine needs to collect the results after all child goroutines have finished. It is therefore not enough for the main goroutine to know that all child goroutines are done; it also has to actually receive the data they return. This involves passing data between goroutines, and a common approach is to combine WaitGroup with a channel.
1. Version 1 (defective)
First, here is a slightly flawed implementation, version 1.0:
- Create a channel, dataCh, for transferring data. dataCh can be unbuffered, because a reading goroutine that keeps receiving from it is started right afterwards, so the writing goroutines will not block indefinitely.
- Create a slice, resp, in the main goroutine to hold the aggregated results. Note that a slice is not safe for concurrent use, so appends to it must happen serially.
- Asynchronously start a reading goroutine that keeps receiving from dataCh and appends each value to resp. Note that the reading goroutine traverses the channel with for range, which terminates only after the channel has been closed and all of its data has been read.
- Following the WaitGroup usage pattern, start several child goroutines that simulate tasks and send their results into dataCh, where the reading goroutine receives and aggregates them.
- The main goroutine calls WaitGroup.Wait to ensure all child goroutines have finished, and then closes dataCh.
- The main goroutine takes the resp slice aggregated by the reading goroutine and continues processing.
The corresponding implementation source code is as follows:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	tasksNum := 10

	dataCh := make(chan interface{}) // Unbuffered channel
	resp := make([]interface{}, 0, tasksNum)

	// Start the reading goroutine
	go func() {
		defer func() {
			if err := recover(); err != nil {
				// ...
			}
		}()
		for data := range dataCh {
			resp = append(resp, data)
		}
	}()

	// Start the child goroutines; each passes its result to the reading goroutine through the channel
	var wg sync.WaitGroup
	for i := 0; i < tasksNum; i++ {
		wg.Add(1)
		// A send-only channel parameter makes it explicit that this goroutine only writes;
		// reading is done by the reading goroutine.
		go func(ch chan<- interface{}) {
			defer func() {
				wg.Done()
				if err := recover(); err != nil {
					// ...
				}
			}()
			// Simulate the task result and write it to the channel; in real work this is usually an RPC result
			ch <- time.Now().UnixNano()
		}(dataCh)
	}

	// Close the channel only after all goroutines that produce data have finished
	wg.Wait()
	close(dataCh)

	fmt.Printf("resp:%+v\nlen(resp):%v\n", resp, len(resp))
}
Output:
We clearly started 10 goroutines, so why were only 9 results collected?
The problem is that the code above has a concurrency bug: after the main goroutine uses WaitGroup.Wait to make sure all child goroutines have finished their tasks, it closes dataCh and immediately reads resp for printing. Although dataCh is closed at that point, the reading goroutine runs asynchronously and may not yet have appended all the data to resp, so the resp slice the main goroutine reads right after close(dataCh) may be missing data. (Running the program with the -race flag reports this as a data race on resp.)
2. Version 2 (no defects, but not elegantly written)
Building on version 1.0, version 2.0 fixes this problem.
The problem was that the main goroutine could use resp before the reading goroutine had finished aggregating. So we add one more channel, stopCh, to signal that the reading goroutine has finished. Specifically:
- After closing dataCh, the main goroutine does not access resp immediately; it first receives a signal from stopCh and continues only after that receive succeeds.
- Before the reading goroutine exits, it sends a signal into stopCh, so that the main goroutine can tell the reading goroutine has finished.
With this change the logic is sound: the resp slice the main goroutine obtains is guaranteed to contain all the data.
The code is as follows; compared with version 1, it adds three lines involving stopCh:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	tasksNum := 10

	dataCh := make(chan interface{}) // Unbuffered channel
	resp := make([]interface{}, 0, tasksNum)
	stopCh := make(chan struct{}, 1)

	// Start the reading goroutine
	go func() {
		defer func() {
			if err := recover(); err != nil {
				// ...
			}
		}()
		for data := range dataCh {
			resp = append(resp, data)
		}
		stopCh <- struct{}{}
	}()

	// Start the child goroutines; each passes its result to the reading goroutine through the channel
	var wg sync.WaitGroup
	for i := 0; i < tasksNum; i++ {
		wg.Add(1)
		// A send-only channel parameter makes it explicit that this goroutine only writes;
		// reading is done by the reading goroutine.
		go func(ch chan<- interface{}) {
			defer func() {
				wg.Done()
				if err := recover(); err != nil {
					// ...
				}
			}()
			// Simulate the task result and write it to the channel; in real work this is usually an RPC result
			ch <- time.Now().UnixNano()
		}(dataCh)
	}

	// Close the channel only after all goroutines that produce data have finished
	wg.Wait()
	close(dataCh)

	// Wait until the reading goroutine has finished
	<-stopCh

	fmt.Printf("resp:%+v\nlen(resp):%v\n", resp, len(resp))
}
Output:
3. Version 3 (elegant implementation)
Version 2.0 needs an extra stopCh for communication between the main goroutine and the reading goroutine, which never feels quite elegant. Let's go a step further and look at version 3.0, which does away with this small channel. Here is the more elegant implementation:
- As before, create an unbuffered dataCh for transferring the data to be aggregated.
- Asynchronously start a writing goroutine that drives the whole writing process. Inside it, following the WaitGroup usage pattern, the writing goroutine starts the child goroutines, and each child sends its data to dataCh after finishing its work.
- The writing goroutine calls WaitGroup.Wait and closes dataCh once all child goroutines have finished their work.
- The main goroutine itself plays the role of the reading goroutine: it traverses dataCh with for range and appends the received data to resp.
- When the writing goroutine closes dataCh, the main goroutine's traversal ends, which guarantees that resp holds the complete data.
The code is as follows:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	tasksNum := 10

	dataCh := make(chan interface{}) // Unbuffered channel

	// Start the writing goroutine: it drives the concurrent data fetching and sends the results into the channel
	go func() {
		defer func() {
			if err := recover(); err != nil {
				// ...
			}
		}()

		var wg sync.WaitGroup
		for i := 0; i < tasksNum; i++ {
			wg.Add(1)
			// A send-only channel parameter makes it explicit that this goroutine only writes
			go func(ch chan<- interface{}) {
				// recover only catches panics in its own goroutine, so each child needs its own.
				// wg.Done is deferred so it runs only after the send below has completed;
				// calling it before the send would let close(dataCh) race with the send.
				defer func() {
					wg.Done()
					if err := recover(); err != nil {
						// ...
					}
				}()
				// Simulate the task result; in real work this is usually the result of an RPC call
				ch <- time.Now().UnixNano()
			}(dataCh)
		}

		// Close the channel only after all child goroutines have finished sending
		wg.Wait()
		close(dataCh)
	}()

	resp := make([]interface{}, 0, tasksNum)
	// The main goroutine acts as the reader: the loop exits only after the writing goroutine closes dataCh
	for data := range dataCh {
		resp = append(resp, data)
	}
	fmt.Printf("resp:%+v\nlen(resp):%v\n", resp, len(resp))
}
Output:
3. Extended scenario
In daily work, many scenarios require not only starting multiple goroutines to improve performance through concurrency, but also collecting the result of each child goroutine, for example whether it succeeded or failed and what data it returned.
In the demonstration above, all the goroutines do the same thing, so they are started in a for loop. In real work, you may need to start several different goroutines that each make an RPC call, collect their results, and stop any follow-up action if one RPC call fails.
For example: given a user's id, we need to fetch the number of games the user played in the last month, the friend with whom they had the highest win rate when playing together and that win rate, and their "worst teammate" (the friend with whom they had the lowest win rate when playing together) and that win rate, and then send the user an email.