Golang sync.RWMutex source code analysis

sync.RWMutex

Version: go1.20.3 darwin/arm64

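There are four scenarios to consider:

  • How do write operations block other write operations?

A writer acquires the internal mutex (RWMutex.w) directly, so only one writer can proceed at a time.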

  • How do write operations prevent read operations?

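RWMutex.readerCount is an integer that records the number of readers. Whether or not a writer is present, it is incremented by 1 for each read lock acquired and decremented by 1 for each read lock released.

When a writer acquires the lock, it subtracts rwmutexMaxReaders from RWMutex.readerCount, which makes the value negative. A reader that sees a negative value after its increment knows a writer is pending and blocks.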

  • How do read operations prevent write operations?

Each read lock increments RWMutex.readerCount and each release decrements it. A writer may proceed only when no readers are active; if any are active when it arrives, it blocks until the last of them releases its read lock.

  • Will write operations starve to death?

First, why starvation may occur: a writer must wait for the active readers to finish before it can obtain the lock. While it waits, new readers may keep arriving. If the writer had to wait for every reader, including the new ones, it could starve.

When a writer arrives, the number of currently active readers (taken from RWMutex.readerCount) is recorded in RWMutex.readerWait. Each subsequent read lock release decrements both RWMutex.readerCount and RWMutex.readerWait by 1. Readers that arrive after the writer block and are not counted in readerWait. When readerWait reaches 0, the writer proceeds.

Analysis

type RWMutex struct {
    w           Mutex        // Mutex lock
    writerSem   uint32       // The semaphore where the write operation waits for the read operation to complete
    readerSem   uint32       // The semaphore where the read operation waits for the write operation to complete
    readerCount atomic.Int32 // Read lock counter
    readerWait  atomic.Int32 // The current number of read lock releases that need to be waited for when acquiring the write lock
}

// Only supports a maximum of 1 << 30 read locks
const rwmutexMaxReaders = 1 << 30
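The counter arithmetic behind the four scenarios can be traced by hand. The following is a minimal single-threaded sketch, not the real implementation: the `counters` type, its methods, and `maxReaders` are hypothetical names introduced for illustration, and nothing here blocks or uses atomics. Each method only reports what the real lock would do at that point.

```go
package main

import "fmt"

// maxReaders mirrors rwmutexMaxReaders (hypothetical name for this sketch).
const maxReaders = 1 << 30

// counters is a hypothetical, single-threaded model of readerCount and
// readerWait. It tracks the arithmetic only; it never blocks.
type counters struct {
	readerCount int32
	readerWait  int32
}

// rlock models RLock and reports whether the reader would block.
func (c *counters) rlock() (blocks bool) {
	c.readerCount++
	return c.readerCount < 0 // negative means a writer is pending
}

// lock models the counter updates in Lock and reports whether the writer would block.
func (c *counters) lock() (blocks bool) {
	c.readerCount -= maxReaders
	r := c.readerCount + maxReaders // readers active at this moment
	if r != 0 {
		c.readerWait += r
		return c.readerWait != 0
	}
	return false
}

// runlock models RUnlock and reports whether this release would wake the writer.
func (c *counters) runlock() (wakesWriter bool) {
	c.readerCount--
	if c.readerCount < 0 { // a writer is pending
		c.readerWait--
		return c.readerWait == 0
	}
	return false
}

func main() {
	var c counters
	c.rlock()
	c.rlock() // two active readers
	fmt.Println("writer blocks:", c.lock())                   // true
	fmt.Println("late reader blocks:", c.rlock())             // true
	fmt.Println("first release wakes writer:", c.runlock())   // false
	fmt.Println("second release wakes writer:", c.runlock())  // true
}
```

Note that the late reader increments readerCount but not readerWait, which is why it cannot delay the writer.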

Semaphore

  • Acquire (also known as wait, decrement or P)
  • Release (also known as signal, increment or V)

The acquire operation decrements the semaphore by one. If the result is non-negative, the thread continues execution. If the result is negative, the thread blocks and resumes only after another thread has increased the semaphore again with a release.

The release operation increments the semaphore by one. If there are currently blocked threads, one of them will be awakened and execution will resume.

The Go runtime provides semaphore functions such as runtime_SemacquireMutex and runtime_Semrelease. sync.RWMutex uses the variants runtime_SemacquireRWMutex and runtime_SemacquireRWMutexR together with runtime_Semrelease.
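The runtime semaphore functions are internal and cannot be called from user code, so the acquire/release behaviour is easier to see in a small user-level sketch. The `semaphore` type below is hypothetical and built on sync.Mutex and sync.Cond. As a simplifying assumption, it keeps the count non-negative and waits while the count is zero, rather than letting it go negative as in the description above; the observable blocking behaviour is the same.

```go
package main

import (
	"fmt"
	"sync"
)

// semaphore is a hypothetical counting semaphore for illustration only.
type semaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	count int
}

func newSemaphore(n int) *semaphore {
	s := &semaphore{count: n}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire (P) takes one permit, sleeping while none is available.
func (s *semaphore) Acquire() {
	s.mu.Lock()
	for s.count == 0 {
		s.cond.Wait()
	}
	s.count--
	s.mu.Unlock()
}

// Release (V) returns one permit and wakes one sleeper, if any.
func (s *semaphore) Release() {
	s.mu.Lock()
	s.count++
	s.cond.Signal()
	s.mu.Unlock()
}

// wakeOne starts a goroutine that acquires from an empty semaphore and
// lets it continue by calling Release.
func wakeOne() string {
	s := newSemaphore(0)
	woken := make(chan string)
	go func() {
		s.Acquire() // sleeps if no permit is available yet
		woken <- "woken"
	}()
	s.Release()
	return <-woken
}

func main() {
	fmt.Println(wakeOne()) // prints "woken"
}
```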

Acquiring the write lock: Lock()

func (rw *RWMutex) Lock() {
    // Race detection
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // 1. Use the mutex to resolve competition with other writers.
    rw.w.Lock()

    // 2. Announce the writer and count the active readers: atomically subtract
    // rwmutexMaxReaders from readerCount, making it negative so that RLock and
    // RUnlock can tell a writer is pending; then add rwmutexMaxReaders back to
    // the result to get r, the number of readers currently holding the read lock.
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

    // 3. If other goroutines still hold the read lock (r != 0), add r to
    // readerWait so that only these readers are waited for; readers arriving
    // later cannot starve the writer. The goroutine then sleeps in
    // runtime_SemacquireRWMutex until the last of those readers releases
    // writerSem and wakes it.
    if r != 0 && rw.readerWait.Add(r) != 0 {
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
    // Race detection
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

Lock first acquires the mutex, then checks for active readers by atomically subtracting rwmutexMaxReaders from readerCount. This makes readerCount negative, which blocks new readers. Adding rwmutexMaxReaders back to the result gives r, the number of active readers. If r != 0, readers still hold the lock, so to keep the writer from starving, r is atomically added to readerWait and the goroutine then sleeps on the writerSem semaphore.

Releasing the write lock: Unlock()

func (rw *RWMutex) Unlock() {
    // Race detection
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }

    // Announce to readers there is no active writer.
    // r is now the number of readers waiting for the write lock to be released.
    r := rw.readerCount.Add(rwmutexMaxReaders)
    // If r >= rwmutexMaxReaders, readerCount was not negative before the add,
    // so no write lock was held: report a fatal error.
    if r >= rwmutexMaxReaders {
        race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any:
    // release the semaphore once per waiting reader to wake its goroutine.
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
    // Race detection
    if race.Enabled {
        race.Enable()
    }
}

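Unlock adds rwmutexMaxReaders back to readerCount, which makes it non-negative again and yields r, the number of readers waiting for the write lock to be released. It then releases readerSem r times to wake those goroutines, and finally releases the mutex so that other writers can proceed.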

Acquiring the read lock: RLock()

func (rw *RWMutex) RLock() {
    // Race detection
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // There are two cases:
    // 1. No writer is pending: the incremented readerCount is non-negative, so the
    //    atomic increment alone acquires the read lock. Do not rely on recursive
    //    read locking: if a writer arrives in between, the second RLock blocks.
    // 2. A writer is pending: the incremented readerCount is negative, so the
    //    reader sleeps on readerSem until the writer unlocks.
    if rw.readerCount.Add(1) < 0 {
        // A writer is pending, wait for it.
        // The goroutine is queued at the back of the semaphore's wait queue
        // and suspended until the write lock is released.
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

RLock atomically adds 1 to readerCount. A negative result means a writer is pending, so the reader sleeps on the readerSem semaphore.
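That a pending writer turns away new readers, even while an earlier reader still holds the lock, can be checked with TryRLock (available since Go 1.18), which fails instead of sleeping. The helper `pendingWriterRefusesReaders` below is a hypothetical name; the polling loop is only there because the moment at which the writer goroutine reaches Lock depends on scheduling.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// pendingWriterRefusesReaders reports whether a new read lock is refused
// while one reader is active and a writer is waiting in Lock.
func pendingWriterRefusesReaders() bool {
	var rw sync.RWMutex
	done := make(chan struct{})

	rw.RLock() // an active reader keeps the writer waiting
	go func() {
		rw.Lock() // marks the writer as pending, then waits for the reader
		rw.Unlock()
		close(done)
	}()

	// Poll until the writer has announced itself. Before that point
	// TryRLock succeeds, and the extra read lock is released again.
	for rw.TryRLock() {
		rw.RUnlock()
		runtime.Gosched()
	}

	// The writer is still waiting for the first reader, so another
	// attempt is refused as well.
	refused := !rw.TryRLock()

	rw.RUnlock() // the last active reader leaves and the writer proceeds
	<-done
	return refused
}

func main() {
	fmt.Println(pendingWriterRefusesReaders()) // prints true
}
```

A blocking RLock at the same point would sleep until the writer has finished, which is the second case in the RLock comments.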

Releasing the read lock: RUnlock()

func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    // Decrement readerCount by 1 and assign the result to r. There are two cases:
    // 1. r >= 0: no writer is pending, so the read lock is released and we are done.
    // 2. r < 0: a writer is pending, so rUnlockSlow is called to check
    //    whether this release lets the writer proceed.
    if r := rw.readerCount.Add(-1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {
        race.Enable()
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    // r+1 == 0: RUnlock was called with no read lock held and no writer pending.
    // r+1 == -rwmutexMaxReaders: RUnlock was called with no read lock held
    // while a writer holds or awaits the lock (Lock followed by RUnlock).
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        fatal("sync: RUnlock of unlocked RWMutex")
    }
    // A writer is pending: decrement the number of readers it is waiting for.
    if rw.readerWait.Add(-1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

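rUnlockSlow first checks for misuse: RUnlock called with no read lock held, either with no writer present or while a writer holds or awaits the lock (Lock followed by RUnlock). It then decrements readerWait by 1. If the result is 0, this was the last reader the writer was waiting for, so it releases writerSem to wake the writer.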

Reference

https://dongxiem.github.io/2020/06/07/golang-sync-bao-yuan-ma-pou-xi-2-sync.rwmutex/