Personal interpretation of distributed lock source code implemented by etcd

1 Prerequisite introduction

1.1 Distributed lock type

  1. Active polling type: This model is somewhat similar to the active polling + cas optimistic locking model in go’s sync.Mutex. The lock-retrieving party will continue to attempt to acquire the distributed lock. If the lock is already occupied, it will continue to do so. Initiate retries until the lock is successfully retrieved
  2. Watch callback type: When the lock acquirer discovers that the lock is occupied by others, it will create a watcher monitor to subscribe to the lock release event, and then no longer initiate an active attempt to acquire the lock. When the lock is released, the lock acquirer can pass the previously created watcher. Perceive this change, and then reinitiate the attempt to acquire the lock.

Personal opinion: In a distributed scenario, I prefer the watch callback implementation strategy. Because in a distributed scenario, the attempt to actively poll may be behind one or even multiple network IO requests, which is a bit costly. In this case, the lock-retrieval party will obtain the lock based on the watch callback, ensuring that the lock is Release, the request to acquire the lock will be retried only when it has a chance to acquire the lock, thus avoiding the loss of meaningless attempts to a large extent. (Purely personal opinion)

This is not to say that there are no benefits to being proactive. Active polling distributed locks can ensure that the user always takes the initiative in the process, and the entire process can be more lightweight and flexible. In addition, the watch mechanism needs to establish a long connection during the implementation process to complete the watch monitoring action. There will also be a certain amount of resource consumption. Therefore, I think which one is better depends on the specific scenario. For example, in scenarios with high concurrency intensity, it may be better to use watch callback distributed locks, and conversely, active round-robin distributed locks.

1.2 Watch callback implementation ideas

  • The same distributed lock is identified by the same data (such as a unique and clear key)
  • If the piece of data is successfully inserted into the storage medium (the key did not exist before), it means the locking is successful. If the insertion fails, it means that the lock is already held by someone else, and then listens for the deletion event of this data. When the deletion event occurs, it means that the lock has been released, and then the data will be re-attempted to be inserted (locked).
  • Delete data directly after unlocking

Thinging herd effect: Also known as the herd effect, a herd is an organization with poor discipline and usually moves in a disorganized and disorderly manner. It should be noted that once a certain sheep moves in the flock, the other sheep will rush up and run without thinking, completely ignoring objective issues such as there may be wolves nearby or where there is better grassland.

Watch callback distributed locks may also have this situation. For example, after multiple coroutines trying to lock fail, they monitor the deletion event of this data. That is, if the competition for this distributed lock is fierce, there may be multiple lock-takers monitoring the release event of the same lock at the same time. At this time, if the lock If it is released, all the lock-getting parties will swarm up to try to get the lock, which causes the “thundering herd problem”. Therefore, there will be a lot of meaningless performance loss in this process, and the request traffic may surge at the moment when the lock is released. It will have a negative effect on system stability. (Solutions are introduced below)

1.3 etcd

etcd official documentation: etcd

etcd is a distributed kv storage component suitable for shared configuration and service discovery. The underlying distributed consensus algorithm raft protocol ensures strong consistency and high availability of storage services. etcd provides the watch listener function, and continuously monitors change events by creating a grpc long connection through the etcd server node for data in a specified range. In addition, when writing data in etcd, it also supports the coordination of lock acquisition order through the revision mechanism. It is a component that is very suitable for implementing distributed locks.

1.4 sdk introduction

etcd open source address: https://github.com/etcd-io/etcd

The version used in this article is 3.5.8

1.5 Deadlock problem

In order to avoid deadlock, etcd provides a lease mechanism. A lease, as the name implies, is a time-sensitive agreement. Once the deadline specified in the lease is reached, the lease will expire. At the same time, etcd also provides a renewal mechanism (keepAlive ), users can delay the expiration time of the lease through the renewal operation.

The idea of using this mechanism to unlock possible deadlock problems in distributed locks is as follows:

  • The user applies for a lease and sets the expiration time of the lease
  • Asynchronously starts a coroutine, which is mainly responsible for undoing the curse within a certain period of time and continuing the contract renewal operation before the business logic is completed.
  • When locking, bind the kv data corresponding to the lock to the lease, so that the lock data and the lease have the same expiration time attribute.

