Using a Redis zset as an asynchronous queue

When many clients send requests concurrently and the responses do not need to be produced in real time, the server can relieve its processing pressure by putting the requests on an asynchronous message queue.

One implementation strategy is to use a Redis zset: the time at which a message becomes due for processing is used as its score, and multiple threads poll the zset for due tasks and process them.
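To make the role of the score concrete, here is a minimal in-memory sketch. It does not talk to Redis: the class `DueTimeScoreDemo` and its `TreeSet` are hypothetical stand-ins for the zset, and the current time is passed in explicitly so the behaviour is easy to follow. It is simplified (for example, it does not update the score when a member is added twice).

```java
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;

public class DueTimeScoreDemo {
    // Order entries by score first, then by member.
    private static final Comparator<Map.Entry<Long, String>> BY_SCORE_THEN_MEMBER = (a, b) -> {
        int byScore = Long.compare(a.getKey(), b.getKey());
        return byScore != 0 ? byScore : a.getValue().compareTo(b.getValue());
    };

    // Hypothetical in-memory stand-in for the zset (score, member pairs).
    private final TreeSet<Map.Entry<Long, String>> zset = new TreeSet<>(BY_SCORE_THEN_MEMBER);

    // Mirrors ZADD: the score is the moment the task becomes due.
    public void delay(String member, long nowMillis, long delayMillis) {
        zset.add(new AbstractMap.SimpleImmutableEntry<Long, String>(nowMillis + delayMillis, member));
    }

    // Mirrors a score range query from 0 to "now" limited to one result:
    // the earliest task that is already due, if any.
    public Optional<String> firstDue(long nowMillis) {
        if (zset.isEmpty()) {
            return Optional.empty();
        }
        Map.Entry<Long, String> head = zset.first();
        return head.getKey() <= nowMillis ? Optional.of(head.getValue()) : Optional.empty();
    }

    public static void main(String[] args) {
        DueTimeScoreDemo queue = new DueTimeScoreDemo();
        queue.delay("send-email", 1_000L, 5_000L);                // score = 6000
        System.out.println(queue.firstDue(5_999L).isPresent());   // false: not due yet
        System.out.println(queue.firstDue(6_000L).orElse(null));  // send-email
    }
}
```

A consumer that polls with the current time as the upper bound therefore never sees a task before its delay has elapsed.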

One question needs to be considered up front:

How to prevent a task from being processed multiple times?

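One solution is to have each thread call Redis's zrem command on the task it has fetched. Because Redis processes commands sequentially, only one of the competing threads actually removes the member (zrem returns 1 for that thread and 0 for the others), and only that thread goes on to process the task.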
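The "only one remover wins" idea can be sketched without Redis. In the example below, `ClaimByRemoveDemo`, `tryClaim`, and `raceForTask` are hypothetical names, and a `ConcurrentHashMap` stands in for the zset; its atomic `remove` plays the role of zrem's return value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimByRemoveDemo {
    // Hypothetical in-memory stand-in for the zset: member -> score.
    private static final Map<String, Long> ZSET = new ConcurrentHashMap<>();

    // Mirrors the zrem check: true only for the one caller that actually removed the member.
    public static boolean tryClaim(String member) {
        return ZSET.remove(member) != null;
    }

    // Lets several threads race for the same task and counts how many of them win.
    public static int raceForTask(String member, int threads) {
        ZSET.put(member, System.currentTimeMillis());
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (tryClaim(member)) {
                    winners.incrementAndGet(); // this thread would go on to handle the task
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners = " + raceForTask("task-1", 8)); // winners = 1
    }
}
```

However many threads fetch the same task, exactly one of them sees a successful removal, so the task is handled once.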

Environment

  • JDK 17
  • Two JAR dependencies
    • Jedis
    • fastjson2

Code

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;

// Delay queue implemented on top of a Redis zset
public class RedisDelayingQueue<T> {
    static class TaskItem<T> {
        public String id;
        public T msg;
    }
    // fastjson needs a TypeReference to deserialize a generic type
    private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();

    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    // Add a task to the zset.
    // The score is the time at which the task becomes due (now + 5 seconds).
    public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        // A unique id keeps identical messages distinct as zset members
        task.id = UUID.randomUUID().toString();
        task.msg = msg;
        // Serialize the task
        String s = JSON.toJSONString(task);
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);
    }

    public void loop() {
        while(!Thread.interrupted()) {
            // Fetch at most one task that is already due
            List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            if(values.isEmpty()) {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = values.iterator().next();
            // Only the thread whose zrem succeeds owns the task
            if(jedis.zrem(queueKey, s) > 0) {
                TaskItem<T> task = JSON.parseObject(s, TaskType);
                this.handleMsg(task.msg);
            }
        }
    }

    public void handleMsg(T msg) {
        System.out.println(msg);
    }
}

Optimization

In the loop above, several threads may fetch the same value at the same time and then race to delete it with zrem, so many useless network requests are sent to Redis. A natural fix is to make fetching the value and deleting it a single atomic operation. There are two ways to do this:

  • Lock the code block
  • Use the atomic execution of Lua scripts in Redis

Code block locking

This solution is not ideal. The lock is held across both Redis commands, so if a network error or delay occurs between them, every other thread is blocked. The lock also only coordinates threads inside a single JVM.

    public void synchronizedLoop() {
        while(!Thread.interrupted()) {
            String s = null;
            // Hold the lock only around the fetch-and-remove pair
            synchronized(this) {
                List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
                if(!values.isEmpty() && jedis.zrem(queueKey, values.get(0)) > 0) {
                    s = values.get(0);
                }
            }
            if(s == null) {
                // Nothing was claimed: sleep outside the lock so other threads are not blocked
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    break;
                }
                continue;
            }
            TaskItem<T> task = JSON.parseObject(s, TaskType);
            this.handleMsg(task.msg);
        }
    }

Lua script

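local key = KEYS[1]
local now = ARGV[1]
-- Fetch at most one task whose score (due time) has been reached
local tasks = redis.call('ZRANGEBYSCORE', key, 0, now, 'LIMIT', 0, 1)
if tasks[1] then
  redis.call('ZREM', key, tasks[1])
  return tasks[1]
else
  return nil
end

According to the documentation, ZRANGEBYSCORE has been marked as deprecated since Redis 6.2 in favor of ZRANGE with the BYSCORE option, but it is still available. ZPOPMIN is not a drop-in replacement here: it removes the member with the smallest score whether or not that score has been reached, so tasks would be handled before their delay has expired. The script therefore keeps the score range query and receives the current time as ARGV[1].

The script is run through the Jedis eval function. Here script is a String field that holds the Lua source above.

    public void luaLoop() {
        while(!Thread.interrupted()) {
            // One key (the queue) and one argument (the current time in milliseconds)
            Object ans = jedis.eval(script, 1, queueKey, String.valueOf(System.currentTimeMillis()));
            if(ans != null) {
                String task = (String) ans;
                TaskItem<T> taskItem = JSON.parseObject(task, TaskType);
                this.handleMsg(taskItem.msg);
            } else {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    break;
                }
            }
        }
    }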

Why this is an optimization:

  • With the Lua script, fetching and removing happen as one atomic step on the server, so a thread that finds a due task always obtains it. Threads no longer fetch the same task and then compete to delete it, which removes the wasted network I/O.
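The effect of an atomic fetch-and-remove can be illustrated in memory. This is only a model: `AtomicClaimDemo` is a hypothetical class, a `TreeMap` stands in for the zset, and a `synchronized` method imitates the indivisible execution that Redis gives a script. Scores are kept unique so that a plain map is enough.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class AtomicClaimDemo {
    // Hypothetical in-memory stand-in for the zset: score (due time) -> member.
    private final TreeMap<Long, String> zset = new TreeMap<>();

    public synchronized void add(long score, String member) {
        zset.put(score, member);
    }

    // Models the script: find the earliest due member and remove it in one indivisible step.
    public synchronized Optional<String> claimDue(long now) {
        Map.Entry<Long, String> head = zset.firstEntry();
        if (head == null || head.getKey() > now) {
            return Optional.empty();
        }
        zset.remove(head.getKey());
        return Optional.of(head.getValue());
    }

    // Drains the queue with several consumers and returns everything they handled.
    public static List<String> drain(int tasks, int consumers) {
        AtomicClaimDemo queue = new AtomicClaimDemo();
        for (int i = 0; i < tasks; i++) {
            queue.add(i, "task-" + i);
        }
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                Optional<String> task;
                // Every successful call yields a task that no other thread can also receive
                while ((task = queue.claimDue(tasks)).isPresent()) {
                    handled.add(task.get());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> handled = drain(100, 4);
        System.out.println(handled.size());                // 100
        System.out.println(new HashSet<>(handled).size()); // 100: no task handled twice
    }
}
```

Each consumer call either returns a task that belongs to it alone or returns nothing, so there is no separate "did my delete succeed" round trip.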

Test program

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Main {
    public static void main(String[] args) {
        JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");
        Jedis jedis = jedisPool.getResource();

        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");

        // Create a thread to act as a producer and store 10 asynchronous tasks in redis
        Thread producer = new Thread() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            }
        };

        // Create a thread to act as a consumer, continuously fetch tasks from redis and execute them
        Thread consumer = new Thread() {
            public void run() {
                queue.luaLoop();
            }
        };

        producer.start();
        consumer.start();
        try {
            // Wait for the producer thread to finish executing
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Some questions

One issue concerns Jedis: issuing Redis requests the way the test program does has a concurrency problem. If the delay in the code above is removed, the problem becomes much more likely to appear. The cause is that Jedis is not thread-safe: the single connection instance obtained from JedisPool is shared by both threads, so their concurrent requests are written to the same socket.

It is best to give each thread its own Jedis instance to avoid this data race. Only two threads are used here, so the class can simply hold two Jedis instances declared by hand. With multiple consumers, each thread can likewise hold its own Jedis instance.

    private Jedis readJedis;
    private Jedis writeJedis;
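For more than two threads, one possible way to give every thread its own connection is a `ThreadLocal`. The sketch below is library-free: `FakeConnection` is a hypothetical stand-in for a Jedis instance and `PerThreadConnectionDemo` is an illustrative name. In real code the supplier would presumably hand out a connection from the pool (for example via `jedisPool.getResource()`), and each connection would need to be closed when its thread finishes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PerThreadConnectionDemo {
    // Hypothetical stand-in for a Jedis connection; not part of any library.
    static final class FakeConnection { }

    // Each thread lazily creates its own connection on first use and then reuses it.
    static final ThreadLocal<FakeConnection> CONNECTION =
            ThreadLocal.withInitial(FakeConnection::new);

    // Starts the given number of threads and counts how many distinct connections they used.
    public static int distinctConnections(int threads) {
        Set<FakeConnection> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                seen.add(CONNECTION.get());
                seen.add(CONNECTION.get()); // same thread, same instance: adds nothing new
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctConnections(4)); // 4: one connection per thread
    }
}
```

Because no connection is ever visible to two threads, requests from different threads can no longer interleave on one socket.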

Summary

This article walked through implementing a simple asynchronous queue with a zset, and then used either a lock or a Lua script to cut the wasted network I/O in the first implementation. Locking reduces the program's concurrency, so the Lua script is generally the preferred approach.