Redis batch processing
During development, it is sometimes necessary to run a large number of Redis commands in one batch.
For example, you may need to query many hashes at once. Querying them one by one in a for loop costs one network round trip per command, so performance is very poor.
In this case, you can use Pipeline.
Pipeline
A pipeline sends multiple commands at once and returns all the results together after they have been executed. It reduces total latency by cutting the number of round trips between the client and Redis. Commands are queued and processed first in, first out, so the replies come back in the same order as the commands were sent.
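The round-trip saving and the ordering guarantee can be illustrated without a Redis server. The following is a minimal sketch in plain Java, not real client code: `PipelineSketch`, `FakeServer`, and the `GET` command strings are hypothetical names invented for this illustration. It counts one round trip per exchange and answers queued commands in first-in, first-out order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model of pipelining; all names here are hypothetical, not a Redis client API. */
final class PipelineSketch {

    /** Stands in for a Redis server and counts client/server exchanges. */
    static final class FakeServer {
        private final Map<String, String> data = new HashMap<>();
        int roundTrips = 0;

        FakeServer put(String key, String value) {
            data.put(key, value);
            return this;
        }

        /** One exchange: every command in the batch is answered in FIFO order. */
        List<String> send(List<String> commands) {
            roundTrips++;
            Queue<String> queue = new ArrayDeque<>(commands);
            List<String> replies = new ArrayList<>();
            while (!queue.isEmpty()) {
                String key = queue.poll().substring("GET ".length());
                replies.add(data.get(key)); // null models a missing key
            }
            return replies;
        }
    }

    /** One command per exchange: N keys cost N round trips. */
    static List<String> getOneByOne(FakeServer server, List<String> keys) {
        List<String> replies = new ArrayList<>();
        for (String key : keys) {
            replies.addAll(server.send(Collections.singletonList("GET " + key)));
        }
        return replies;
    }

    /** All commands in one exchange: N keys cost a single round trip. */
    static List<String> getPipelined(FakeServer server, List<String> keys) {
        List<String> commands = new ArrayList<>();
        for (String key : keys) {
            commands.add("GET " + key);
        }
        return server.send(commands);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        FakeServer slow = new FakeServer().put("a", "1").put("b", "2").put("c", "3");
        FakeServer fast = new FakeServer().put("a", "1").put("b", "2").put("c", "3");
        System.out.println(getOneByOne(slow, keys) + ", round trips: " + slow.roundTrips);
        System.out.println(getPipelined(fast, keys) + ", round trips: " + fast.roundTrips);
    }
}
```

Both methods return the same replies in the same order; only the number of exchanges differs, which is where the latency saving of a real pipeline comes from.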
RedisTemplate pipeline operation
For pipeline operations of RedisTemplate, use the **executePipelined()** method.
org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.SessionCallback<?>)
    @Override
    public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {
        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(session, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        // Bind the connection
        RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
        try {
            return execute((RedisCallback<List<Object>>) connection -> {
                // Open the pipeline
                connection.openPipeline();
                boolean pipelinedClosed = false;
                try {
                    Object result = executeSession(session);
                    // The callback must return null, otherwise an exception is thrown.
                    if (result != null) {
                        throw new InvalidDataAccessApiUsageException(
                                "Callback cannot return a non-null value as it gets overwritten by the pipeline");
                    }
                    // Close the pipeline and collect the results
                    List<Object> closePipeline = connection.closePipeline();
                    pipelinedClosed = true;
                    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
                } finally {
                    if (!pipelinedClosed) {
                        connection.closePipeline();
                    }
                }
            });
        } finally {
            RedisConnectionUtils.unbindConnection(factory);
        }
    }
- Note:
(1) The execute() method of the callback passed to executePipelined() must return null; otherwise an exception is thrown:
InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
The source code is as follows:
    Object result = executeSession(session);
    if (result != null) {
        throw new InvalidDataAccessApiUsageException(
                "Callback cannot return a non-null value as it gets overwritten by the pipeline");
    }
(2) Inside executePipelined(), a call such as get() returns null, because the command has only been queued and its reply has not been read yet.
When running pipeline operations through RedisTemplate, the results can only be obtained from the List returned by executePipelined().
    /**
     * Get the value of {@code key}.
     *
     * @param key must not be {@literal null}.
     * @return {@literal null} when used in pipeline / transaction.
     */
    // Note: when get() is called inside a pipeline, the returned value is null.
    @Nullable
    V get(Object key);
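The two rules above can be mimicked in a few lines of plain Java. This is a simplified toy model, not Spring Data Redis code: `CallbackContractSketch`, `Ops`, and `runPipelined` are hypothetical names, and the in-memory map stands in for Redis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of the two rules above; hypothetical names, not Spring Data Redis code. */
final class CallbackContractSketch {

    /** Minimal stand-in for the operations object handed to the callback. */
    interface Ops {
        String get(String key);
    }

    static List<Object> runPipelined(Map<String, String> store, Function<Ops, Object> callback) {
        List<Object> results = new ArrayList<>();
        // Each get() only records its reply for later and hands back null.
        Ops ops = key -> {
            results.add(store.get(key));
            return null;
        };
        Object returned = callback.apply(ops);
        if (returned != null) {
            // Mirrors rule (1): a non-null callback result is rejected.
            throw new IllegalStateException("callback must return null");
        }
        return results; // rule (2): the values are only available here
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("a", "1");
        store.put("b", "2");
        List<Object> results = runPipelined(store, ops -> {
            System.out.println("inside callback: " + ops.get("a"));
            ops.get("b");
            return null;
        });
        System.out.println("returned list: " + results);
    }
}
```

Inside the callback the call yields null, while the list returned afterwards carries one entry per queued command, in the order the commands were issued.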
- Getting the pipeline results with RedisTemplate:
    List<Object> list = stringRedisTemplate.executePipelined(new SessionCallback<String>() {
        @Override
        public String execute(@NonNull RedisOperations operations) throws DataAccessException {
            // idList is a collection of ids
            for (String id : idList) {
                // The key is the prefix plus the unique id
                String key = KEY_PREFIX + id;
                // Inside the pipeline, get() returns null. The values come back
                // in the List<Object> returned by executePipelined().
                operations.opsForHash().get(key, field);
            }
            // The callback must return null, otherwise an exception is thrown:
            // InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
            return null;
        }
    });
    list.forEach(System.out::println);
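Since the returned list follows the order in which the commands were queued, its entries can be matched to idList by index. The helper below is a minimal sketch of that pairing in plain Java; `PipelineResults` and `zipById` are hypothetical names, and the sample ids and values in `main` are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: pairs pipeline results with the ids that produced them. */
final class PipelineResults {

    /**
     * Assumes exactly one command was queued per id, in idList order,
     * so results.get(i) is the reply for idList.get(i).
     */
    static Map<String, Object> zipById(List<String> idList, List<Object> results) {
        if (idList.size() != results.size()) {
            throw new IllegalArgumentException(
                    "expected " + idList.size() + " results but got " + results.size());
        }
        Map<String, Object> byId = new LinkedHashMap<>(); // keeps idList order
        for (int i = 0; i < idList.size(); i++) {
            byId.put(idList.get(i), results.get(i)); // value is null when the field is missing
        }
        return byId;
    }

    public static void main(String[] args) {
        List<String> idList = Arrays.asList("1001", "1002", "1003");
        // Stand-in for the list returned by executePipelined(); values are made up.
        List<Object> results = Arrays.<Object>asList("alice", null, "carol");
        zipById(idList, results).forEach((id, value) -> System.out.println(id + " -> " + value));
    }
}
```

The size check guards against the one-command-per-id assumption being broken, for example when the callback queues an extra command inside the loop.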
Jedis pipeline operation
RedisTemplate makes pipeline operations convenient, but the results come back as a plain list, so assembling a map of keys to values takes extra work.
In this case, Jedis can be used.
For example, Jedis can use a Pipeline to query multiple hashes in batches.
See the source code: redis.clients.jedis.PipelineBase#hget(java.lang.String, java.lang.String)
The pipelined hget() is similar to the ordinary hash hget(), but it returns a Response<String> instead of the value itself.
    public Response<String> hget(String key, String field) {
        this.getClient(key).hget(key, field);
        return this.getResponse(BuilderFactory.STRING);
    }
Example:
    public void testPipLine() {
        Map<String, Response<String>> responseMap = new HashMap<>();
        // try-with-resources closes the resources automatically.
        // Get a Jedis connection first, then open the pipeline.
        try (Jedis jedis = getJedis(); Pipeline pipeline = jedis.pipelined()) {
            for (String id : idList) {
                // Prefix plus unique id
                String key = KEY_PREFIX + id;
                // Use pipeline.hget to query the hash field
                Response<String> response = pipeline.hget(key, field);
                responseMap.put(id, response);
            }
            pipeline.sync();
        } catch (Exception ex) {
            log.error("responses error.", ex);
        }

        Map<String, String> map = new HashMap<>();
        // Assemble the map. response.get() may only be called after pipeline.sync()
        // (or after the pipeline is closed); calling it earlier throws an exception.
        responseMap.forEach((k, response) -> map.put(k, response.get()));
        map.forEach((k, v) -> System.out.println(k + ",val:" + v));
    }

    private static Pool<Jedis> jedisPool = null;

    /**
     * Lazily create the connection pool and borrow a Jedis connection from it.
     *
     * @return a Jedis connection from the pool
     */
    public Jedis getJedis() {
        if (jedisPool == null) {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxTotal(maxTotal);
            poolConfig.setMaxIdle(maxIdle);
            poolConfig.setMaxWaitMillis(maxWaitMillis);
            poolConfig.setTestOnBorrow(testOnBorrow);
            // These settings can live in a configuration center or file
            jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database);
        }
        return jedisPool.getResource();
    }
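The reason response.get() has to wait for pipeline.sync() is that each Response is only a placeholder until the replies have been read. The following is a toy model of that idea in plain Java, not the Jedis implementation: `DeferredSketch`, `Deferred`, and `ToyPipeline` are hypothetical classes, and throwing `IllegalStateException` on an early read is this sketch's own choice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of deferred pipeline replies; hypothetical classes, not the Jedis API. */
final class DeferredSketch {

    /** Placeholder that is filled in only when the pipeline is synced. */
    static final class Deferred<T> {
        private T value;
        private boolean set;

        void set(T value) {
            this.value = value;
            this.set = true;
        }

        T get() {
            if (!set) {
                throw new IllegalStateException("sync() the pipeline before calling get()");
            }
            return value;
        }
    }

    /** Queues lookups against an in-memory map and resolves them on sync(). */
    static final class ToyPipeline {
        private final Map<String, String> store;
        private final List<String> queuedKeys = new ArrayList<>();
        private final List<Deferred<String>> pending = new ArrayList<>();

        ToyPipeline(Map<String, String> store) {
            this.store = store;
        }

        Deferred<String> get(String key) {
            Deferred<String> response = new Deferred<>();
            queuedKeys.add(key);
            pending.add(response);
            return response; // still empty at this point
        }

        void sync() {
            // Fill every placeholder in the order its command was queued.
            for (int i = 0; i < pending.size(); i++) {
                pending.get(i).set(store.get(queuedKeys.get(i)));
            }
            queuedKeys.clear();
            pending.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("user:1", "alice");
        ToyPipeline pipeline = new ToyPipeline(store);
        Deferred<String> response = pipeline.get("user:1");
        pipeline.sync();
        System.out.println(response.get());
    }
}
```

Keeping the placeholders in a map keyed by id, as the Jedis example does, is what makes it easy to build the final key-to-value map once the pipeline has been synced.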