Under this setting, even if the owner of the distributed lock encounters an exception and cannot unlock it, the release of the distributed lock can be completed through the lease mechanism, thus avoiding the deadlock problem.

1.6 How etcd avoids the thundering herd effect

In order to avoid the thundering herd effect, etcd provides a prefix mechanism and a version revision mechanism, as follows:

  • For the same distributed lock, the key of the lock record data has a common public prefix, which is used as the identifier of the lock.
  • When each lock-taking party takes the lock, it will splice its own identity (lease id) with the lock prefix to generate a completed lock key. Therefore, the lock key completed by each lock-taking party is different (only a common public prefix), so in theory, each lock-taking party can insert the lock record data into etcd (Inserting data does not mean Locked successfully)
  • When each lock-taking party inserts lock record data, it will obtain its own lock key, a unique and increasing revision number within the lock prefix range, revision.
  • The successful insertion of the lock data record by the lock taker does not mean that the lock is successful. Instead, it needs to query the record list of the lock prefix after inserting the data. If the revision version corresponding to the lock key is the smallest, it means that the lock is successful.
  • If the lock is occupied by others, the lock taker will listen for the deletion event of the lock key whose revision is smaller than itself and closest to itself.

In this way, all the lock-takers will be arranged in a queue according to the coordination of the revision mechanism and according to the size of the lock-take serial number (revision). Whenever the lock is released, only the next lock-taker will be alerted, shocking the group. The problem is solved.

2 Implementation source code

The file is in the etcd-3.5.8\client\v3\concurrency path

2.1 Data structure

2.2 Session

Session refers to a conversation, which corresponds to a lease. The user calls the NewSession method to construct a session instance. The execution steps are as follows:

  • Apply for a lease id through the client.Grant method
  • Call the client.KeepAlive method to continue renewing the lease
  • Construct a session instance
  • Asynchronously start a daemon coroutine to process the corresponding parameters of lease renewal (keepAlive)

If the user finishes executing the business logic, the session can be closed through the session.Close method. In the Close method, the renewed coroutine will be terminated through the context’s cancel function (the renewed coroutine depends on the session’s context).

package concurrency

import (
"context"
"time"

v3 "go.etcd.io/etcd/client/v3"
)

const defaultSessionTTL = 60

// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
typeSession struct {
client *v3.Client
opts *sessionOptions
id v3.LeaseID

cancel context.CancelFunc
donec <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ops := & amp;sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
for _, opt := range opts {
opt(ops)
}

id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = resp.ID
}

ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}

donec := make(chan struct{})
s := & amp;Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

// keep the lease alive until client error or canceled context
go func() {
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
}()

return s, nil
}

// Client is the etcd client that is attached to the session.
func (s *Session) Client() *v3.Client {
return s.client
}

// Lease is the lease ID for keys bound to the session.
func (s *Session) Lease() v3.LeaseID { return s.id }

// Done returns a channel that closes when the lease is orphaned, expires, or
// is otherwise no longer being refreshed.
func (s *Session) Done() <-chan struct{} { return s.donec }

// Orphan ends the refresh for the session lease. This is useful
// in case the state of the client connection is indeterminate (revoke
// would fail) or when transferring lease ownership.
func (s *Session) Orphan() {
s.cancel()
<-s.donec
}

// Close orphans the session and revokes the session lease.
func (s *Session) Close() error {
s.Orphan()
// if revoke takes longer than the ttl, lease is expired anyway
ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
_, err := s.client.Revoke(ctx, s.id)
cancel()
return err
}

type sessionOptions struct {
ttl int
leaseID v3.LeaseID
ctx context.Context
}

// SessionOption configures Session.
type SessionOption func(*sessionOptions)

// WithTTL configures the session's TTL in seconds.
// If TTL is <= 0, the default 60 seconds TTL will be used.
func WithTTL(ttl int) SessionOption {
return func(so *sessionOptions) {
if ttl > 0 {
so.ttl = ttl
}
}
}

// WithLease specifies the existing leaseID to be used for the session.
// This is useful in process restart scenario, for example, to reclaim
// leadership from an election prior to restart.
func WithLease(leaseID v3.LeaseID) SessionOption {
return func(so *sessionOptions) {
so.leaseID = leaseID
}
}

