5. RedisResponseTimeoutException of Redisson exception caused by value being null

Article directory

  • Project scenario:
  • Problem Description
  • Cause Analysis:
  • solution:
  • Comprehension:

Project scenario:

The project uses redis to store some hot data

Description of the problem

The next morning after the upgrade, a large number of redis-related exceptions occurred in the production monitoring group. The exception stack information is as follows:

org.springframework.dao.QueryTimeoutException: Redis server response timeout (3000 ms) occurred after 0 retry attempts.
Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: (INCRBY),
params: [[-84, -19, 0, 5, 116, 0, 18, 101, 110, 113, ...], 9],
channel: [id: 0xd***e, L:/17*.*.*.*:3***6 - R:r-2zeijkfagqgv2g42ye.redis.rds.aliyuncs.com/17*.*.*. *:6379];
nested exception is org.redisson.client.RedisResponseTimeoutException:
Redis server response timeout (3000 ms) occurred after 0 retry attempts.
Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: (INCRBY),
params: [[-84, -19, 0, 5, 116, 0, 18, 101, 110, 113, ...], 9],
channel: [id: 0xdcb***ce, L:/17*.*.*.*:3***6 - R:r-2zeijkfagqgv2g42ye.redis.rds.aliyuncs.com/10.1*****: 6379]
at org.redisson.spring.data.connection.RedissonExceptionConverter.convert(RedissonExceptionConverter.java:48)
at org.redisson.spring.data.connection.RedissonExceptionConverter.convert(RedissonExceptionConverter.java:35)
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44)
at org.redisson.spring.data.connection.RedissonConnection.transform(RedissonConnection.java:197)
at org.redisson.spring.data.connection.RedissonConnection.syncFuture(RedissonConnection.java:192)
at org.redisson.spring.data.connection.RedissonConnection.sync(RedissonConnection.java:360)
at org.redisson.spring.data.connection.RedissonConnection.write(RedissonConnection.java:726)
at org.redisson.spring.data.connection.RedissonConnection.incrBy(RedissonConnection.java:574)
at org.springframework.data.redis.core.DefaultValueOperations.lambda$increment$1(DefaultValueOperations.java:167)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:97)
at org.springframework.data.redis.core.DefaultValueOperations.increment(DefaultValueOperations.java:167)

Cause Analysis:

