Redisson distributed lock usage and source code analysis

Introduction

Redis distributed locks are implemented mainly with Lua scripts. Redis runs a Lua script as a single atomic unit, so a script can execute several commands without other clients' commands interleaving.

If you are not familiar with Lua, see the official website (http://www.lua.org/home.html) or the Lua for Windows downloads on GitHub (https://github.com/rjpcomputing/luaforwindows/releases). The analysis below also touches on Future and Semaphore from JUC (java.util.concurrent). If those are unfamiliar, it is worth learning the basics of JUC concurrent programming first.

Why use distributed locks?

The simplest way to guard a critical section is a synchronized block, but that only works within a single process. In a cluster, the front end reaches the back-end interfaces through nginx, which forwards requests to the nodes in turn (round-robin). synchronized is a built-in lock based on memory shared between the threads of one JVM, so cluster nodes cannot share it. Instead, we can set a key in Redis and treat its existence as the lock. The code below shows the idea.

    @Test
    public void locks() {
        String locks = "lock:product1";
        String clientId = UUID.randomUUID().toString();
        // Set the key only if it is absent, with a 30s timeout
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(locks, clientId, 30, TimeUnit.SECONDS);
        // Another client holds the lock: do not touch the stock
        if (!Boolean.TRUE.equals(result)) {
            System.out.println("Failed to acquire the lock");
            return;
        }
        try {
            int count = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            // If the inventory is greater than 0, subtract 1 and write the value back
            if (count > 0) {
                stringRedisTemplate.opsForValue().set("stock", (count - 1) + "");
                System.out.println("Deduction successful, remaining inventory: " + (count - 1));
            } else {
                System.out.println("Deduction failed, insufficient inventory");
            }
        } catch (NumberFormatException e) {
            throw new RuntimeException(e);
        } finally {
            // Release the lock, but only if this client still owns it
            if (clientId.equals(stringRedisTemplate.opsForValue().get(locks))) {
                stringRedisTemplate.delete(locks);
            }
        }
    }

This code uses Redis to guard against concurrent updates. Two details matter:

1. The key has a timeout, so that if the program crashes before deleting the key, the lock still expires instead of causing a deadlock.

2. The value of the key is a UUID, which is checked when releasing the lock. This prevents the following scenario: thread A acquires the lock, but its business logic runs longer than the lock's timeout, so the lock expires. Thread B then acquires the lock and starts running. When thread A finishes, it deletes the lock, which is now thread B's lock, and other threads can enter again, which brings back the concurrency problem. The UUID check prevents a thread from deleting a lock acquired by another thread.

However, the code above still has a problem under high concurrency: the check and the delete in the release step are not atomic. For example, thread A finishes its business logic and passes the UUID check, and right then the lock times out. Thread A stalls for a moment (a GC pause, for instance), and thread B acquires the lock and proceeds. When thread A resumes, it deletes the key, so thread B's lock is deleted after all.
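A common way to close this gap is to do the comparison and the delete in one Lua script, so Redis executes them as a single atomic step. The sketch below is only an illustration under assumptions: the class name is made up, and an in-memory ConcurrentHashMap stands in for Redis so the example runs on its own. Its remove(key, value) call compares and deletes atomically, which is the behaviour the script gives you on the server.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AtomicReleaseSketch {
    // Script commonly sent to Redis for an owner-checked release:
    // the GET and the DEL run as one atomic unit on the server.
    static final String RELEASE_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
          + "return redis.call('del', KEYS[1]) "
          + "else return 0 end";

    // In-memory stand-in for Redis (assumption: only here to keep the sketch runnable).
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Same idea as setIfAbsent: succeeds only when the key is absent.
    boolean tryLock(String key, String clientId) {
        return store.putIfAbsent(key, clientId) == null;
    }

    // Simulates the key timing out.
    void expire(String key) {
        store.remove(key);
    }

    // Compare-and-delete in one atomic step, mirroring RELEASE_SCRIPT.
    boolean release(String key, String clientId) {
        return store.remove(key, clientId);
    }

    public static void main(String[] args) {
        AtomicReleaseSketch redis = new AtomicReleaseSketch();
        redis.tryLock("lock:product1", "A");
        redis.expire("lock:product1");       // A's lock times out
        redis.tryLock("lock:product1", "B"); // B acquires it
        System.out.println(redis.release("lock:product1", "A")); // false: B's lock survives
        System.out.println(redis.release("lock:product1", "B")); // true
    }
}
```

With Spring Data Redis the script would typically be run through stringRedisTemplate.execute with a DefaultRedisScript, passing the lock key and the clientId. That still leaves the other problem open: the lock can expire while the business logic is running.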

The hand-written setIfAbsent lock is therefore not safe under high concurrency: the release has to be atomic, and the lock must not expire while its holder is still working. Next, let's look at how Redisson's distributed lock solves these problems.

Simple Usage of Redisson distributed lock

Use redisson.getLock(key) to obtain the lock object, lock.lock() to acquire the lock, and lock.unlock() to release it. The following example combines Spring Boot with Redis.

POM dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wy</groupId>
    <artifactId>redis-tools</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>redis-lock</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Examples

@Bean
public Redisson redisson() {
    // Single-server (stand-alone) mode
    Config config = new Config();
    config.useSingleServer().setAddress("redis://IP:port").setPassword("password").setDatabase(0);
    return (Redisson) Redisson.create(config);
}

public void locks1() {
    String locks = "lock:product1";
    RLock lock = redisson.getLock(locks);
    lock.lock();
    try {
        int count = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        // If the inventory is greater than 0, subtract 1 and write the value back
        if (count > 0) {
            stringRedisTemplate.opsForValue().set("stock", (count - 1) + "");
            System.out.println("Deduction successful, remaining inventory: " + (count - 1));
        } else {
            System.out.println("Deduction failed, insufficient inventory");
        }
    } catch (NumberFormatException e) {
        throw new RuntimeException(e);
    } finally {
        // Release the lock
        lock.unlock();
    }
}
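RLock extends java.util.concurrent.locks.Lock, so the usual JDK locking idioms apply to it. As a small variation on the example above, the sketch below waits for the lock only for a bounded time instead of blocking indefinitely. The helper name runWithLock is hypothetical, and a ReentrantLock stands in for the RLock so the snippet runs without a Redis server.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class GuardedSection {
    /** Runs the task under the lock, or gives up after waitMillis. Returns whether the task ran. */
    static boolean runWithLock(Lock lock, long waitMillis, Runnable task) {
        try {
            if (!lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                return false; // lock not obtained in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the task throws
        }
    }

    public static void main(String[] args) {
        // Stand-in lock; in the article this would be redisson.getLock("lock:product1").
        Lock lock = new ReentrantLock();
        int[] stock = {5};
        boolean ran = runWithLock(lock, 100, () -> stock[0]--);
        System.out.println("ran=" + ran + ", stock=" + stock[0]); // ran=true, stock=4
    }
}
```

Acquiring the lock outside the try block that ends in unlock() matters: if acquisition fails, unlock() is never called on a lock the thread does not hold.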

Source code analysis

Click into Redisson's lock() method to get to the core logic.

A step-by-step analysis of the core methods

RedissonLock.lockInterruptibly(long leaseTime, TimeUnit unit): where the core logic begins

The parameters leaseTime and unit are the key's timeout and its time unit. lock() passes -1 and null, which means no explicit lease time was given.

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // Get the current thread ID
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        // null means the lock was acquired: return and let the caller continue
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

RedissonLock.tryAcquire(leaseTime, unit, threadId): acquiring the lock atomically and renewing the key

tryAcquire blocks on the result of tryAcquireAsync, so the logic to read is tryAcquireAsync:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

RedissonLock.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG): a Lua script that sets the lock and its timeout and handles reentry

Let's first look at the tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG) call inside tryAcquireAsync.

Parameters:

- leaseTime: the timeout of the key

- unit: the time unit

- threadId: the current thread ID

- RedisCommands.EVAL_LONG: executes the Lua script and returns a long result

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    //first paragraph
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //Second paragraph
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //Third paragraph
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

The Lua script above works as follows:

- First paragraph: check whether the key exists (the key is getName(), i.e. the name passed to redisson.getLock(key)). If it does not, create it as a Redis hash whose field is the lock name built by getLockName(threadId), which combines the client's UUID with the thread ID, set the field's value to 1, and set the expiration. If no lease time was passed, the expiration defaults to 30s. The script returns nil (null on the Java side), which means the thread has acquired the lock and can continue with its business logic.

- Second paragraph: check whether the hash already contains this thread's field. If it does, increment the value by 1, reset the expiration and return nil. This is what makes the lock reentrant: the same thread can acquire it again, and the value counts how many times it has done so.

- Third paragraph: otherwise the lock is held by another thread, so return the key's remaining time to live (pttl, in milliseconds).
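The three branches described above can be mirrored in plain Java to make the return values concrete. This is only an in-memory model under assumptions: the class and method names are made up, a HashMap stands in for the Redis hash, and the current time is passed in as a parameter so the result is deterministic. It is not how Redisson itself is implemented.

```java
import java.util.HashMap;
import java.util.Map;

class LockScriptModel {
    // owner field -> reentrancy count; stands in for the Redis hash stored under KEYS[1]
    private final Map<String, Integer> hash = new HashMap<>();
    // absolute expiry time in ms; stands in for the key's TTL
    private long expiresAt;

    /** Returns null when the lock is acquired, otherwise the remaining TTL in ms. */
    synchronized Long tryLock(String owner, long leaseMillis, long now) {
        if (!hash.isEmpty() && now >= expiresAt) {
            hash.clear(); // the key has expired, as Redis would handle on its own
        }
        if (hash.isEmpty()) {             // first paragraph: key does not exist
            hash.put(owner, 1);
            expiresAt = now + leaseMillis;
            return null;
        }
        if (hash.containsKey(owner)) {    // second paragraph: same owner, reentrant
            hash.merge(owner, 1, Integer::sum);
            expiresAt = now + leaseMillis;
            return null;
        }
        return expiresAt - now;           // third paragraph: held by someone else
    }

    synchronized int holdCount(String owner) {
        return hash.getOrDefault(owner, 0);
    }

    public static void main(String[] args) {
        LockScriptModel lock = new LockScriptModel();
        System.out.println(lock.tryLock("uuid-1:1", 30_000, 0));      // null (acquired)
        System.out.println(lock.tryLock("uuid-1:1", 30_000, 1_000));  // null (reentrant, count 2)
        System.out.println(lock.tryLock("uuid-2:7", 30_000, 5_000));  // 26000
        System.out.println(lock.tryLock("uuid-2:7", 30_000, 31_000)); // null (old lock expired)
    }
}
```

A null result is the signal that lockInterruptibly checks with if (ttl == null). Any other value tells the caller how long it may have to wait.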

Now look at what tryAcquireAsync does after the call RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG). It registers a callback with addListener, which runs once tryLockInnerAsync has completed (if this is unfamiliar, read up on Future; the listener is a completion callback). If the lock was acquired, the callback calls scheduleExpirationRenewal(threadId), which periodically extends the key's expiration.

RedissonLock.scheduleExpirationRenewal(threadId): lock renewal

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

Here is a rough explanation of the code above. The design is neat: it does not use the JDK's scheduled tasks. Timeout and TimerTask are Netty's timer types, and each run schedules the next one. As for why it was done this way, readers who know are welcome to explain.

internalLockLeaseTime is the lock's lease time. tryLockInnerAsync sets it, and when lock() is called without a lease time it is the watchdog timeout, 30s by default. Note from tryAcquireAsync that renewal is only scheduled in that case (leaseTime == -1).

commandExecutor.getConnectionManager().newTimeout works like a delayed task on a thread pool. The delay is internalLockLeaseTime / 3, so by default the task runs after 10 seconds. Its Lua script first checks whether this thread's field still exists under the key. If it does, the script resets the key's expiration to 30s and returns 1, and the listener calls scheduleExpirationRenewal again, so the lock is renewed every 10s for as long as it is held. If the lock has been released, the script returns 0 and nothing is rescheduled, so the renewal stops. expirationRenewalMap makes sure that only one renewal task per lock is pending.
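The self-rescheduling pattern can be sketched with JDK classes alone. Everything here is an assumption made for illustration: the class name is made up, a ScheduledExecutorService replaces the timer Redisson actually uses, an AtomicBoolean replaces the hexists check, and a counter replaces the pexpire call. The sketch also leaves out the de-duplication that expirationRenewalMap provides.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class WatchdogSketch {
    // JDK scheduler on a daemon thread; a stand-in for Redisson's own timer.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "watchdog");
                t.setDaemon(true);
                return t;
            });
    private final AtomicBoolean held = new AtomicBoolean(true); // "does my field still exist?"
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final long leaseMillis;
    final AtomicInteger renewals = new AtomicInteger();

    WatchdogSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    /** Schedules one renewal after a third of the lease; the task reschedules itself. */
    void scheduleRenewal() {
        timer.schedule(() -> {
            if (held.get()) {
                renewals.incrementAndGet(); // the real script resets the TTL here
                scheduleRenewal();          // reschedule itself
            } else {
                stopped.countDown();        // lock is gone: the chain ends
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    void release() {
        held.set(false);
    }

    /** Waits until the renewal chain has ended or the timeout elapses. */
    boolean awaitStopped(long timeoutMillis) {
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WatchdogSketch lock = new WatchdogSketch(90); // renew roughly every 30 ms
        lock.scheduleRenewal();
        Thread.sleep(300);
        lock.release();
        System.out.println("stopped=" + lock.awaitStopped(5_000)
                + ", renewals=" + lock.renewals.get());
    }
}
```

Because each run decides whether to schedule the next one, the renewal ends by itself once the lock is no longer held, without any explicit cancel call.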

So far we have covered setting the key and its timeout, which is what happens for the thread that wins the lock. What about the threads that do not get it? Let's go back to the lockInterruptibly(long leaseTime, TimeUnit unit) method.

RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

The code above subscribes to the lock's Redis channel and listens for messages. Where the message is published, and what happens when it arrives, is covered later. Let's start with the while loop.

try {
    while (true) {
        ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            break;
        }

        // waiting for message
        if (ttl >= 0) {
            getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } else {
            getEntry(threadId).getLatch().acquire();
        }
    }
} finally {
    unsubscribe(future, threadId);
}

The while loop calls tryAcquire(leaseTime, unit, threadId) again. As described above, this is the method that sets the key. Suppose thread A holds the lock and is still running. When another thread calls tryAcquire, it gets back the key's remaining time to live. The if (ttl == null) check is a second attempt: null means the lock no longer existed (thread A has finished and released it) and the current thread has just acquired it, so the loop ends.

Otherwise ttl holds the lock's remaining lifetime and the ttl >= 0 branch runs: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS). The latch is a Semaphore, so the waiting thread is parked for at most ttl milliseconds. The thread is blocked, but it does not busy-spin and uses no CPU while it waits. If thread A still holds the lock when the wait ends, the loop tries again. This is how the other threads retry the lock at intervals.

This raises a question. Suppose the other threads wait for the key's full timeout, say 30s, but thread A finishes after 10s. Do they have to wait out the full 30s before they can compete for the lock? No. This is where the Redis subscription mentioned earlier comes in. Let's first look at where the message is published.
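The early wake-up relies on ordinary Semaphore behaviour and can be tried with the JDK alone. In this sketch the class and method names are made up, and a helper thread that calls release() after 100 ms plays the role of the unlock message arriving. The waiter asks to wait up to 30 seconds but returns as soon as a permit appears.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {
    /** Parks for at most ttlMillis and returns how long the caller actually waited, in ms. */
    static long waitForUnlock(Semaphore latch, long ttlMillis) {
        long start = System.nanoTime();
        try {
            // Parked until a permit is released or the TTL elapses; no busy spinning.
            latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        Semaphore latch = new Semaphore(0); // no permits yet: waiters block

        // Simulates the unlock message arriving after 100 ms.
        Thread publisher = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                // nothing to clean up in this sketch
            }
            latch.release();
        });
        publisher.start();

        long waited = waitForUnlock(latch, 30_000); // the TTL says 30 s
        System.out.println("woke up after about " + waited + " ms instead of 30000 ms");
    }
}
```

After waking up, the real loop calls tryAcquire again, so a woken thread still has to win the lock against the other waiters.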

RedissonLock.unlock(): releasing the lock

    @Override
    public void unlock() {
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }

//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

RedissonLock.unlockInnerAsync(Thread.currentThread().getId()): releasing the lock with a Lua script

First look at the unlockInnerAsync(Thread.currentThread().getId()) method.

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

In brief, the Lua script does the following:

First paragraph: if the key does not exist, publish LockPubSub.unlockMessage (the value 0) to the lock's channel (KEYS[2], from getChannelName()) and return 1, which is true.

Second paragraph: if the key exists but has no field for the current thread, return nil without doing anything. unlock() turns this null into an IllegalMonitorStateException, because the lock is not held by the calling thread.

Third paragraph: decrement the current thread's counter by 1. If the result is still greater than 0, the lock is held reentrantly, so reset the expiration and return 0, which is false. Otherwise delete the key, publish the unlock message and return 1, which is true.
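To make the three return values concrete, here is a small in-memory model of the script's branches. It is a sketch under assumptions: the class, its fields and its lock helper are made up, a HashMap stands in for the Redis hash, and a list records what would have been published to the channel. TTL handling is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnlockScriptModel {
    static final long UNLOCK_MESSAGE = 0L;              // mirrors LockPubSub.unlockMessage
    final Map<String, Integer> hash = new HashMap<>();  // owner field -> reentrancy count
    final List<Long> published = new ArrayList<>();     // messages sent to the lock channel

    // Simplified acquire used only to set up state; it does not check other owners.
    void lock(String owner) {
        hash.merge(owner, 1, Integer::sum);
    }

    /** TRUE: fully released. FALSE: still held reentrantly. null: caller is not the owner. */
    Boolean unlock(String owner) {
        if (hash.isEmpty()) {                // first paragraph: key does not exist
            published.add(UNLOCK_MESSAGE);
            return Boolean.TRUE;
        }
        if (!hash.containsKey(owner)) {      // second paragraph: held by someone else
            return null;
        }
        int counter = hash.merge(owner, -1, Integer::sum);
        if (counter > 0) {                   // third paragraph: one reentrant level released
            return Boolean.FALSE;            // the real script also refreshes the TTL here
        }
        hash.remove(owner);                  // corresponds to deleting the key
        published.add(UNLOCK_MESSAGE);       // wake up the waiting threads
        return Boolean.TRUE;
    }

    public static void main(String[] args) {
        UnlockScriptModel model = new UnlockScriptModel();
        model.lock("uuid-1:1");
        model.lock("uuid-1:1");                         // reentrant: count is now 2
        System.out.println(model.unlock("uuid-2:7"));   // null
        System.out.println(model.unlock("uuid-1:1"));   // false
        System.out.println(model.unlock("uuid-1:1"));   // true
        System.out.println(model.published);            // [0]
    }
}
```

Only a TRUE result leads unlock() to call cancelExpirationRenewal(), which fits the model: while the count is above 0 the lock is still held and should keep being renewed.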

Now that we know where the message is published, we can go back to the subscription made in subscribe(threadId) and look at what happens when the message is received.

LockPubSub.onMessage(RedissonLockEntry value, Long message): what happens after the message is received

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            value.getLatch().release();

            while (true) {
                Runnable runnableToExecute = null;
                synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }

                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }

}

The onMessage() method above is what runs when a published message is received. As described earlier, the Lua script in RedissonLock.unlockInnerAsync() publishes a message when the key does not exist or when the key is deleted, and the message content is 0, which matches unlockMessage here. In that case value.getLatch().release() releases one permit. A waiting thread that obtains the permit wakes up and goes back to competing for the lock.

Summary

The above is my analysis of the source code after watching the course videos by Teacher Zhuge from Shang Silicon Valley. There may be errors in my description, though the overall picture should be right. Questions and corrections are welcome, and please be kind.