// WithContext assigns a context to the session instead of defaulting to
// using the client context. This is useful for canceling NewSession and
// Close operations immediately without having to close the client. If the
// context is canceled before Close() completes, the session's lease will be
// abandoned and left to expire instead of being revoked.
func WithContext(ctx context.Context) SessionOption {
return func(so *sessionOptions) {
so.ctx = ctx
}
}

2.3 Mutex

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
    s *Session
    
    pfx string
    myKey string
    myRev int64
    hdr *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
    return & amp;Mutex{s, pfx + "/", "", -1, nil}
}

Mutex is the type of etcd distributed lock. The core fields are as follows:

  • s: Built-in session
  • pfx: the public prefix of distributed locks
  • myKey: pfx + lease id
  • myRev: The version of the current lock user’s lock key (myKey) corresponding to the public prefix pfx
2.3.1 TryLock method

The Mutex.TryLock method will perform a locking attempt. If the locking fails, an error will be returned directly without blocking. The specific process is as follows:

  • Call the Mutex.tryAcquire method (the main purpose is to check whether myKey has been inserted, insert it if it is not, and query if it is) to obtain the revision corresponding to myKey and the current lock holder.
  • If pfx has not been occupied, or the corresponding minimum revision value under pfx is equal to Mutex’s myRev, this indicates that the lock is successful.
  • If the lock is occupied, delete the kv record created by locking yourself (because it is an attempt), and then return an error that the lock has been occupied by others.
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// Cannot lock, so delete the key
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return ErrLocked
}
2.3.2 Lock method

The Mutex.Lock method adopts a blocking and locking processing mode. If the lock is occupied by someone else, it will continue to block and wait for the opportunity until it successfully obtains the lock:

  • Call the Mutex.tryAcquire method (the main purpose is to check whether myKey has been inserted, insert it if it is not, and query if it is) to obtain the revision corresponding to myKey and the current lock holder.
  • If pfx has not been occupied, or the corresponding minimum revision value under pfx is equal to Mutex’s myRev, this indicates that the lock is successful.
  • If the lock is occupied, it enters blocking mode, calls the waitDeletes method, and watch monitors the deletion event recorded by the lock whose revision is smaller than and closest to itself.
  • When receiving the deletion event of that lock record, it will check whether its own lease has expired. If not, the lock is successful.
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// wait for deletion revisions prior to myKey
// TODO: early termination if the session key is deleted before other session keys with smaller revisions.
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
return werr
}

// make sure the session is not expired, and the owner key still exists.
gresp, werr := client.Get(ctx, m.myKey)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}

if len(gresp.Kvs) == 0 { // is the session key lost?
return ErrSessionExpired
}
m.hdr = gresp.Header

return nil
}
2.3.3 Mutex.tryAcquire()

Mutex.tryAcquire method, the user will complete the insertion of the data and obtain the revision

  • Based on the transaction operation of etcd, it is judged that if the current myKey has not been created, create a kv record and execute the getOwner method to obtain the current lock holder; if it has been created, query the corresponding kv record and execute the getOwner method to obtain the current lock holder. By;
  • Update the current revision value of myKey, then return the revision corresponding to myKey and the revision of the current lock, and provide the upper TryLock method and Lock method for use.
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
2.3.4 waitDeletes method
  • Implementing spin based on for loop
  • Each loop obtains the key of the holder whose revision is smaller than and closest to its own address.
  • If the key does not exist, it means that your revision is already the smallest and the lock is successful.
  • If the key exists, call the waitDelete method to block the deletion event of the key.
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

var wr v3.WatchResponse
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}
2.3.5 Unlock method

Just delete your own kv pair record when unlocking. If you are the lock holder, then deleting the kv record is the real unlocking; if you are not the lock holder, deleting the kv record means you exit the lock grabbing process, which will not have much impact (don’t worry about this) The operation will cause the next lock-fetching party in the queue to accidentally wake up and cause chaos, because after the waitDeletes method listens to the deletion event, it will re-obtain the kv pair that is smaller than and closest to its own revision. If it exists, it will continue to listen until it is successfully fetched. Lock (own revision is the smallest))

func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
}