First, check the stack and find that there is a bug in the code, resulting in the data stored in redis, and the value is null. At that time, the value was empty only in rare cases, so I planned to fix it at night and began to check why the timeout occurred.
1. Contact the operation and maintenance to check the running status of redis and check the configuration of Redis parameters; (no modification has been made recently, and no problems have been found)
2. Network reasons; (the network is not jittering)
3. Started to check the Redis source code. At this time, some colleagues reported that when the value stored in redis was null, the RedisResponseTimeoutException problem would occur. Combining the source code and the phenomenon, the problem was found. Continue to read the analysis below.
4. From the surface of the log, it is indeed a timeout. The first sentence “org.springframework.dao.QueryTimeoutException: Redis server response timeout (3000 ms) occurred after 0 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. ” means that there is no response after 3000ms, and 3000ms is the response timeout time of redisson, so where does this come from? In the source code, (several important methods write, writeAsync, async, execute, checkWriteFuture, scheduleResponseTimeout)

 <T> T write(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {<!-- -->
        RFuture<T> f = executorService.writeAsync(key, codec, command, params);
        indexCommand(command);
        return sync(f);
    }
\t
public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {<!-- -->
        NodeSource source = getNodeSource(key);
        return async(false, source, codec, command, params, false, false);
    }

public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
            RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {<!-- -->
        CompletableFuture<R> mainPromise = createPromise();
        RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise,
                                                    ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
        executor. execute();
        return new CompletableFutureWrapper<>(mainPromise);
    }

public void execute() {<!-- -->
        if (mainPromise. isCancelled()) {<!-- -->
            free();
            return;
        }

        if (!connectionManager.getShutdownLatch().acquire()) {<!-- -->
            free();
            mainPromise.completeExceptionally(new RedissonShutdownException("Redisson is shutdown"));
            return;
        }

        codec = getCodec(codec);

        CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();

        CompletableFuture<R> attemptPromise = new CompletableFuture<>();
        mainPromiseListener = (r, e) -> {<!-- -->
            if (mainPromise. isCancelled() & amp; & amp; connectionFuture. cancel(false)) {<!-- -->
                log.debug("Connection obtaining canceled for {}", command);
                timeout. cancel();
                if (attemptPromise. cancel(false)) {<!-- -->
                    free();
                }
            }
        };

        if (attempt == 0) {<!-- -->
            mainPromise. whenComplete((r, e) -> {
                if (this. mainPromiseListener != null) {
                    this. mainPromiseListener. accept(r, e);
                }
            });
        }

        scheduleRetryTimeout(connectionFuture, attemptPromise);

        connectionFuture. whenComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {<!-- -->
                connectionManager.getShutdownLatch().release();
                return;
            }

            if (connectionFuture.isDone() & amp; & amp; connectionFuture.isCompletedExceptionally()) {<!-- -->
                connectionManager.getShutdownLatch().release();
                exception = convertException(connectionFuture);
                return;
            }

            sendCommand(attemptPromise, connection);

            writeFuture.addListener(new ChannelFutureListener() {<!-- -->
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {<!-- -->
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
        });

        attemptPromise. whenComplete((r, e) -> {
            releaseConnection(attemptPromise, connectionFuture);

            checkAttemptPromise(attemptPromise, connectionFuture);
        });
    }

private void checkWriteFuture(ChannelFuture future, CompletableFuture<R> attemptPromise, RedisConnection connection) {
        if (future.isCancelled() || attemptPromise.isDone()) {<!-- -->
            return;
        }

        if (!future.isSuccess()) {<!-- -->
            exception = new WriteRedisConnectionException(
                    "Unable to write command into connection! Increase connection pool size. Node source: " + source + ", connection: " + connection +
                    ", command: " + LogHelper.toString(command, params)
                     + " after " + attempt + " retry attempts", future. cause());
            if (attempt == attempts) {<!-- -->
                attemptPromise.completeExceptionally(exception);
            }
            return;
        }

        timeout. cancel();

        scheduleResponseTimeout(attemptPromise, connection);
    }

\t

Redisson’s write operation is an asynchronous to synchronous process, using the sync method to block and wait for the result of the future. Since it is an asynchronous process, how does it control the timeout period? The answer is the org.redisson.command.RedisExecutor#scheduleResponseTimeout method. This method will start a scheduled task. The task will be executed after 3000ms (configurable). The content of the task is Set the operation result as timeout, that is, if the task is not canceled within 3000ms, a timeout exception will be thrown. The core logic is this piece of code:

private void scheduleResponseTimeout(CompletableFuture<R> attemptPromise, RedisConnection connection) {<!-- -->
        long timeoutTime = responseTimeout;
        if (command != null & amp; & amp; command.isBlockingCommand()) {<!-- -->
            Long popTimeout = null;
            if (RedisCommands.BLOCKING_COMMANDS.contains(command)) {<!-- -->
                for (int i = 0; i < params. length-1; i ++ ) {<!-- -->
                    if ("BLOCK".equals(params[i])) {<!-- -->
                        popTimeout = Long.valueOf(params[i + 1].toString()) / 1000;
                        break;
                    }
                }
            } else {<!-- -->
                popTimeout = Long.valueOf(params[params.length - 1].toString());
            }

            handleBlockingOperations(attemptPromise, connection, popTimeout);
            if (popTimeout == 0) {<!-- -->
                return;
            }
            timeoutTime += popTimeout * 1000;
            // add 1 second due to issue https://github.com/antirez/redis/issues/874
            timeoutTime += 1000;
        }

        long timeoutAmount = timeoutTime;
        TimerTask timeoutResponseTask = timeout -> {<!-- -->
            if (isResendAllowed(attempt, attempts)) {<!-- -->
                if (!attemptPromise. cancel(false)) {<!-- -->
                    return;
                }

                connectionManager.newTimeout(t -> {<!-- -->
                    attempt + + ;
                    if (log.isDebugEnabled()) {<!-- -->
                        log.debug("attempt {} for command {} and params {}",
                                attempt, command, LogHelper.toString(params));
                    }

                    mainPromiseListener = null;
                    execute();
                }, retryInterval, TimeUnit. MILLISECONDS);
                return;
            }

            attemptPromise.completeExceptionally(
                    new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occurred"
                             + " after " + attempt + " retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: "
                             + LogHelper.toString(command, params) + ", channel: " + connection.getChannel()));
        };

        timeout = connectionManager.newTimeout(timeoutResponseTask, timeoutTime, TimeUnit.MILLISECONDS);
    }

The above code shows that writeFuture has a Listener, which checks checkWriteFuture when the write operation is completed. In other words, before the scheduleResponseTimeout, the actual request has been sent, so has the redis server received it? Use the monitor function of redis-cli to view the requests received by the server, and you can indeed see the written requests. From the simulated redis server, the processing of the SET operation is also very fast. Does the problem arise on the response processing side? Next, let’s take a look at what redisson does when processing requests/responses. For details, please refer to the org.redisson.client.handler.RedisChannelInitializer#initChannel method, the code is as follows:

@Override
protected void initChannel(Channel ch) throws Exception {
    initSsl(config, ch);
    
    if (type == Type.PLAIN) {
        ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
    } else {
        ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
    }

    ch.pipeline().addLast(
        connectionWatchdog,
        CommandEncoder. INSTANCE,
        CommandBatchEncoder. INSTANCE);

    if (type == Type.PLAIN) {
        ch.pipeline().addLast(new CommandsQueue());
    } else {
        ch.pipeline().addLast(new CommandsQueuePubSub());
    }

    if (pingConnectionHandler != null) {
        ch.pipeline().addLast(pingConnectionHandler);
    }
    
    if (type == Type.PLAIN) {
        ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
    } else {
        ch.pipeline().addLast(new CommandPubSubDecoder(config));
    }

    ch.pipeline().addLast(new ErrorsLoggingHandler());

    config.getNettyHook().afterChannelInitialization(ch);
}

You can see many familiar figures in the code, such as pingConnectionHandler, CommandEncoder, connectionWatchdog. From the above code, we can simply draw the input and output pipelines, and the output (redis request) pipieline is as follows:

ErrorsLoggingHandler -> CommandsQueue -> CommandBatchEncoder -> CommandEncoder

The pipeline of input (response to redis, including connection establishment, etc.) is as follows:

RedisConnectionHandler -> ConnectionWatchdog -> PingConnectionHandler -> CommandDecoder -> ErrorsLoggingHandler

On the output link, according to our analysis, an exception is thrown in the CommandEncoder when set null, which interrupts the request, and the link is normal when set is normal. On the input-to-response link, the most important thing is CommandDecoder, and the others either process logs or process connection events. Here we focus on analyzing CommandDecoder, that is, the decoder. The first step of decoding is very important, which is to take out the operation command corresponding to the request, which is what redisson does:

protected QueueCommand getCommand(ChannelHandlerContext ctx) {
    Queue<QueueCommandHolder> queue = ctx.channel().attr(CommandsQueue.COMMANDS_QUEUE).get();
    QueueCommandHolder holder = queue. peek();
    if (holder != null) {
        return holder. getCommand();
    }
    return null;
}

It takes out a queue from the channel, and then peeks out the QueueCommandHolder from the queue as the current command response to be processed. Since it can be taken from the queue, when is it inserted into the queue? We can see that in the pipeline of the request, there is a CommandsQueue, which is where the command is inserted into the queue.

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (msg instanceof QueueCommand) {
        QueueCommand data = (QueueCommand) msg;
        QueueCommandHolder holder = new QueueCommandHolder(data, promise);

        Queue<QueueCommandHolder> queue = ctx.channel().attr(COMMANDS_QUEUE).get();

        while (true) {
            if (lock. compareAndSet(false, true)) {
                try {
                    queue. add(holder);
                    ctx.writeAndFlush(data, holder.getChannelPromise());
                } finally {
                    lock. set(false);
                }
                break;
            }
        }
    } else {
        super.write(ctx, msg, promise);
    }
}

