Go goroutines: channels, mutexes, and read-write locks

Contents

  • Channels
    • 1. Definition and use of channels
    • 2. Channel capacity and length
    • 3. Two-way channel and one-way channel
      • Two-way channel
      • One-way channel
    • 4. Traverse channels (Fibonacci sequence example)
    • 5. Use channels as locks
    • 6 Is channel transfer a deep copy?
    • 7 Several things to note
    • 8 Classic error cases of channel deadlock
      • 1 Unbuffered channel: the send blocks until a receiver is ready
      • 1.1 The first method
      • 1.2 The second method
      • 2 The channel capacity is 1, but writing two values to the channel deadlocks the goroutine
      • 2.1 The program keeps waiting to read from the channel, but nobody writes to it anymore
  • sync.WaitGroup synchronization tool, waiting for a group of goroutines to complete their work
  • Lock mechanism in Golang
    • 1. Mutex lock: Mutex
    • 2 Read-write lock: RWMutex

Channels

If goroutines are the concurrent units of a Go program, channels are the communication mechanism between them. A channel is a conduit through which one goroutine sends values to another.

A channel is a pipe that connects goroutines. It is a queue-like data structure that follows the first-in, first-out rule.

1. Definition and use of channels

Each channel carries values of exactly one type, so you must specify the element type (string, int, and so on) when you declare it:

var channelName chan ElementType

A declared channel has the zero value nil and cannot be used directly. It must be initialized with the make function:

channelName = make(chan ElementType)

The two lines above can be combined into one statement. I use this form to declare channels below:

channelName := make(chan ElementType)

There are only two types of data operations on the channel: sending data and reading data:

// Define the channel
pipline := make(chan int)

// Send data to the channel
pipline <- 200

// Receive data from the channel and assign it to mydata
mydata := <-pipline

When a channel is no longer needed, the sender can close it so that receivers do not wait forever.
After a channel is closed, receivers can still read any values left in its buffer. Once it is empty, every receive returns immediately with the zero value of the element type (0 for int).

close(pipline)

Closing an already closed channel causes a panic.
So how do we tell whether a channel has been closed?

A receive can return two values. The second one, conventionally named ok, is true when the value was actually sent on the channel, and false when the channel is closed and has no values left.

x, ok := <-pipline
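
To make the difference between "the channel is closed" and "a real zero was sent" concrete, here is a minimal sketch. The helper name drain and the sample values are made up for illustration; only close and the two-value receive come from the text above.

```go
package main

import "fmt"

// drain receives from ch until it is closed and empty, and returns
// everything it received. The comma-ok form tells a real value apart
// from the zero value produced by a closed channel.
func drain(ch <-chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			// ok is false only once the channel is closed AND empty.
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 0 // a real zero, sent before closing
	close(ch)

	// Values buffered before the close are still delivered.
	fmt.Println(drain(ch))

	// Any further receive returns immediately with the zero value and false.
	v, ok := <-ch
	fmt.Println(v, ok)
}
```

Checking ok, rather than comparing the value with 0, is what lets the receiver keep the 0 that was really sent.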

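Unbuffered channels are synchronous: a send completes only when a receiver takes the value. Buffered channels are asynchronous: a send returns as soon as there is room in the buffer.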

2. Channel capacity and length

A channel is a container

Generally, the make function is used to create a channel. The make function receives two parameters.

The first parameter: required, specifies the channel type
The second parameter: optional, defaults to 0 if not filled in, specifies the capacity of the channel (how much data can be cached)

  • When the capacity is 0, the channel cannot store any data. A send blocks until another goroutine is ready to receive; if no receiver ever arrives, the sender stays blocked (and the runtime reports a deadlock if every goroutine is blocked). Such a channel is called an unbuffered channel.
  • When the capacity is 1, the channel can buffer only one value. If the channel already holds a value, sending to it blocks the sender. This property lets you use a channel as a lock.
  • When the capacity is greater than 1, the channel can hold multiple values and can serve as a communication channel through which multiple goroutines share resources.

package main

import "fmt"

func main() {
    pipline := make(chan int, 10)
    fmt.Printf("The channel can buffer %d values\n", cap(pipline))
    pipline <- 1
    fmt.Printf("There is currently %d value in the channel\n", len(pipline))
}

The capacity of a channel is returned by the cap function, and the number of values currently buffered is returned by the len function. The program prints:

The channel can buffer 10 values
There is currently 1 value in the channel

3. Two-way channel and one-way channel

A channel may allow both sending and receiving, or be restricted to only one of the two.
Therefore, there are two categories: two-way (bidirectional) channels and one-way (unidirectional) channels.

Two-way channel

package main

import (
    "fmt"
    "time"
)

func main() {
    pipline := make(chan int)

    go func() {
        fmt.Println("Ready to send data: 100")
        pipline <- 100
    }()

    go func() {
        num := <-pipline
        fmt.Printf("The received data is: %d\n", num)
    }()
    // The main function sleeps so that the two goroutines above have a chance to run
    time.Sleep(time.Second)
}

One-way channel

One-way channels can be subdivided into read-only channels and write-only channels.

Define read-only channel

var pipline = make(chan int)
type Receiver = <-chan int // Key code: define alias type
var receiver Receiver = pipline

Define write-only channel

var pipline = make(chan int)
type Sender = chan<- int // Key code: define alias type
var sender Sender = pipline

The complete example below uses both alias types:

package main

import (
    "fmt"
    "time"
)

// Define the write-only channel type
type Sender = chan<- int

// Define the read-only channel type
type Receiver = <-chan int

func main() {
    var pipline = make(chan int)

    go func() {
        var sender Sender = pipline
        fmt.Println("Ready to send data: 100")
        sender <- 100
    }()

    go func() {
        var receiver Receiver = pipline
        num := <-receiver
        fmt.Printf("The received data is: %d\n", num)
    }()
    // The main function sleeps so that the two goroutines above have a chance to run
    time.Sleep(time.Second)
}
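
One-way channel types are also commonly written directly in function signatures rather than through an alias. The sketch below illustrates that style; the function names produce and sum are hypothetical and not part of the example above.

```go
package main

import "fmt"

// produce can only send on out; receiving from it would not compile.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum can only receive from in; sending on it or closing it would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converted implicitly at each call
	go produce(ch, 4)
	fmt.Println(sum(ch)) // prints 10 (1+2+3+4)
}
```

Declaring the direction in the signature lets the compiler reject a function that uses its end of the channel the wrong way.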

4. Traverse channels (Fibonacci sequence example)

To traverse a channel, use for with the range keyword. The loop ends only when the channel is closed and empty, so make sure the sender closes the channel when it is done; otherwise the loop blocks forever after the last value.

package main

import "fmt"

func fibonacci(mychan chan int) {
    n := cap(mychan)
    x, y := 1, 1
    for i := 0; i < n; i++ {
        mychan <- x
        x, y = y, x+y
    }
    // Remember to close the channel.
    // Otherwise the traversal in the main function never ends and blocks instead.
    close(mychan)
}

func main() {
    pipline := make(chan int, 10)

    go fibonacci(pipline)

    for k := range pipline {
        fmt.Println(k)
    }
}

5. Use channels as locks

When the number of values in a channel has reached its capacity, sending to it again blocks the sending goroutine until another goroutine receives a value.

Taking advantage of this behavior, you can use a channel as a lock.

An example follows; see the comments for details.

package main

import (
    "fmt"
    "time"
)

// x = x + 1 is not an atomic operation,
// so multiple goroutines must not update x at the same time.
// A channel with a capacity of 1 is used to achieve the lock effect.
func increment(ch chan bool, x *int) {
    ch <- true // acquire: blocks while the channel is full
    *x = *x + 1
    <-ch // release: empties the channel
}

func main() {
    // Note that this must be a buffered channel with a capacity of 1
    pipline := make(chan bool, 1)

    var x int
    for i := 0; i < 1000; i++ {
        go increment(pipline, &x)
    }

    // Make sure all goroutines have completed.
    // A more appropriate method (sync.WaitGroup) is introduced later. Sleep is used for the time being.
    time.Sleep(time.Second)
    fmt.Println("value of x:", x)
}

The output is as follows: value of x: 1000
Without the lock, the output may be less than 1000.
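
As a variation, the Sleep can be replaced by sync.WaitGroup, which is covered later in this article. This is only a sketch: the function name countWithChannelLock is made up, and chan struct{} is used instead of chan bool purely because the value sent carries no information.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithChannelLock starts n goroutines that each add 1 to a shared
// counter, using a channel of capacity 1 as the lock.
func countWithChannelLock(n int) int {
	lock := make(chan struct{}, 1)
	var wg sync.WaitGroup
	x := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lock <- struct{}{} // acquire: blocks while another goroutine holds the slot
			x++
			<-lock // release: frees the slot
		}()
	}

	wg.Wait() // returns once every goroutine has called Done
	return x
}

func main() {
	fmt.Println(countWithChannelLock(1000)) // prints 1000
}
```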

6 Is channel transfer a deep copy?

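Sending a value on a channel copies it, exactly like an assignment. What that copy shares with the original depends on the type. Data structures can be divided into two types:

Value types: string, array, int, struct, float, bool. The receiver gets an independent copy. (A struct or array that itself contains slices, maps, or pointers still shares that inner data.)

Reference types: slice, map. Only the reference is copied, so the sender and the receiver share the same underlying data.

The answer is: whether the transfer behaves like a deep copy depends on whether the value you send is a value type or a reference type.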
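
The difference can be observed with a small experiment. This is an illustrative sketch; the type point and the functions sendStruct and sendSlice are hypothetical names introduced here.

```go
package main

import "fmt"

type point struct{ X, Y int }

// sendStruct sends a struct through a channel, then modifies the original.
// It returns the X field of the value that comes out of the channel.
func sendStruct() int {
	ch := make(chan point, 1)
	p := point{X: 1, Y: 2}
	ch <- p   // the whole struct is copied into the channel
	p.X = 100 // changes only the sender's variable
	return (<-ch).X
}

// sendSlice does the same with a slice. Only the slice header is copied;
// both copies point at the same backing array.
func sendSlice() int {
	ch := make(chan []int, 1)
	s := []int{1, 2, 3}
	ch <- s
	s[0] = 100 // visible through the copy held by the channel
	return (<-ch)[0]
}

func main() {
	fmt.Println(sendStruct()) // prints 1: the struct was copied
	fmt.Println(sendSlice())  // prints 100: the backing array is shared
}
```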

7 Several things to note

1 Closing a nil (uninitialized) channel causes a panic.
2 Closing the same channel more than once causes a panic.
3 Sending a message to a closed channel causes a panic.
4 Reading from a closed channel does not cause a panic. Messages that were still in the channel can be read; once they have all been read, each read returns the zero value of the element type.
5 Reading from a closed channel never blocks. The second return value is false once the channel is closed and empty, which is how you detect a closed channel (x, ok := <-ch).
6 Closing a channel acts as a broadcast: every goroutine blocked reading from the channel is woken up and receives the zero value.
7 Channels are first-class citizens in Golang and are safe for concurrent use. When facing a concurrency problem, a channel should be the first thing you think of.
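
Points 2 and 6 can be checked with a short sketch. The function names releaseAll and closeTwice are invented for this illustration, and the built-in recover is used only to turn the panic into a return value so the program can keep running.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll starts n goroutines that all receive from the same channel
// and returns how many of them were released by a single close.
func releaseAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // cannot proceed until done is closed
			mu.Lock()
			woken++
			mu.Unlock()
		}()
	}

	close(done) // one close unblocks every receiver
	wg.Wait()
	return woken
}

// closeTwice reports whether closing an already closed channel panics.
func closeTwice() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int)
	close(ch)
	close(ch) // panics: close of closed channel
	return false
}

func main() {
	fmt.Println(releaseAll(5)) // prints 5
	fmt.Println(closeTwice())  // prints true
}
```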

8 Classic error cases of channel deadlock

1 Unbuffered channel: the send blocks until a receiver is ready

package main

import "fmt"

func main() {
    pipline := make(chan string)
    pipline <- "hello world"
    fmt.Println(<-pipline)
}

Running it fails with the following error:

fatal error: all goroutines are asleep - deadlock!

At first glance nothing looks wrong: we store a value in the channel and then read it back.

Recall from earlier that calling make without the second parameter creates an unbuffered channel. On an unbuffered channel, a send blocks until a receiver is ready.

Therefore, there are two ways to solve this problem:

1 Make the receiver code execute before the sender
2 Use a buffered channel instead of an unbuffered channel

1.1 The first method

package main

import "fmt"

func main() {
    pipline := make(chan string)
    fmt.Println(<-pipline)
    pipline <- "hello world"
}

I still get the same error when running. What's the problem?

It turns out that we wrote the sender and the receiver in the same goroutine. The receiver code now runs first, but it blocks waiting for data, so the send that follows it never executes. The result is still a deadlock.

With that lesson learned, we move the receiver code into another goroutine so that it can wait there while the sender runs, like this:

package main

func hello(pipline chan string) {
    <-pipline
}

func main() {
    pipline := make(chan string)
    go hello(pipline)
    pipline <- "hello world"
}

After running, everything is normal.

1.2 The second method

The requirement that a receiver be ready before a send can complete applies only to unbuffered channels.

In that case, why not simply switch to a buffered channel?

package main

import "fmt"

func main() {
    pipline := make(chan string, 1)
    pipline <- "hello world"
    fmt.Println(<-pipline)
}

After running it, everything works fine.

2 The channel capacity is 1, but writing two values to the channel deadlocks the goroutine

Every buffered channel has a capacity. When the number of values in the channel equals its capacity, a further send blocks, and the goroutine does not continue until someone receives a value from the channel.

For example, in this code the channel capacity is 1, but the same goroutine writes two values to it, so the second send blocks forever and the program deadlocks.

package main

import "fmt"

func main() {
    ch1 := make(chan string, 1)

    ch1 <- "hello world"
    ch1 <- "hello China"

    fmt.Println(<-ch1)
}
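
One possible way out, sketched here under the assumption that the receiving side can run in its own goroutine, is to let a consumer drain the channel while the sender keeps writing. The function name sendAll and the extra result channel are hypothetical additions for this illustration.

```go
package main

import "fmt"

// sendAll sends every message on a channel of capacity 1 while a separate
// goroutine receives them, and returns the messages in arrival order.
func sendAll(msgs []string) []string {
	ch := make(chan string, 1)
	result := make(chan []string)

	go func() {
		var got []string
		for m := range ch { // ends when ch is closed and empty
			got = append(got, m)
		}
		result <- got
	}()

	for _, m := range msgs {
		ch <- m // may block briefly, but the receiver frees the slot
	}
	close(ch)
	return <-result
}

func main() {
	fmt.Println(sendAll([]string{"hello world", "hello China"}))
}
```

Increasing the capacity to 2 would also make the original program run, but only for exactly two sends; a concurrent receiver works for any number of messages.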

2.1 The program keeps waiting to read from the channel, but nobody writes to it anymore

For example, in this code, after the for loop receives two messages ("hello world" and "hello China"), nobody sends any more data, and the receiver is left waiting for data that never arrives.

It blocks forever, which causes a deadlock.

package main

import "fmt"

func main() {
    pipline := make(chan string)
    go func() {
        pipline <- "hello world"
        pipline <- "hello China"
        // close(pipline)
    }()
    for data := range pipline {
        fmt.Println(data)
    }
}

So the solution is simple: close the channel after sending the data. This tells range that no more values are coming, so the loop ends instead of waiting.

package main

import "fmt"

func main() {
    pipline := make(chan string)
    go func() {
        pipline <- "hello world"
        pipline <- "hello China"
        close(pipline)
    }()
    for data := range pipline {
        fmt.Println(data)
    }
}

sync.WaitGroup synchronization tool, waiting for a group of goroutines to complete their work

sync.WaitGroup is a synchronization tool in the Go standard library. It lets one goroutine wait for a group of goroutines to complete their work before continuing with other tasks.

The main methods of sync.WaitGroup are as follows:

  1. Add(delta int): Adds delta to the WaitGroup counter, that is, to the number of goroutines being waited for. Call it before starting the goroutine it accounts for.

  2. Done(): Notifies the WaitGroup that a goroutine has completed its work. Each call decrements the counter by one.

  3. Wait(): Blocks the calling goroutine until the counter drops to zero. Once the counter reaches zero, Wait returns and execution continues.

Here is a simple example of how to use sync.WaitGroup in Go to wait for a group of Goroutines to complete:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d is done\n", id)
}

func main() {
    var wg sync.WaitGroup

    numWorkers := 5

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    // Wait for all goroutines to complete
    wg.Wait()

    fmt.Println("All workers have completed.")
}

In the above example, WaitGroup is used to wait for numWorkers Goroutines to complete their work. Each Goroutine calls Done to notify WaitGroup that it has completed. Finally, the Wait method will wait for all Goroutines to complete before printing "All workers have completed.".

This is a common pattern for using sync.WaitGroup to coordinate concurrent operations in Go. It is especially useful when you need to wait for multiple goroutines to complete before performing other operations.
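
WaitGroup also combines naturally with the channels described earlier. The sketch below, with the hypothetical function name sumOfSquares, uses Wait in a helper goroutine to decide when the results channel can be closed, so that the range loop in the caller terminates.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares squares each input in its own goroutine and adds up the results.
func sumOfSquares(nums []int) int {
	results := make(chan int)
	var wg sync.WaitGroup

	for _, n := range nums {
		wg.Add(1) // Add runs before the goroutine starts
		go func(n int) {
			defer wg.Done()
			results <- n * n
		}(n)
	}

	// Close results once every worker has finished, so the range below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for sq := range results {
		total += sq
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4})) // prints 30
}
```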

Lock mechanism in Golang

The sync package has two very important lock types.

  • One is called Mutex, which can be used to implement mutex locks.
  • One is called RWMutex, which can be used to implement read-write locks.

1. Mutex lock: Mutex

A mutex protects a shared resource from conflicting concurrent operations, which would otherwise leave the data inaccurate.

Although using a Mutex is very simple, there are still a few points to note:

In the same goroutine, do not lock again before unlocking (sync.Mutex is not reentrant, so the second Lock blocks forever).
Do not unlock a mutex that is not locked; doing so is a fatal runtime error.
After locking, don't forget to unlock. Use a defer statement where appropriate.

The following program increments a shared counter from three goroutines without any lock:

package main

import (
    "fmt"
    "sync"
)

func add(count *int, wg *sync.WaitGroup) {
    for i := 0; i < 1000; i++ {
        *count = *count + 1
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    count := 0
    wg.Add(3)
    go add(&count, &wg)
    go add(&count, &wg)
    go add(&count, &wg)

    wg.Wait()
    fmt.Println("The value of count is:", count)
}

Three runs print three different results, where 3000 was expected:

// first time
The value of count is: 2854

// the second time
The value of count is: 2673

// the third time
The value of count is: 2840

The reason is that each of the three goroutines first reads count and then writes the updated value back. This read-modify-write sequence is not atomic, so updates from different goroutines can overwrite each other and the result is inaccurate.

The way to solve this problem is to use a Mutex in the add function, so that only one goroutine can operate on count at a time.

There are two ways to define a Mutex lock:

// The first way
var lock *sync.Mutex
lock = new(sync.Mutex)

// The second way
lock := &sync.Mutex{}

package main

import (
    "fmt"
    "sync"
)

func add(count *int, wg *sync.WaitGroup, lock *sync.Mutex) {
    for i := 0; i < 1000; i++ {
        lock.Lock()
        *count = *count + 1
        lock.Unlock()
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    lock := &sync.Mutex{}
    count := 0
    wg.Add(3)
    go add(&count, &wg, lock)
    go add(&count, &wg, lock)
    go add(&count, &wg, lock)

    wg.Wait()
    fmt.Println("The value of count is:", count)
}
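
The advice about defer can be sketched as follows. This is one possible arrangement, not the only one: the type counter and the functions inc, value, and run are hypothetical names, and embedding the Mutex in a struct next to the data it guards is a design choice made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type that keeps the lock next to the data it guards.
type counter struct {
	mu sync.Mutex
	n  int
}

// inc locks, updates, and unlocks via defer, so the lock is released
// on every return path.
func (c *counter) inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// value reads the counter under the same lock.
func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// run starts workers goroutines that each call inc perWorker times.
func run(workers, perWorker int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				c.inc()
			}
		}()
	}
	wg.Wait()
	return c.value()
}

func main() {
	fmt.Println(run(3, 1000)) // prints 3000
}
```

Because the zero value of sync.Mutex is an unlocked mutex, the struct needs no explicit initialization.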

2 Read-write lock: RWMutex

A Mutex allows only one goroutine at a time to access the data it protects, whether that goroutine reads or writes.

RWMutex is also a lock, but it divides access to a resource into read operations and write operations.

  • To keep the data safe, while someone is still reading the data (that is, a read lock is held), nobody is allowed to update it (that is, acquiring the write lock blocks). Likewise, while the write lock is held, both readers and other writers block.
  • To keep the program efficient, multiple goroutines reading the data (holding read locks) do not affect or block each other. Unlike Mutex, it does not restrict reading to one goroutine at a time.

There are two ways to define an RWMutex lock:

//The first type
var lock *sync.RWMutex
lock = new(sync.RWMutex)

// second type
lock := &sync.RWMutex{}

RWMutex provides two types of locks, each with a pair of methods. To avoid deadlock, the two methods of a pair should always be called together. Use defer where appropriate.

  • Read lock: call RLock to acquire the lock, and RUnlock to release it
  • Write lock: call Lock to acquire the lock, and Unlock to release it (the same as Mutex)

Next, let's look directly at an example:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    lock := &sync.RWMutex{}
    lock.Lock()

    for i := 0; i < 4; i++ {
        go func(i int) {
            fmt.Printf("Goroutine %d is ready to start...\n", i)
            lock.RLock()
            fmt.Printf("Goroutine %d obtained the read lock and releases it after sleeping for 1s\n", i)
            time.Sleep(time.Second)
            lock.RUnlock()
        }(i)
    }

    time.Sleep(time.Second * 2)

    fmt.Println("Ready to release the write lock, the read locks will no longer block")
    // Once the write lock is released, the read locks can be acquired
    lock.Unlock()

    // Acquiring the write lock again waits until all read locks are released,
    // so the four goroutines above must finish before execution continues here.
    lock.Lock()
    fmt.Println("Program exits...")
    lock.Unlock()
}
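
A typical use of RWMutex is to guard data that is read far more often than it is written. The sketch below assumes such a workload; the type store and its methods get and set are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical read-mostly map guarded by an RWMutex.
type store struct {
	mu   sync.RWMutex
	data map[string]int
}

func newStore() *store {
	return &store{data: make(map[string]int)}
}

// get takes the read lock, so many readers can run at the same time.
func (s *store) get(key string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// set takes the write lock, which excludes readers and other writers.
func (s *store) set(key string, v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = v
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.set(fmt.Sprintf("k%d", i), i)
			s.get("k0") // concurrent reads are safe alongside writes
		}(i)
	}
	wg.Wait()

	v, ok := s.get("k7")
	fmt.Println(v, ok) // prints 7 true
}
```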