Go synchronization primitive sync.WaitGroup (wait group)

Basic usage

sync.WaitGroup is a synchronization primitive provided by the sync package in the Go standard library. It is used to wait for a group of goroutines to finish their tasks and is a very commonly used data structure.

It is typically used to wait for a batch of concurrent operations to finish, so that the main program continues only after all of them have completed.

Each sync.WaitGroup (wait group) value internally maintains a counter, initially 0, that tracks the number of goroutines still being waited for. The Add method is called before each goroutine starts, to increment the counter. When a goroutine has finished its task, it calls the Done method to decrement the counter. The main program calls the Wait method to block until the counter reaches zero, that is, until all goroutines have finished.

sync.WaitGroup provides the following methods:

func (wg *WaitGroup) Add(delta int) {...}
func (wg *WaitGroup) Done() {...}
func (wg *WaitGroup) Wait() {...}
  • Add(delta int): Adds delta to the counter. delta can be a positive or a negative number; if the counter becomes negative, Add panics.
  • Done(): Decrements the counter by one, which marks one task as completed.
  • Wait(): Blocks the calling goroutine until the counter returns to zero. It is generally called in the main program to wait for all tasks to be completed.

Here is a simple example showing how to use WaitGroup to wait for a set of concurrent tasks to complete:

package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done() // Indicates task completion

	fmt.Printf("Worker %d started\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Worker %d completed\n", id)
}

func main() {
	var wg sync.WaitGroup

	for i := 1; i <= 5; i++ {
		wg.Add(1) // Increment the counter for each task
		go worker(i, &wg)
	}

	wg.Wait() // Wait for all tasks to complete
	fmt.Println("All workers have completed.")
}

In the above example, we create 5 goroutines, each of which executes the worker function. Each worker decrements the counter through the Done method when it returns, to indicate that its task has been completed. wg.Wait() in the main program blocks until all tasks are completed, and then the program prints "All workers have completed.". This ensures that all tasks are completed before the main program continues.

The main use of sync.WaitGroup in concurrent programming is to wait for a set of concurrent operations to complete before the main program continues. It manages the synchronization and blocking of concurrent tasks, which helps avoid premature termination of the main program or race conditions.

Structure

The structure code of WaitGroup is as follows:

//go 1.20.3 path: sync/waitgroup.go
type WaitGroup struct {
	noCopy noCopy
	state  atomic.Uint64
	sema   uint32
}

The following is an analysis of each field of the WaitGroup structure:

  • noCopy: noCopy is a technique used in the Go source code to detect prohibited copies. If the program copies a WaitGroup (for example by assigning it or passing it by value), go vet reports an error when it checks the program. Note, however, that noCopy does not affect normal compilation or execution of the program.

  • sema: sema is the semaphore, backed by the semaphore implementation inside the Go runtime. WaitGroup uses two related runtime functions, runtime_Semacquire and runtime_Semrelease:

    • runtime_Semacquire waits until the semaphore value is greater than 0 and then decrements it; the current goroutine is suspended while it waits;
    • runtime_Semrelease increments the semaphore value and wakes up one of the goroutines waiting on sema;
  • state: state is the state data field of WaitGroup. It is an unsigned 64-bit value that holds both counter (high 32 bits) and waiter (low 32 bits):

    • counter is the number of tasks that have not yet been completed. WaitGroup.Add(n) performs counter += n, and WaitGroup.Done() performs counter--;
    • waiter is the number of goroutines that are currently blocked in WaitGroup.Wait;

    [Figure: layout of the state field]
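
To make the noCopy technique described above more concrete, here is a minimal sketch. It assumes the commonly described mechanism: go vet's copylocks check reports copies of any struct that contains a type with Lock and Unlock methods. The Tracker type and the copyTracker function are hypothetical names used only for illustration.

```go
package main

import "fmt"

// noCopy is a zero-size marker type. go vet's copylocks check looks for
// types that have Lock and Unlock methods and reports copies of them.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Tracker is a hypothetical type that should not be copied after first use.
type Tracker struct {
	noCopy noCopy
	count  int
}

// copyTracker takes its argument by value, so it works on a copy.
// The compiler accepts this; go vet is expected to flag it.
func copyTracker(t Tracker) int {
	t.count++
	return t.count
}

func main() {
	var t Tracker
	t.count = 1
	fmt.Println(copyTracker(t)) // prints 2: the copy was incremented
	fmt.Println(t.count)        // prints 1: the original is unchanged
}
```

The program compiles and runs, which matches the point that noCopy has no effect at build time or run time. The same silent copy is what makes passing a sync.WaitGroup by value a bug: Done would be applied to the copy, not to the original.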

The entire calling process of WaitGroup can be simply described as follows:

  1. When WaitGroup.Add(n) is called, counter is increased by n: counter += n;

  2. When WaitGroup.Wait() is called and counter is not 0, waiter is incremented (waiter++) and runtime_Semacquire is called on the semaphore, which suspends the current goroutine until the semaphore is released;

  3. When WaitGroup.Done() is called, counter is decremented (counter--). If counter is 0 after the decrement, the wait is over: runtime_Semrelease is called once per waiter to release the semaphore and wake up the goroutines blocked in WaitGroup.Wait;

[Figure: WaitGroup calling process diagram]
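
The three steps above can be sketched as a toy wait group. This is a simplified illustration under stated assumptions, not the real implementation: an unbuffered channel stands in for the runtime semaphore, the misuse checks are omitted, and toyWaitGroup, newToyWaitGroup and runWorkers are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyWaitGroup is a simplified illustration, not the real sync.WaitGroup.
type toyWaitGroup struct {
	state atomic.Uint64 // high 32 bits: counter, low 32 bits: waiter
	sema  chan struct{} // stands in for the runtime semaphore
}

func newToyWaitGroup() *toyWaitGroup {
	return &toyWaitGroup{sema: make(chan struct{})}
}

// Add applies delta to counter and, if counter drops to 0, wakes every waiter.
func (wg *toyWaitGroup) Add(delta int) {
	state := wg.state.Add(uint64(delta) << 32)
	v := int32(state >> 32) // counter
	w := uint32(state)      // waiter
	if v < 0 {
		panic("toyWaitGroup: negative counter")
	}
	if v > 0 || w == 0 {
		return // tasks still pending, or nobody to wake
	}
	wg.state.Store(0)
	for ; w != 0; w-- {
		wg.sema <- struct{}{} // plays the role of runtime_Semrelease
	}
}

// Done decrements counter by one.
func (wg *toyWaitGroup) Done() { wg.Add(-1) }

// Wait returns once counter is 0.
func (wg *toyWaitGroup) Wait() {
	for {
		state := wg.state.Load()
		if int32(state>>32) == 0 {
			return // counter is already 0
		}
		// Register as a waiter; retry if state changed in the meantime.
		if wg.state.CompareAndSwap(state, state+1) {
			<-wg.sema // plays the role of runtime_Semacquire
			return
		}
	}
}

// runWorkers starts n goroutines and returns how many had finished
// by the time Wait returned.
func runWorkers(n int) int64 {
	wg := newToyWaitGroup()
	var finished atomic.Int64
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			finished.Add(1)
		}()
	}
	wg.Wait()
	return finished.Load()
}

func main() {
	fmt.Println(runWorkers(5)) // prints 5
}
```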

Source code analysis

WaitGroup.Add

The WaitGroup.Add method adds delta to the counter of a WaitGroup, which records how many concurrent operations still need to complete. It is usually called before starting a concurrent operation, to register the number of tasks to wait for.

Here is the source code:

//go 1.20.3 path: sync/waitgroup.go

func (wg *WaitGroup) Add(delta int) {
	...
	// Add delta to the high 32 bits of state (the counter); the low 32 bits are unchanged
	state := wg.state.Add(uint64(delta) << 32)
	// The high 32 bits of state are the counter
	v := int32(state >> 32)
	// The low 32 bits of state are the number of waiters
	w := uint32(state)
	...
	// A negative counter means Done (or Add with a negative delta) was called too many times: panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// There are already waiters, yet this positive Add has just raised the counter from 0,
	// which means Add and Wait are being executed at the same time: panic
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// If the counter is still greater than 0, or there are no waiters, there is nobody to wake up: return
	if v > 0 || w == 0 {
		return
	}

	// The counter is now 0 and there are waiters. If state has changed in the meantime,
	// Add and Wait are being executed at the same time: panic
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// Reset state (counter and waiters) to 0
	wg.state.Store(0)
	// Call runtime_Semrelease once per waiter to wake them all up
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

The code is very simple and the comments are clear, so I won’t explain it much here.
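
For readers who want to see the bit arithmetic in isolation, the sketch below replays it on a plain uint64 instead of an atomic value. The helpers addCounter and unpack are hypothetical names introduced for this illustration; they only mirror the shift and conversion expressions from the source above.

```go
package main

import "fmt"

// addCounter mimics wg.state.Add(uint64(delta) << 32) on a plain value.
// A negative delta wraps around in uint64, which subtracts from the high 32 bits.
func addCounter(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

// unpack splits state into counter (high 32 bits) and waiters (low 32 bits).
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

func main() {
	var state uint64
	state = addCounter(state, 3)  // Add(3): counter becomes 3
	state++                       // one goroutine registers in Wait: waiters becomes 1
	state = addCounter(state, -1) // Done(): counter becomes 2

	counter, waiters := unpack(state)
	fmt.Println(counter, waiters) // prints 2 1
}
```

Because the counter lives in the high half, adding a shifted delta never touches the waiter count in the low half, which is why a single atomic add can update the counter and return both values at once.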

WaitGroup.Wait

The WaitGroup.Wait method blocks the calling goroutine, usually the main program, until the counter reaches zero, that is, until all tasks being waited for have completed. It is a synchronous operation, mainly used to ensure that all goroutines have finished before the main program performs subsequent operations.

Here is the source code:

//go 1.20.3 path: sync/waitgroup.go

func (wg *WaitGroup) Wait() {
	...
	// Loop until the counter is 0 or this goroutine has registered itself as a waiter
	for {
		// Read state
		state := wg.state.Load()
		// The high 32 bits are the counter
		v := int32(state >> 32)
		// The low 32 bits are the number of waiters
		w := uint32(state)

		// If the counter is 0, there is nothing to wait for: return
		if v == 0 {
			...
			return
		}
		// Otherwise try to add 1 to the number of waiters; if the CAS fails, retry the loop
		if wg.state.CompareAndSwap(state, state+1) {
			...
			// Block on the semaphore until Add releases it when the counter reaches 0
			runtime_Semacquire(&wg.sema)
			// Add resets state to 0 before waking the waiters; a non-zero state here means
			// the WaitGroup was reused before this Wait returned: panic
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			...
			return
		}
	}
}

The code is very simple and the comments are clear, so I won’t explain it much here.
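
Since Add releases the semaphore once per waiter, several goroutines can call Wait on the same WaitGroup. The following sketch illustrates this; releaseWaiters, gate and finished are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseWaiters starts n goroutines that all call Wait on the same WaitGroup,
// calls Done once, and reports how many of them got past Wait.
func releaseWaiters(n int) int {
	var gate sync.WaitGroup // the WaitGroup that every waiter calls Wait on
	gate.Add(1)

	var finished sync.WaitGroup // tracks the waiter goroutines themselves
	released := make(chan int, n)
	for i := 0; i < n; i++ {
		finished.Add(1)
		go func(id int) {
			defer finished.Done()
			gate.Wait() // returns once the counter of gate is 0
			released <- id
		}(i)
	}

	// The counter of gate reaches 0: goroutines already blocked in Wait are
	// woken up, and goroutines that call Wait later return immediately.
	gate.Done()
	finished.Wait()
	close(released)

	count := 0
	for range released {
		count++
	}
	return count
}

func main() {
	fmt.Println(releaseWaiters(3)) // prints 3
}
```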

WaitGroup.Done

The WaitGroup.Done method decrements the counter by one to indicate that one task has been completed. Each goroutine being waited for should call this method after completing its task. Its main purpose is to signal completion to the goroutines blocked in Wait.

Here is the source code:

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

This is just a thin wrapper around WaitGroup.Add; please refer to the comments in the WaitGroup.Add code.
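
One consequence of Done being Add(-1) is that calling Done more times than the counter allows triggers the negative-counter panic shown in the Add source. The sketch below demonstrates this with recover; extraDone is a hypothetical helper written for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// extraDone calls Done one more time than the counter allows and
// returns the value recovered from the resulting panic.
func extraDone() (recovered any) {
	defer func() { recovered = recover() }()

	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done() // counter goes from 1 to 0
	wg.Done() // counter would become -1, so Add(-1) panics
	return nil
}

func main() {
	// Prints the recovered panic value, which comes from the
	// negative-counter check in Add.
	fmt.Println(extraDone())
}
```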

Summary

  • WaitGroup can be used for one goroutine to wait for multiple goroutines to complete their work, or for multiple goroutines to wait for one goroutine to complete its work; it supports a many-to-many relationship
  • Add(n) with n > 0 should be called before starting the goroutine, and Done should then be called inside the goroutine
  • A WaitGroup can be reused only after the previous Wait call has returned
  • Done is just a thin wrapper around Add(-1), so you can reduce the number of calls by adding a larger value in a single Add call, or pass a negative value to Add to lower the counter by more than one at a time and wake up the waiters sooner.
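
As an illustration of the last two points, the following sketch registers all tasks with a single Add call before any goroutine starts. The squareAll function is a hypothetical example, not something taken from the sync package.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll squares each input in its own goroutine and registers
// all tasks with one Add call.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))

	var wg sync.WaitGroup
	wg.Add(len(inputs)) // one call instead of Add(1) per goroutine
	for i, v := range inputs {
		go func(i, v int) {
			defer wg.Done()
			results[i] = v * v // each goroutine writes only its own slot
		}(i, v)
	}
	wg.Wait() // all writes to results are complete after this returns
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // prints [1 4 9 16]
}
```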

References:

cyhone https://zhuanlan.zhihu.com/p/344973865

mohuishou https://lailin.xyz/post/go-training-week3-waitgroup.html