MQ consumption: concurrent queuing and an idempotence mechanism implemented with Redisson

First, what problems to solve and how to solve them

The goal is to queue concurrent consumption of the same message and to keep consumption idempotent when MQ delivers a message more than once.

Technical approach

How to make concurrent requests queue up and execute one after another. @Redisson

How to ensure idempotence when MQ delivers a message repeatedly. @redis

First, a short aside on distributed locks. Before using one, ask whether there is a shared resource and what exactly needs protecting. Then ask whether MQ consumption has that problem and what must be done to solve it. Do not use a technology for its own sake; start from the problem and then look for a technical solution.

When a [shared resource] is read and written concurrently, only one thread may access it at a time if the data is to stay accurate.

A distributed lock ensures that, across all JVM processes, only one thread in one process can access the protected resource at any given time.

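Second, the Redisson usage is roughly as follows:

RLock lock = redissonClient.getLock(lockKey);
boolean locked = false;
try {
    // Wait up to `expired` seconds for the lock; tryLock may throw InterruptedException
    locked = lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS);
    if (locked) {
        // Business logic
    }
} finally {
    // Release only if this thread actually acquired the lock
    if (locked) {
        lock.unlock();
    }
}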

This raises the following questions

1. Are the locks obtained with different lockKeys the same lock?

2. If the same lockKey is used to obtain a lock several times, is it the same lock?

3. What is the locking granularity?

Is access concurrent for the same key, and for different keys? How is it ensured that threads queue up and execute in turn?

4. When does locking fail, and how should that be handled?

5. What do different keys mean, and why not use a single key?

6. Does queuing threads turn execution into serial execution? Does that hurt efficiency, and how can the efficiency problem be solved?

7. If this is multi-threaded, where is the thread pool, how many threads can run, and how many can wait in the queue?

8. How does this compare with the concept of a distributed lock?

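Initial guesses

1. Different.

2. The same: every lock obtained with the same lockKey refers to the same lock in Redis.

3. Threads using the same key queue up and execute one after another. The behavior for different keys is not yet clear; see: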

Getting started with Redisson distributed lock (reentrant lock (lock))_redissonclient.getlock_Upstream LV@Caige’s Blog-CSDN Blog

4. Locking has not really failed; the thread has simply not acquired the lock yet. It waits in the queue until the previous holder releases the lock, and then locking succeeds.

5. With the same key, concurrent threads queue up and execute in turn. Are different keys accessed concurrently?

What is the point of locking, then? Isn't it just a distributed lock? It is a question of granularity: if the key is the MQ message key and MQ does not deliver duplicates, the lock controls nothing, that is, concurrent access is allowed.

Can we say that different keys mean different locks? Since they are different locks, each can be acquired independently, so no queuing is needed. Queuing happens only when the keys are the same and therefore the same lock is used. MQ consumption is then essentially concurrent.

6. See 5. If execution is serial, efficiency will certainly suffer; this needs to be confirmed.

7. Unknown.
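Guess 5 can be checked locally with a small sketch. It does not use Redisson: `KeyedLocks` is a hypothetical in-memory registry that hands out one `ReentrantLock` per key, which is enough to show that holding the lock for one key does not block a thread that asks for a different key, while the same key does block.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory registry: one lock per key (stand-in for getLock(lockKey)). */
final class KeyedLocks {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    ReentrantLock getLock(String key) {
        // The same key always maps to the same lock; a new key creates a new lock.
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    /** Returns true if another thread can lock `other` while this thread holds `held`. */
    static boolean canLockConcurrently(String held, String other) {
        KeyedLocks registry = new KeyedLocks();
        ReentrantLock first = registry.getLock(held);
        first.lock();
        try {
            boolean[] acquired = new boolean[1];
            Thread t = new Thread(() -> {
                ReentrantLock second = registry.getLock(other);
                if (second.tryLock()) {      // non-blocking attempt
                    acquired[0] = true;
                    second.unlock();
                }
            });
            t.start();
            t.join();                        // join makes acquired[0] visible here
            return acquired[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            first.unlock();
        }
    }
}
```

With a per-message key, two different messages never contend, so only duplicates of the same message are serialized.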

Analysis of the principle of distributed lock implemented by Redisson – Zhihu

This article walks through the locking source code. It shows that the earlier idea of a thread pool behind the queuing was a misunderstanding.

Let’s take a look at this business method first

 if (lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS)) {

This method, tryLock(long time, TimeUnit unit), is declared on the Lock interface.

Implementation

Now recall what the time parameter of tryLock means: it corresponds to waitTime in the implementation, the maximum time the thread waits to acquire the lock.

In this overload leaseTime defaults to -1, so the lock key gets the watchdog's default expiration time (30 seconds), and the watchdog keeps renewing it while the lock is held.

Be careful to distinguish the waiting time from the key expiration time.

Note: from the source code, leaseTime must be -1 for the Watch Dog mechanism to be enabled. In other words, to use the Watch Dog you must keep the default lock time of 30s. If you pass your own leaseTime, the lock is released automatically after that time and is not extended.
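The difference between a fixed lease and a watchdog-renewed lease can be illustrated with a small in-memory sketch. This is not Redisson code: `LeaseSketch`, its methods, and the scaled-down 300 ms timeout are all made up for the demo. The only facts borrowed from Redisson are that leaseTime = -1 switches renewal on and that renewal runs at one third of the watchdog timeout.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for a lock key with a TTL. Hypothetical; not Redisson code. */
final class LeaseSketch {
    // Scaled down for the demo; Redisson's default watchdog timeout is 30 000 ms.
    static final long WATCHDOG_TIMEOUT_MS = 300;

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "watchdog-sketch");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private volatile long expiresAt;
    private ScheduledFuture<?> renewal;

    /** leaseMs == -1: expiry is renewed periodically; otherwise the expiry is fixed. */
    synchronized void acquire(long leaseMs) {
        if (leaseMs == -1) {
            expiresAt = System.currentTimeMillis() + WATCHDOG_TIMEOUT_MS;
            long period = WATCHDOG_TIMEOUT_MS / 3;
            renewal = timer.scheduleAtFixedRate(
                    () -> expiresAt = System.currentTimeMillis() + WATCHDOG_TIMEOUT_MS,
                    period, period, TimeUnit.MILLISECONDS);
        } else {
            expiresAt = System.currentTimeMillis() + leaseMs;
        }
    }

    synchronized void release() {
        if (renewal != null) {
            renewal.cancel(false);
            renewal = null;
        }
        expiresAt = 0;
        timer.shutdownNow();
    }

    boolean isHeld() {
        return System.currentTimeMillis() < expiresAt;
    }

    /** Acquires with the given lease, waits, and reports whether the lease is still valid. */
    static boolean heldAfter(long leaseMs, long waitMs) {
        LeaseSketch lock = new LeaseSketch();
        lock.acquire(leaseMs);
        try {
            Thread.sleep(waitMs);
            return lock.isHeld();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.release();
        }
    }
}
```

In the sketch, a lease of -1 outlives several timeout periods because it keeps being renewed, while an explicit lease simply runs out even though the holder never released it.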

How to set the expiration time manually?

Pass a leaseTime, like this:

lock.lock(1, TimeUnit.SECONDS);

By contrast, the following two calls do not set an expiration time. leaseTime defaults to -1, so the watchdog mechanism applies:

lock.lock();
lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS);

How does a thread wait to acquire the lock?

After the first thread acquires the lock, other threads that arrive subscribe to the lock release message. When the holder releases the lock, it publishes a message; the waiting threads receive it and try to acquire the lock again. The retries happen inside a loop, but not an unconditional busy loop, which would be too inefficient: between attempts the thread blocks until it is notified or times out. See the references above and below for details.

Redis Advanced – Redisson distributed lock implementation principle and source code analysis – Tencent Cloud Developer Community – Tencent Cloud

7.1 With multiple threads waiting, isn't there still a thread pool involved?

8. Can this be understood as a distributed lock? With a typical distributed lock, a second concurrent request for the same key does not execute the method at all, whereas here it queues and then executes. That is the difference.

In effect, what Redisson solves here is queuing duplicates that arrive concurrently, so a Redis check for repeated consumption is still needed.

Even when a duplicate arrives later rather than concurrently, Redis still has to verify whether it is a repeat.

So Redisson only controls the queuing of duplicate messages, and Redis is what actually prevents repeated consumption. Could we

remove Redisson entirely and use only Redis to control repeated consumption?

Because different messages have different keys, Redisson does not limit concurrent consumption across messages.

And when the keys are the same, requests are queued rather than rejected, so the usual purpose of a distributed lock is not achieved.

What defines a distributed lock:

1. Mutual exclusion

Under distributed, highly concurrent conditions, the most important guarantee is that only one thread can hold the lock at a time. This is the most basic property.

Think again: does "only one thread can hold the lock at a time" have a precondition? Must the key be the same, or is the key irrelevant?

@There is a precondition: the keys must be the same.

Reference documentation

Redis: Use of Redisson distributed lock (recommended use)_Usage of redisson distributed lock_Chuancheng Dabi’s Blog-CSDN Blog

This article explains the locking process in detail. The conclusion is that locking succeeds whenever the key is different. If every MQ message carries a different key, locking succeeds every time, so messages are processed concurrently and the lock granularity is fine. This meets the requirements of high efficiency and high concurrency.

So what is the lock for? @The analysis in point 8 above can probably be dropped.

@The job of preventing repeated consumption then falls entirely on Redis. The expiration time matters a great deal: set a reasonable expiration time so that repeats within that period are caught. As for repeats after that period, either show that they cannot happen or explain why they can.
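A minimal sketch of such a duplicate check with an expiration time follows. It uses an in-memory map as a stand-in for Redis, so `DedupStore`, `markIfFirst`, and `unmark` are hypothetical names. In Redis the equivalent atomic step would be `SET key value NX EX seconds`. Because check and mark happen in one atomic step, two concurrent duplicates cannot both pass, which a separate exists-then-set can only guarantee while a lock is held. Unlike the aspect in this post, the sketch marks the message before processing, so the mark has to be removed when processing fails.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for an atomic "set if absent, with TTL" in Redis. */
final class DedupStore {
    private final ConcurrentHashMap<String, Long> expiryByKey = new ConcurrentHashMap<>();

    /**
     * Returns true only for the first caller for this key within ttlMillis.
     * The clock is passed in so the behavior is deterministic.
     */
    boolean markIfFirst(String messageKey, long ttlMillis, long nowMillis) {
        boolean[] first = new boolean[1];
        // compute() runs atomically per key, so check and mark cannot interleave.
        expiryByKey.compute(messageKey, (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                first[0] = true;
                return nowMillis + ttlMillis; // new or expired: (re)mark
            }
            return expiry;                    // still marked: duplicate
        });
        return first[0];
    }

    /** Call when processing failed, so that a redelivery is not treated as a duplicate. */
    void unmark(String messageKey) {
        expiryByKey.remove(messageKey);
    }
}
```

A consumer would call `markIfFirst` before processing and skip the message when it returns false. Once the TTL has passed, the same key is accepted again, which is why the expiration time has to cover the window in which redelivery can happen.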

Further questions

9. In other words, MQ consumption does not show the real value of a distributed lock, and the scenario is not a good fit. Which scenarios are a good fit? Give examples.

10. Can highly concurrent MQ consumption cause problems, such as failed updates or inconsistent shared resources?

Third, for the Redisson implementation mechanism, refer to the following

Getting started with Redisson distributed lock (reentrant lock (lock))_redissonclient.getlock_Upstream LV@Caige’s Blog-CSDN Blog

Fourth, for the principle of @Around advice and how it controls whether a method executes, refer to the following

@Around notification execution principle in Spring AOP_aop around control method execution-CSDN Blog
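The core idea of around advice, deciding whether the target method runs at all, can be sketched without Spring by using a JDK dynamic proxy. This is only an analogy: `MessageHandler`, `AroundSketch`, and the in-memory `seen` set are hypothetical, and `method.invoke(target, args)` plays the role that `joinPoint.proceed()` plays in an @Around method.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer interface used only for this sketch. */
interface MessageHandler {
    boolean handle(String messageKey);
}

final class AroundSketch {
    static int targetCalls = 0; // counts real invocations of the target

    /** Wraps the target so that a repeated messageKey never reaches it. */
    static MessageHandler wrap(MessageHandler target) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        InvocationHandler advice = (proxy, method, args) -> {
            String key = (String) args[0];
            if (!seen.add(key)) {
                return true;                    // duplicate: skip the target, report success
            }
            return method.invoke(target, args); // counterpart of joinPoint.proceed()
        };
        return (MessageHandler) Proxy.newProxyInstance(
                MessageHandler.class.getClassLoader(),
                new Class<?>[] {MessageHandler.class},
                advice);
    }

    /** Delivers m1 twice and m2 once; returns how often the target actually ran. */
    static int callsAfterDuplicateDelivery() {
        targetCalls = 0;
        MessageHandler handler = wrap(key -> {
            targetCalls++;
            return true;
        });
        handler.handle("m1");
        handler.handle("m1"); // duplicate delivery
        handler.handle("m2");
        return targetCalls;
    }
}
```

The caller cannot tell whether the value it receives came from the target or from the advice, which is why the return value chosen for a skipped call matters.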

Fifth, implementation code

With the above knowledge in mind, analyze the code:

package com.yonghui.yh.rme.srm.ordercenter.service.annotation;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yonghui.redis.utils.RedisUtils;
import com.yonghui.yh.rme.srm.ordercenter.common.enums.RedisEnum;

import lombok.extern.slf4j.Slf4j;

/**
 * Problems this aspect solves:
 * 1. Concurrency: when several threads consume the same message concurrently, they queue up and run one at a time.
 * 2. Idempotence: prevents repeated MQ consumption. For a duplicate, joinPoint.proceed() is not called, so the original method is not executed.
 */
@Aspect
@Component
@Slf4j
public class MqConsumerAspect {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisUtils redisUtils;


    @Pointcut("@annotation(com.yonghui.yh.rme.srm.ordercenter.service.annotation.MqConsumer)")
    public void point(){}

    /**
     * The role of @Around:
     * Enhancement logic can be woven in before the target method, after it, or both.
     *
     * The advice decides when and how the target method is executed, and can even prevent its execution entirely.
     *
     * The advice can change the argument values passed to the target method and the value it returns. Changing the return value is only possible with Around advice.
     *
     * Although Around is powerful, it usually needs to be used in a thread-safe way. If plain Before and AfterReturning advice can solve the problem, there is no need for Around advice.
     *
     * Annotation-based usage: to enhance a method, add the custom annotation to that method.
     * @param joinPoint
     * @return
     * @throws Throwable
     */
    @Around("point()")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
        Object o = joinPoint.getArgs()[0];
        JSONObject jsonObject;
        if (o instanceof String) {
            jsonObject = JSON.parseObject(o.toString());
        } else {
            jsonObject = JSON.parseObject(JSON.toJSONString(o));
        }
        String messageKey = jsonObject.getString("messageKey");
        if (StringUtils.isBlank(messageKey)) {
            // Will the target method still execute when messageKey is blank?
            // @ Answer: yes. joinPoint.proceed() invokes the target method by reflection, like method.invoke().
            log.info("MqConsumerAspect mq message consumption messageKey is empty");
            return joinPoint.proceed();
        }
        // Around advice usually needs to be thread-safe. That is why a lock is used here: threads handling the same messageKey queue up.
        String lockKey = RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.toKey(messageKey);
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS)) {

                // Concurrent requests for the same messageKey wait here in turn. Can execution be controlled from this point?
                // @ Yes: the advice decides whether to call joinPoint.proceed(), that is, whether the target method runs.
                // @ So requests queue up, and a repeated message returns true here, as if the original method had returned true.
                String key = RedisEnum.ORDER_TRACE_MQ_REPEAT_CONSUMER_KEY.toKey(messageKey);
                // Idempotence check: if the key already exists, the message was consumed before, so the target method is skipped (not just logged).
                if (redisUtils.exists(key)) {
                    log.info("MqConsumerAspect messageKey={} repeated consumption",key);
                    return true;
                }

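                // Execute the target method; mark the message as consumed only if it reports success.
                Object proceed = joinPoint.proceed();
                // Boolean.TRUE.equals(...) avoids a NullPointerException when the target returns null.
                if (Boolean.TRUE.equals(proceed)) {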
                    log.info("MqConsumerAspect messageKey={} consumption completed",key);
                    redisUtils.set(key, "1", RedisEnum.ORDER_TRACE_MQ_REPEAT_CONSUMER_KEY.expired);
                }
                return proceed;
            }
            // The lock was not acquired within the wait time: the target method is not executed and the caller
            // receives false, as if the original method itself had returned false.
            // TODO: confirm how the MQ client treats each result (true / proceed / false).
            return false;
        } catch (Exception e) {
            log.error("MqConsumerAspect error param={}", JSON.toJSONString(o), e);
            throw e;
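        } finally {
            // Unlock only if this thread holds the lock; otherwise unlock() throws IllegalMonitorStateException.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }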
    }

}
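The pointcut above refers to a custom annotation, MqConsumer, whose definition is not shown in this post. A minimal sketch of what it could look like, assuming a plain marker annotation with no attributes, is given below. `OrderTraceListener` and `onMessage` are hypothetical names for a consumer method that uses it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Assumed shape of the marker annotation; the real definition is not shown in the post. */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME) // must be visible at runtime for @annotation(...) pointcuts
@interface MqConsumer {
}

/** Hypothetical consumer whose method the aspect would intercept. */
final class OrderTraceListener {

    // The aspect reads "messageKey" from the first argument and compares the result with Boolean.TRUE.
    @MqConsumer
    public boolean onMessage(String body) {
        return true;
    }

    /** Checks by reflection that the named method carries @MqConsumer. */
    static boolean isAnnotated(String methodName) {
        try {
            return OrderTraceListener.class
                    .getMethod(methodName, String.class)
                    .isAnnotationPresent(MqConsumer.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

Two constraints follow from the aspect code itself: the first argument must be a JSON string or an object that serializes to JSON containing messageKey, and the method should return a Boolean for the consumed marker to be written.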

Sixth, reference documents

The evolution of the correct implementation principle of Redis distributed lock and the practical summary of Redission – Zhihu

This article explains RedLock, the differences between the locking methods and whether each uses the watchdog mechanism, and illustrates the application scenarios of distributed locks with examples.

Analysis of the principle of distributed lock implemented by Redisson – Zhihu

This article explains, through the source code, how publish/subscribe is used,

how access to shared resources is controlled under concurrent access, and how threads block and wait during the spin.

See the Redisson source code at the end of this post.

A thread tries to lock. If that fails, it subscribes to the lock release message in Redis and enters the do-while spin. During the spin the thread blocks on a semaphore. When the holder releases the lock, Redisson publishes a message and a permit is released on the semaphore (the permit count increases by one), so a waiting thread is notified, obtains the permit, and retries the lock.
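The wait-and-retry loop described above can be sketched with JDK classes only. This is a simplified illustration, not Redisson's implementation: `LatchWaitSketch` is a hypothetical class, an `AtomicBoolean` stands in for the lock key in Redis, and a `Semaphore` stands in for the latch that the release message signals.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of "try, block on a semaphore, retry"; not Redisson code. */
final class LatchWaitSketch {
    private final AtomicBoolean held = new AtomicBoolean(false); // stand-in for the Redis lock key
    private final Semaphore latch = new Semaphore(0);            // stand-in for the entry's latch

    boolean tryLock(long waitMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        while (true) {
            if (held.compareAndSet(false, true)) {
                return true;                      // lock acquired
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;                     // waited long enough, give up
            }
            // Block until a release is signalled or the remaining time runs out.
            // A stale permit only causes one extra retry.
            latch.tryAcquire(remaining, TimeUnit.NANOSECONDS);
        }
    }

    void unlock() {
        held.set(false);
        latch.release(); // plays the role of the published release message
    }

    /** One thread holds the lock for holdMillis; reports whether a waiter gets it in time. */
    static boolean waiterGetsLock(long holdMillis, long waitMillis) {
        LatchWaitSketch lock = new LatchWaitSketch();
        try {
            lock.tryLock(0); // the lock is free, so this succeeds immediately
            Thread holder = new Thread(() -> {
                try {
                    Thread.sleep(holdMillis);
                } catch (InterruptedException ignored) {
                    // fall through and release
                }
                lock.unlock();
            });
            holder.setDaemon(true);
            holder.start();
            return lock.tryLock(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The waiting thread consumes no CPU while blocked on the semaphore, which is the point of combining a subscription with a latch instead of polling in a tight loop.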

JAVA multi-threading–Semaphore_java semaphore-CSDN blog

Multiple threads can control concurrent access through a semaphore; see the following

Java multi-thread concurrency control tool Semaphore, implementation principles and cases-Tencent Cloud Developer Community-Tencent Cloud

Seventh, reflections

1. During the spin (see the code below), threads obtain a permit from the semaphore. A thread that does not obtain a permit blocks and does not continue.

2. Implementation details of publish and subscribe

We already know that after a thread fails to lock, it first subscribes to the release message (published when the lock-holding thread releases the lock) and then enters the loop. The holder publishes the message when it unlocks, in its finally block.

Receiving the message lets a thread that is waiting in the spin obtain a permit, continue executing, and try to lock again in the loop.

The question is: how is the subscription connected to the blocking? In the code below, the link appears to be the RedissonLockEntry obtained through the subscription: getEntry(threadId).getLatch() is the semaphore on which the thread blocks.

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
        if (ttl == null) {
            return true;
        } else {
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {
                this.acquireFailed(threadId);
                return false;
            } else {
                current = System.currentTimeMillis();
                RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                                this.unsubscribe(subscribeFuture, threadId);
                            }

                        });
                    }

                    this.acquireFailed(threadId);
                    return false;
                } else {
                    try {
                        time -= System.currentTimeMillis() - current;
                        if (time <= 0L) {
                            this.acquireFailed(threadId);
                            boolean var20 = false;
                            return var20;
                        } else {
                            boolean var16;
                            do {
                                long currentTime = System.currentTimeMillis();
                                ttl = this.tryAcquire(leaseTime, unit, threadId);
                                if (ttl == null) {
                                    var16 = true;
                                    return var16;
                                }

                                time -= System.currentTimeMillis() - currentTime;
                                if (time <= 0L) {
                                    this.acquireFailed(threadId);
                                    var16 = false;
                                    return var16;
                                }

                                currentTime = System.currentTimeMillis();
                                if (ttl >= 0L && ttl < time) {
                                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                    this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }

                                time -= System.currentTimeMillis() - currentTime;
                            } while(time > 0L);

                            this.acquireFailed(threadId);
                            var16 = false;
                            return var16;
                        }
                    } finally {
                        this.unsubscribe(subscribeFuture, threadId);
                    }
                }
            }
        }
    }