At this point, we have roughly known the corresponding relationship between queue writing and fetching, because peek will not perform remove operations on the data in the queue, so there must be a place to push out the data in the queue, and redisson’s approach is to decode Launched later, the following method will be called after decode:

protected void sendNext(Channel channel) {
    Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
    queue. poll();
    state(null);
}

Analyzing here, our problem has basically been solved. In the case, after the stringRedisTemplate set null, the request was not sent because a null pointer exception was thrown in the CommandEncode stage, so there will definitely be no CommandDecoder stage. , according to the order of the pipeline, CommandsQueue is executed before CommandEncoder, which also means that the set null (referring to instruction 1) request instruction is stuffed into the queue, and there is no decoding stage to push it out of the queue, which results in the normal value of the next set (instruction 2), the request is sent to redis, and redis also responds, but in the decoding stage, the fetched operation instruction is the previous request (instruction 1), so the decoding is abnormal, and, because the fetched at this time is The command of the first operation (command 1), the command of the second time (command 2) is still waiting for the next time to be taken out from the queue and processed (promise.tryFailure), which will cause the normal set (command 2) to time out. Since then, errors will occur in all subsequent requests, because the QueueCommand taken out in the decoding stage is always the last request, until the PinConnectionHandler makes an error and disconnects, and resets the data of the entire channel under the collaborative processing of ConnectionWatchDog. As a mature open source framework, redisson should not have this bug, and the solution is clear:
Pre-increment value judgment

Solution:

Add key and value non-null judgments before data is put into Redis.

Sentiment:

In the process of analyzing why redisson spreads abnormally, the actual process is far more complicated than that described in the article.
One is that the log when redisson makes an error is quite misleading. The first reaction when seeing the error log is either a network error or a redis server error.
After a long time of analysis, the cause of the network and redis was ruled out,
So turn to analyze the implementation mechanism of redisson itself.
In addition, in the process of analyzing redisson, the initial assumption was that the connection was closed during the request phase, but the handle or reference was not released, which also took a long time.
During the analysis process, a lot of TRACE logs were also printed, which also helped to rule out a lot of wrong directions.
All in all, analyzing problems is a time-consuming process, and it is also a process of continuous learning and progress. In this process, I am familiar with redisson and provide a good reference for my own application code in the future.