Batch processing in Redis with Pipeline

Redis batch processing

During development, it is sometimes necessary to run Redis operations in large batches.

For example, you may need to query many Hashes at once. If you query them one by one in a for loop, performance will be very poor, because every command pays for its own network round trip.

In this case, you can use Pipeline.

Pipeline

Pipeline sends multiple commands in one go and returns all the results together after execution. It lowers round-trip latency by reducing the number of network exchanges between the client and Redis. Pipeline is built on a queue, and a queue is first in, first out, so the results come back in the same order as the commands were sent.
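The saving and the ordering guarantee can be illustrated with a toy model. This is only a sketch: no network or Redis server is involved, and the class and method names (PipelineRoundTripModel, replyInOrder, and so on) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy model only: it counts network exchanges and mimics FIFO reply order.
class PipelineRoundTripModel {

    // Without a pipeline, each command is one request/response exchange.
    static long roundTripsWithoutPipeline(int commandCount) {
        return commandCount;
    }

    // With a pipeline, all commands are written together and all replies
    // are read together, so the whole batch costs a single exchange.
    static long roundTripsWithPipeline(int commandCount) {
        return commandCount == 0 ? 0 : 1;
    }

    // Commands enter a queue and replies leave it in the same order (FIFO).
    static List<String> replyInOrder(List<String> commands) {
        Queue<String> queue = new ArrayDeque<>(commands);
        List<String> replies = new ArrayList<>();
        while (!queue.isEmpty()) {
            replies.add("reply-to:" + queue.poll());
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(roundTripsWithoutPipeline(1000));
        System.out.println(roundTripsWithPipeline(1000));
        System.out.println(replyInOrder(Arrays.asList("HGET user:1 name", "HGET user:2 name")));
    }
}
```

With a 1 ms round trip, 1000 unpipelined commands spend roughly one second waiting on the network, while the pipelined batch waits roughly once.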

RedisTemplate pipeline operation

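For pipeline operations with RedisTemplate, use the **executePipelined()** method. The source of the overload that also takes a result serializer is:
org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.SessionCallback, org.springframework.data.redis.serializer.RedisSerializer)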

@Override
public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {

    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(session, "Callback object must not be null");

    RedisConnectionFactory factory = getRequiredConnectionFactory();
    // Bind connection
    RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
    try {
        return execute((RedisCallback<List<Object>>) connection -> {
            // Open the pipeline
            connection.openPipeline();
            boolean pipelinedClosed = false;
            try {
                Object result = executeSession(session);
                // The callback must return null, otherwise an exception is thrown.
                if (result != null) {
                    throw new InvalidDataAccessApiUsageException(
                            "Callback cannot return a non-null value as it gets overwritten by the pipeline");
                }
                // Close the pipeline and get the return values
                List<Object> closePipeline = connection.closePipeline();
                pipelinedClosed = true;
                return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
            } finally {
                if (!pipelinedClosed) {
                    connection.closePipeline();
                }
            }
        });
    } finally {
        RedisConnectionUtils.unbindConnection(factory);
    }
}

  • Notice:
    (1) The execute() method of the callback passed to executePipelined() must return null; otherwise an exception is thrown:
    InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
    The relevant source code is:
    Object result = executeSession(session);
    if (result != null) {
        throw new InvalidDataAccessApiUsageException(
                "Callback cannot return a non-null value as it gets overwritten by the pipeline");
    }

    (2) Inside executePipelined(), a call such as get() returns null, because the command has only been queued and has not been executed yet.
    When running pipeline operations through redisTemplate, the result values are available only through the List returned by executePipelined().

/**
 * Get the value of {@code key}.
 *
 * @param key must not be {@literal null}.
 * When using the get() method in a pipeline, the value obtained will be null.
 * @return {@literal null} when used in pipeline / transaction.
 */
@Nullable
V get(Object key);
  • Getting the pipeline return values from redisTemplate:
List<Object> list = stringRedisTemplate.executePipelined(new SessionCallback<String>() {
    @Override
    public String execute(@NonNull RedisOperations operations) throws DataAccessException {
        // idList is a collection of multiple ids
        for (String id : idList) {
            // The key is the prefix plus the unique id
            String key = KEY_PREFIX + id;
            // Inside the pipeline, get() returns null. The values are read
            // from the List<Object> returned by executePipelined().
            operations.opsForHash().get(key, field);
        }
        // The callback must return null, otherwise an exception is thrown:
        // InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
        return null;
    }
});

list.forEach(System.out::println);
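
Because the results come back in command order, the list returned by executePipelined() lines up index by index with idList, as long as exactly one command is issued per id. The helper below is a minimal sketch of pairing the two; PipelineResultZipper and zip are hypothetical names, and the hard-coded results list only stands in for a real pipeline result.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PipelineResultZipper {

    // Pairs the i-th id with the i-th pipeline result.
    // Assumes one pipelined command was issued per id, in list order.
    static Map<String, Object> zip(List<String> ids, List<Object> results) {
        if (ids.size() != results.size()) {
            throw new IllegalArgumentException("ids and results must have the same size");
        }
        Map<String, Object> byId = new LinkedHashMap<>();
        for (int i = 0; i < ids.size(); i++) {
            // A hash field that does not exist appears as a null value.
            byId.put(ids.get(i), results.get(i));
        }
        return byId;
    }

    public static void main(String[] args) {
        List<String> idList = Arrays.asList("1001", "1002", "1003");
        // Stand-in for the List<Object> returned by executePipelined().
        List<Object> results = Arrays.<Object>asList("alice", null, "carol");
        System.out.println(zip(idList, results));
    }
}
```

A LinkedHashMap is used so that iteration follows the order of idList.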

Jedis pipeline operation

RedisTemplate makes pipeline operations convenient, but assembling a map of keys and values from its results is more cumbersome.
In that case, Jedis can be used instead.
For example, Jedis can query multiple Hashes in one batch through a Pipeline.
See the source code: redis.clients.jedis.PipelineBase#hget(java.lang.String, java.lang.String)
The pipeline's hget() method resembles the ordinary hash hget(), except that its return value is a Response.

public Response<String> hget(String key, String field) {
    this.getClient(key).hget(key, field);
    return this.getResponse(BuilderFactory.STRING);
}
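
The idea of returning a placeholder now and filling it in later can be sketched with a toy model. The classes below (DeferredReply, ToyPipeline) are hypothetical and are not the Jedis implementation; they only illustrate why the value is unavailable until the pipeline has been synced.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a deferred pipeline reply.
// It is NOT the Jedis Response class; it only mirrors the idea.
class DeferredReply {
    private String value;
    private boolean filled;

    // Called by the toy pipeline once the reply has been read.
    void fill(String reply) {
        this.value = reply;
        this.filled = true;
    }

    String get() {
        if (!filled) {
            throw new IllegalStateException("reply not available yet; sync the pipeline first");
        }
        return value;
    }
}

// Toy pipeline: hget() only records a placeholder, sync() fills them in order.
class ToyPipeline {
    private final List<DeferredReply> pending = new ArrayList<>();
    private final List<String> queued = new ArrayList<>();

    DeferredReply hget(String key, String field) {
        DeferredReply reply = new DeferredReply();
        pending.add(reply);
        queued.add(key + "/" + field);
        return reply;
    }

    void sync() {
        for (int i = 0; i < pending.size(); i++) {
            // A real client would read the server's reply here; this fakes one.
            pending.get(i).fill("value-of-" + queued.get(i));
        }
        pending.clear();
        queued.clear();
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline();
        DeferredReply reply = pipeline.hget("user:1", "name");
        pipeline.sync();
        System.out.println(reply.get());
    }
}
```

Keeping the placeholders in a list in call order is what lets sync() match each reply to the command that produced it.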

Example:

public void testPipLine() {
    Map<String, Response<String>> responseMap = new HashMap<>();
    // try-with-resources closes the resources automatically
    // Get a Jedis connection first, then get the pipeline from it
    try (Jedis jedis = getJedis();
         Pipeline pipeline = jedis.pipelined()) {
        for (String id : idList) {
            // The key is the prefix plus the unique id
            String key = KEY_PREFIX + id;
            // Use pipeline.hget to query the hash data
            Response<String> response = pipeline.hget(key, field);
            responseMap.put(id, response);
        }
        // Send the queued commands and read all replies
        pipeline.sync();
    } catch (Exception ex) {
        log.error("responses error.", ex);
    }

    Map<String, String> map = new HashMap<>();
    // Assemble the map. response.get() must only be called after pipeline.sync()
    // has run (or the pipeline has been closed); before that the value is not available.
    responseMap.forEach((k, response) -> map.put(k, response.get()));

    map.forEach((k, v) -> System.out.println(k + ",val:" + v));
}


private static Pool<Jedis> jedisPool = null;

/**
 * Connects to Redis by creating the jedisPool on first use.
 * Note: this lazy initialization is not thread-safe as written.
 * @return a Jedis connection borrowed from the pool
 */
public Jedis getJedis() {
    if (jedisPool == null) {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(maxTotal);
        poolConfig.setMaxIdle(maxIdle);
        poolConfig.setMaxWaitMillis(maxWaitMillis);
        poolConfig.setTestOnBorrow(testOnBorrow);
        // The configuration values can be kept in a configuration center or file
        jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database);
    }
    return jedisPool.getResource();
}
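
When idList is very large, it may help to split it into smaller batches and run one pipeline per batch: while a pipeline is in flight the server has to queue the replies in memory, and the Redis pipelining documentation suggests sending commands in batches of a reasonable size (it mentions 10k commands as an example). The helper below is a minimal sketch of the splitting step only; IdBatcher and partition are hypothetical names, and each resulting batch would be passed through the pipeline code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IdBatcher {

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> idList = Arrays.asList("1", "2", "3", "4", "5");
        for (List<String> batch : partition(idList, 2)) {
            // In real code, run one pipeline per batch here.
            System.out.println(batch);
        }
    }
}
```

The batch size is a tuning choice; the right value depends on reply sizes and available memory.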

Reference materials

https://redis.io/docs/manual/pipelining/
https://www.cnblogs.com/expiator/p/11127719.html