Buffered channels and limiting goroutine concurrency

Article directory

            • 3.7.1 Using a buffered channel as a counting semaphore
            • 3.7.2 Using a buffered channel + sync.WaitGroup to limit concurrency (similar to the previous section)

To limit goroutine concurrency,
Be sure to block the main goroutine!
Be sure to block the main goroutine!
Be sure to block the main goroutine!
See the last example.

Because the runtime implements a buffered channel with an internal buffer, a send on a buffered channel is asynchronous as long as the buffer is not full, and a receive is asynchronous as long as the buffer is not empty (neither operation needs to block and wait).

By default (with no capacity argument) an unbuffered channel is created, and every send and receive on it blocks immediately. A buffered channel carries a buffer that can temporarily hold data; blocking occurs only once the buffer is full.

  • For a buffered channel, while the buffer is empty or holds data but is not yet full, the sending goroutine does not block;
  • when the buffer is full, **the goroutine performing the send blocks**; when the buffer is empty, the goroutine performing the receive blocks.

Remember: a buffered channel blocks the goroutine that performs the send or receive operation.
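A minimal sketch of these rules (a hypothetical channel with capacity 2; select with a default case is used only to probe whether a send would block):

package main

import "fmt"

func main() {
	ch := make(chan int, 2) // buffered channel, capacity 2

	ch <- 1 // buffer not full: the send returns immediately
	ch <- 2 // buffer is now full (len == cap)

	// A plain `ch <- 3` here would block this goroutine,
	// so probe with select + default just to demonstrate.
	select {
	case ch <- 3:
		fmt.Println("sent 3")
	default:
		fmt.Println("buffer full, send would block:", len(ch), cap(ch)) // 2 2
	}

	fmt.Println(<-ch, <-ch) // receives do not block while the buffer holds data
}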

The following example compares buffered and unbuffered channels.

package main

import (
	"fmt"
	"time"
)

func main() {
	// 1. Unbuffered channel
	ch1 := make(chan int)
	fmt.Println("Unbuffered channel", len(ch1), cap(ch1)) // Unbuffered channel 0 0
	go func() {
		data := <-ch1
		fmt.Println("Get data", data) // Get data 100
	}()
	ch1 <- 100
	time.Sleep(time.Second)
	fmt.Println("Assignment ok", "main over...")

	// 2. Unbuffered channel
	ch2 := make(chan string)
	go sendData(ch2)
	for data := range ch2 {
		fmt.Println("\tRead data", data)
	}
	fmt.Println("main over...ch2")

	// 3. Buffered channel: it blocks only when the buffer is full.
	ch3 := make(chan string, 6)
	go sendData(ch3)
	for data := range ch3 {
		fmt.Println("ch3\tRead data", data)
	}
	fmt.Println("main over...ch3")
}

func sendData(ch chan string) {
	defer close(ch) // close the channel once all data has been sent
	for i := 1; i <= 3; i++ {
		ch <- fmt.Sprintf("data%d", i)
		fmt.Println("Put data into channel:", i)
	}
}

Output:

$ go run .\main.go
Unbuffered channel 0 0
Get data 100
Assignment ok main over...
Put data into channel: 1
         Read data data1
         Read data data2
Put data into channel: 2
Put data into channel: 3
         Read data data3
main over...ch2
Put data into channel: 1
Put data into channel: 2
Put data into channel: 3
ch3     Read data data1
ch3     Read data data2
ch3     Read data data3
main over...ch3

In the unbuffered-channel part of the output, the "Put data" and "Read data" lines are interleaved, because every send must wait for a matching receive. In the buffered-channel part, all the data is put into the channel before any of it is read back, which shows that sends do not block while the buffer is not full.

Buffered channels can also be used to model a producer-consumer pattern.
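For example, a minimal producer-consumer sketch over a buffered channel (the capacity of 5 and the item count are arbitrary choices for illustration):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// The buffered channel acts as a bounded queue between producer and consumer.
	queue := make(chan int, 5)
	var wg sync.WaitGroup

	// Producer: sends 10 items, then closes the channel.
	go func() {
		for i := 1; i <= 10; i++ {
			queue <- i // blocks only while the buffer already holds 5 items
		}
		close(queue)
	}()

	// Consumer: ranges until the channel is closed and drained.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for item := range queue {
			fmt.Println("consumed:", item)
		}
	}()

	wg.Wait()
}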

Whether the pattern is single-sender/single-receiver or multi-sender/multi-receiver, a buffered channel generally offers better send/receive performance than an unbuffered one; for a buffered channel, choosing an appropriate capacity improves that performance further to some extent.

3.7.1 Using a buffered channel as a counting semaphore

An idiom in Go concurrency design is to use a buffered channel as a counting semaphore.

The number of elements currently in the buffered channel corresponds to the number of goroutines that are active (doing work) at that moment, and the capacity of the channel is the maximum number of goroutines allowed to be active at the same time. A send to the buffered channel acquires a semaphore slot, and a receive from it releases the slot.

Here is an example of using a buffered channel as a counting semaphore:

// chapter6/sources/go-channel-case-7.go
package main

import (
	"log"
	"sync"
	"time"
)

var active = make(chan struct{}, 3) // counting semaphore: at most 3 active goroutines
var jobs = make(chan int, 10)

func main() {
	go func() {
		for i := 0; i < 8; i++ {
			jobs <- (i + 1)
		}
		close(jobs)
	}()

	var wg sync.WaitGroup

	for j := range jobs {
		wg.Add(1)
		go func(j int) {
			active <- struct{}{} // acquire a semaphore slot
			log.Printf("handle job: %d\n", j)
			time.Sleep(2 * time.Second)
			<-active // release the slot
			wg.Done()
		}(j)
	}
	wg.Wait()
}

The example above creates a group of goroutines to handle the jobs, allowing at most 3 of them to be active at the same time. To achieve this it uses active, a buffered channel with capacity 3, as the counting semaphore, so the maximum number of simultaneously active goroutines is 3. Let's run the example:

$go run go-channel-case-7.go
2020/02/04 09:57:02 handle job: 8
2020/02/04 09:57:02 handle job: 4
2020/02/04 09:57:02 handle job: 1
2020/02/04 09:57:04 handle job: 2
2020/02/04 09:57:04 handle job: 3
2020/02/04 09:57:04 handle job: 7
2020/02/04 09:57:06 handle job: 6
2020/02/04 09:57:06 handle job: 5

The timestamps in the output show that although many goroutines are created, the counting semaphore ensures that at most 3 goroutines are active (handling jobs) at the same time.
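If you use this idiom in several places, the acquire/release pair can be wrapped in a small helper type. This is only a sketch with hypothetical names (semaphore, acquire, release); the buffered channel itself is still doing all the work:

package main

import (
	"fmt"
	"sync"
	"time"
)

// semaphore is a counting semaphore backed by a buffered channel;
// its capacity is the maximum number of concurrent holders.
type semaphore chan struct{}

func newSemaphore(n int) semaphore { return make(semaphore, n) }
func (s semaphore) acquire()       { s <- struct{}{} } // blocks while all slots are in use
func (s semaphore) release()       { <-s }             // frees a slot

func main() {
	sem := newSemaphore(3) // allow at most 3 workers to be active at once
	var wg sync.WaitGroup

	for i := 1; i <= 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem.acquire()
			defer sem.release()
			fmt.Println("working on", i)
			time.Sleep(time.Second)
		}(i)
	}
	wg.Wait()
}

Note that, just like the example above, this still creates all the worker goroutines up front; only the number of simultaneously active ones is capped at 3.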

3.7.2 Using a buffered channel + sync.WaitGroup to limit concurrency (similar to the previous section)
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	run([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, 3)
}

func run(dataList []int, limit int) {
	var wg sync.WaitGroup

	ch := make(chan struct{}, limit)
	for index, ele := range dataList {
		wg.Add(1)

		fmt.Printf("%+v\n", &wg)
		go func(ele int, index int) {
			fmt.Println("for index:", index)
			fmt.Println("nub:", runtime.NumGoroutine())
			ch <- struct{}{}
			fmt.Println("start task:", ele)
			time.Sleep(10 * time.Second)
			fmt.Println("end task:", ele)
			<-ch
			wg.Done()
		}(ele, index)
	}
	wg.Wait()
	fmt.Println("111111111111111111")
}

Output:

C:\Users\Administrator\Desktop\doc\shell\docker and k8s\project\gotnet>go run tests/main.go
&{noCopy:{} state:{_:{} _:{} v:4294967296} sema:0}
&{noCopy:{} state:{_:{} _:{} v:8589934592} sema:0}
&{noCopy:{} state:{_:{} _:{} v:12884901888} sema:0}
&{noCopy:{} state:{_:{} _:{} v:17179869184} sema:0}
&{noCopy:{} state:{_:{} _:{} v:21474836480} sema:0}
for index: 0
nub: 6
start task: 1
for index: 3
nub: 6
start task: 4
for index: 2
nub: 6
start task: 3
for index: 4
nub: 6
&{noCopy:{} state:{_:{} _:{} v:25769803776} sema:0}
for index: 1
&{noCopy:{} state:{_:{} _:{} v:30064771072} sema:0}
&{noCopy:{} state:{_:{} _:{} v:34359738368} sema:0}
nub: 7
for index: 5
nub: 9
&{noCopy:{} state:{_:{} _:{} v:38654705664} sema:0}
for index: 7
nub: 10
&{noCopy:{} state:{_:{} _:{} v:42949672960} sema:0}
for index: 8
nub: 11
&{noCopy:{} state:{_:{} _:{} v:47244640256} sema:0}
for index: 9
nub: 12
&{noCopy:{} state:{_:{} _:{} v:51539607552} sema:0}
&{noCopy:{} state:{_:{} _:{} v:55834574848} sema:0}
&{noCopy:{} state:{_:{} _:{} v:60129542144} sema:0}
for index: 13
nub: 15
for index: 6
nub: 15
for index: 10
nub: 15
for index: 12
nub: 15
for index: 11
nub: 15
end task: 3
start task: 5
end task: 1
start task: 2
end task: 4
start task: 6
end task: 2
start task: 8
end task: 6
start task: 9
end task: 5
start task: 10
end task: 10
start task: 14
end task: 9
start task: 7
end task: 8
start task: 11
end task: 11
start task: 13
end task: 7
end task: 14
start task: 12
end task: 12
end task: 13
111111111111111111

The tasks run in batches of 3, with a final batch of 2 left at the end; once everything completes, the main goroutine exits.

But the execution may not proceed the way most people expect. Take a look:

  • Only the following output appears during the first 0-10 seconds. It shows that the for loop completes right away, creating all 14 worker goroutines plus the main goroutine: 14 + 1 = 15 (which matches nub: 15).

    C:\Users\Administrator\Desktop\doc\shell\docker and k8s\Project\gotnet>go run tests/main.go
     &{noCopy:{} state:{_:{} _:{} v:4294967296} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:8589934592} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:12884901888} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:17179869184} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:21474836480} sema:0}
    for index: 0
    nub: 6
    start task: 1
    for index: 3
    nub: 6
    start task: 4
    for index: 2
    nub: 6
    start task: 3
    for index: 4
    nub: 6
     &{noCopy:{} state:{_:{} _:{} v:25769803776} sema:0}
    for index: 1
     &{noCopy:{} state:{_:{} _:{} v:30064771072} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:34359738368} sema:0}
    nub: 7
    for index: 5
    nub: 9
     &{noCopy:{} state:{_:{} _:{} v:38654705664} sema:0}
    for index: 7
    nub: 10
     &{noCopy:{} state:{_:{} _:{} v:42949672960} sema:0}
    for index: 8
    nub: 11
     &{noCopy:{} state:{_:{} _:{} v:47244640256} sema:0}
    for index: 9
    nub: 12
     &{noCopy:{} state:{_:{} _:{} v:51539607552} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:55834574848} sema:0}
     &{noCopy:{} state:{_:{} _:{} v:60129542144} sema:0}
    for index: 13
    nub: 15
    for index: 6
    nub: 15
    for index: 10
    nub: 15
    for index: 12
    nub: 15
    for index: 11
    nub: 15
    

    So all the goroutines have already been created, but because the channel's capacity is only 3, the goroutines trying to send on it are blocked.

So how do we actually limit the number of goroutines? The key is that, besides the buffered channel, the main goroutine itself must be blocked. To do that, the channel's send and receive have to be split up: main performs the send, and each worker performs the receive.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	run([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, 3)
}

func run(dataList []int, limit int) {
	var wg sync.WaitGroup

	ch := make(chan struct{}, limit)
	for index, ele := range dataList {
		wg.Add(1)
		// Key line: do NOT put this send inside go func(). Once ch is full,
		// main blocks here, so the for loop stops creating new goroutines.
		ch <- struct{}{}
		fmt.Printf("%+v\n", &wg)
		go func(ele int, index int) {
			fmt.Println("for index:", index)
			fmt.Println("nub:", runtime.NumGoroutine())

			fmt.Println("start task:", ele)
			time.Sleep(10 * time.Second)
			fmt.Println("end task:", ele)
			<-ch
			wg.Done()
		}(ele, index)
	}
	wg.Wait()
	fmt.Println("111111111111111111")
}

The key point is to block main, so main must take part in either the send or the receive; that is exactly the blocking behavior a buffered channel provides.

A buffered channel carries a buffer that can temporarily hold data; blocking occurs only once the buffer is full.

  • For a buffered channel, while the buffer is empty or holds data but is not yet full, the sending goroutine does not block;
  • when the buffer is full, **the goroutine performing the send blocks**; when the buffer is empty, the goroutine performing the receive blocks.