Peeling Back the Layers: Optimizing Redis Health Detection with the EventBus or AOP

Table of Contents

Preface

Lettuce

What is the event bus EventBus?

Connected

Connection activated

Disconnected

Connection deactivated

Reconnect failed

Usage

An alternative approach: AOP

Implementation


Preface

In the previous article on this CSDN blog, "SpringBoot integrates Quartz to implement scheduled tasks and Redis health detection (2)", the health status of Redis was checked with a Quartz job in SpringBoot. That solves the Redis connection problem to a certain extent, but a gap remains: if Redis disconnects between two scheduled checks, the service is still unavailable when a user request arrives. Is there a way to notify the SpringBoot program the moment Redis disconnects, so that it can switch from Redis to MySQL? Building on the previous article, this one uses the event bus mechanism of the Redis client to solve this problem.

Lettuce

Lettuce is the default Redis client in SpringBoot 2.x. Unlike the older Jedis, Lettuce is built on Netty. Its asynchronous, event-driven I/O is efficient, supports high concurrency, and is thread-safe. As the figure below shows, Lettuce is the client that is pulled in once the Redis dependency is added to the project.

[Figure: project dependencies, with Lettuce as the Redis client]

What is the event bus EventBus?

Lettuce is in place, but how does it help with the problem at hand? This is where the EventBus comes in. Strictly speaking, it belongs to Lettuce rather than to Redis itself, and it is described in Lettuce's official documentation:

Connection Events · lettuce-io/lettuce-core Wiki · GitHub

Since the client used here is Lettuce 6.1.10, we focus on the Connection Events section of that page.

[Figures: the Connection Events section of the Lettuce wiki]

As the documentation shows, Lettuce provides sample code for consuming connection events. By subscribing to the event bus, you can observe the state of the current Redis connection. There are five connection events:

Connected

The transport-level connection is established: the TCP three-way handshake has completed and the socket is open. The event type is ConnectedEvent.

Connection activated

The logical connection is established and active, so Redis commands can be dispatched. By this point the PING command has been sent and its response received, so Redis commands work normally. The event type is ConnectionActivatedEvent.

Disconnected

The connection was closed: the transport was closed or reset. This event occurs on regular shutdowns and on connection interruptions. The event type is DisconnectedEvent.

Connection deactivated

The logical connection is deactivated, the internal processing state is reset, and the isOpen() flag is set to false. This event occurs on regular shutdowns and on connection interruptions. The event type is ConnectionDeactivatedEvent.

Reconnect failed

An attempt to reconnect failed (available since version 5.3). The event carries the reconnection failure and the retry counter. The event type is ReconnectFailedEvent.

Of these five, the one to focus on is Connection activated: when this event occurs, set the Redis connection flag to true.
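The mapping from event type to connection flag can be sketched without any Lettuce dependency. The names below are hypothetical stand-ins, not Lettuce types: ConnectionEventKind mirrors the five events listed above, and RedisHealthFlag is an illustrative holder for the flag. An AtomicBoolean is assumed here because events typically arrive on a different thread from the ones serving requests.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the five Lettuce connection events described above.
enum ConnectionEventKind {
    CONNECTED, CONNECTION_ACTIVATED, DISCONNECTED, CONNECTION_DEACTIVATED, RECONNECT_FAILED
}

// Illustrative, thread-safe holder for the "is Redis usable" flag.
final class RedisHealthFlag {

    private final AtomicBoolean connected = new AtomicBoolean(false);

    // Update the flag for one event and return the resulting state.
    boolean onEvent(ConnectionEventKind kind) {
        switch (kind) {
            case CONNECTION_ACTIVATED:
                // Only now can commands actually be dispatched.
                connected.set(true);
                break;
            case DISCONNECTED:
            case CONNECTION_DEACTIVATED:
            case RECONNECT_FAILED:
                connected.set(false);
                break;
            case CONNECTED:
            default:
                // TCP is up but the logical connection is not ready yet: leave the flag as it is.
                break;
        }
        return connected.get();
    }

    boolean isConnected() {
        return connected.get();
    }
}
```

Treating only Connection activated as "up" means the flag stays false during the short window between the TCP connect and the first usable command.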

Usage

If you copy the sample code from the official documentation mentioned above as it is, you will find that the Redis connection state is not reported.

RedisClient client = RedisClient.create();
EventBus eventBus = client.getResources().eventBus();

// Print every event published on the bus
eventBus.get().subscribe(e -> System.out.println(e));

...
client.shutdown();

Step into the create() method to see why. The parameterless create() builds the RedisClient object with two arguments: the first is the ClientResources object, and the second is the Redis URI, which is left empty here. You have probably guessed by now why the connection cannot be monitored.

Because neither the corresponding client resources nor a Redis URI is passed in, the client has nothing to monitor and no connection events arrive.

[Figure: source code of RedisClient.create()]

The source code contains several other create() methods with the same name but different parameters. The third one, which takes a ClientResources object, is used here because that object can be injected by Spring, which saves the tedious work of constructing it by hand.

[Figure: the other create() overloads in RedisClient]

The specific code is as follows.

The listener obtains the event bus and subscribes to Redis connection events, so the connection flag in RedisCheckConfig is updated in real time whenever Redis connects or disconnects. The service therefore keeps working even in the gaps between the scheduled Redis checks.

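import io.lettuce.core.RedisClient;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ConnectionActivatedEvent;
import io.lettuce.core.event.connection.ConnectionDeactivatedEvent;
import io.lettuce.core.event.connection.DisconnectedEvent;
import io.lettuce.core.resource.ClientResources;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RedisStatusListener {

    // ClientResources object injected by Spring
    @Autowired
    private ClientResources clientResources;

    // Assumption: @Bean is added so that Spring calls this method at startup
    @Bean
    public RedisClient redisClient() {
        RedisClient client = RedisClient.create(clientResources);
        EventBus eventBus = client.getResources().eventBus();

        eventBus.get().subscribe(e -> {
            // Other event types may also be published on the bus, so react only to the relevant ones
            if (e instanceof ConnectionActivatedEvent) {
                RedisCheckConfig.redisConnected = true;
            } else if (e instanceof DisconnectedEvent || e instanceof ConnectionDeactivatedEvent) {
                RedisCheckConfig.redisConnected = false;
            } else {
                return;
            }
            log.info("EventBus reports Redis connected: {}", RedisCheckConfig.redisConnected);
        });
        return client;
    }
}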
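Once the flag is kept up to date, request-handling code can consult it before touching Redis. The following is a minimal sketch of that switch, assuming a hypothetical FallbackReader helper. The two suppliers stand in for a Redis lookup and a MySQL query, and none of these names come from the original project.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper: read from Redis while it is reported healthy, otherwise go to MySQL.
final class FallbackReader<T> {

    private final BooleanSupplier redisConnected; // e.g. () -> RedisCheckConfig.redisConnected
    private final Supplier<T> fromRedis;          // stand-in for a Redis lookup
    private final Supplier<T> fromMySql;          // stand-in for a MySQL query

    FallbackReader(BooleanSupplier redisConnected, Supplier<T> fromRedis, Supplier<T> fromMySql) {
        this.redisConnected = redisConnected;
        this.fromRedis = fromRedis;
        this.fromMySql = fromMySql;
    }

    T read() {
        if (redisConnected.getAsBoolean()) {
            try {
                T cached = fromRedis.get();
                if (cached != null) {
                    return cached;
                }
            } catch (RuntimeException e) {
                // The flag can lag behind reality for a moment; fall through to MySQL.
            }
        }
        return fromMySql.get();
    }
}
```

Catching the runtime exception covers the brief window in which Redis has already failed but the disconnect event has not yet flipped the flag.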

An alternative approach: AOP

If you would rather not use the event bus, here is another idea: the check can also be done with AOP.

The aspect intercepts every method of the Redis operation class. If a Redis command takes longer than 1 s (adjustable) to execute, Redis is considered to have a connection problem.

A thread pool is needed so that the execution can be timed, and the interception itself is implemented with AOP around advice. When a Redis command exceeds the specified time, the default value of the method's return type is returned instead.

Concrete implementation

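import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

@Component
@Aspect
@Slf4j
public class RedisOperationAspect {

    // Execution timeout in milliseconds
    private static final long TIMEOUT_MS = 1000;

    @Around("execution(* com.o2o.shop.util.RedisOperator.*(..))")
    public Object monitorRedisCommandExecution(ProceedingJoinPoint joinPoint) throws Throwable {
        // Get method information
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        String methodName = method.getName();

        // One worker thread runs the intercepted Redis call so that it can be timed
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<Object> future = executor.submit(() -> {
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                log.error("Redis AOP execution of {} failed", methodName, throwable);
                throw new RuntimeException(throwable);
            }
        });

        try {
            // Wait for the result, but no longer than the timeout
            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.warn("Redis command {} timed out after {} ms", methodName, TIMEOUT_MS);
            // Interrupt the worker thread
            future.cancel(true);
            // Fall back to the default value of the method's return type
            return defaultValue(method.getReturnType());
        } catch (ExecutionException e) {
            // Rethrow the exception raised by the intercepted method rather than the wrappers
            Throwable cause = e.getCause();
            throw cause.getCause() != null ? cause.getCause() : cause;
        } finally {
            // Close the thread pool
            executor.shutdown();
        }
    }

    // The boxed default matches the primitive exactly; a mismatched box
    // (for example Integer for long) can fail when the proxy unboxes it
    private static Object defaultValue(Class<?> returnType) {
        if (returnType == boolean.class) return false;
        if (returnType == char.class) return '\0';
        if (returnType == byte.class) return (byte) 0;
        if (returnType == short.class) return (short) 0;
        if (returnType == int.class) return 0;
        if (returnType == long.class) return 0L;
        if (returnType == float.class) return 0.0f;
        if (returnType == double.class) return 0.0d;
        return null; // void and all reference types
    }

}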
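The core of the aspect, running a call on a worker thread and giving up after a deadline, can be tried out in isolation with plain JDK classes. This is a sketch under assumptions rather than the aspect itself: TimedCall and callWithTimeout are hypothetical names, and it reuses one shared pool of daemon threads instead of creating an executor for every call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper illustrating the timeout technique used by the aspect.
final class TimedCall {

    // Shared pool; daemon threads so that it never blocks JVM shutdown.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "timed-call");
        t.setDaemon(true);
        return t;
    });

    private TimedCall() {
    }

    // Run the task and return its result, or the fallback if it takes longer than timeoutMs.
    static <T> T callWithTimeout(Callable<T> task, long timeoutMs, T fallback) {
        Future<T> future = POOL.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker; this only helps if the task responds to interruption.
            future.cancel(true);
            return fallback;
        } catch (InterruptedException e) {
            // Restore the caller's interrupt status and give up on the task.
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            // The task itself failed; surface its cause.
            throw new IllegalStateException("Task failed", e.getCause());
        }
    }
}
```

Thread interruption is cooperative, so a blocking call that ignores interrupts keeps its worker busy even after the caller has received the fallback.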