Redis implements delay queue

Use Redis to implement delay queue

Implementation ideas

As a high-performance NoSQL database, redis has the characteristics of fast reading and writing, high concurrency, and data persistence. It is very suitable for implementing delay queues. Redis provides a rich data structure.
A simple delay queue can be implemented using the ZSET set (ordered set) data structure of redis.

Each element in the zset data structure of redis has a score and a value. We can use the execution timestamp of the task as the score.
Use task data as value and insert the task into zset. Each task has a unique id (such as order id) and task execution time (such as 30min).
Information body such as task content (such as automatic cancellation of order overtime and payment system). Then start another thread, which will periodically take the minimum score from zset
(i.e. the earliest task to be executed), if the score of the task is less than the current timestamp, execute the task, otherwise wait for a period of time and check again.
Until the task can be executed, after executing the task, delete the successfully executed task through the Redis remove command.

Detailed steps

This article will introduce how to use Redis’s Sorted Set data structure to implement a delay queue and provide a complete sample code. At the same time, we will also provide corresponding test cases and test results.
Let me first summarize to the students how to use Redis to implement some implementation steps of delay queue for the Spring Boot project?

  1. Introduce related dependencies (integrate redis)
 <!--Integrated redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
  1. Configure redis
#redis configuration
Spring:
  redis:
    database: 0 #Redis database index (default is 0)
    host: 127.0.0.1 #redis server ip, since I am building it locally, it points to the local ip
    port: 6379 #redis server connection port
    password: #redis server connection password (default is empty)
    # Connection pool configuration
    jedis.pool:
      max-active: 20 #Maximum number of connections in the connection pool (use negative values to indicate no limit)
      max-wait: -1 #The maximum blocking waiting time of the connection pool (use a negative value to indicate no limit)
      max-idle: 10 #Maximum idle connection in the connection pool
      min-idle: 0 #Minimum idle connection in the connection pool
      timeout: 1000 #Connection timeout (milliseconds). I set it to 1 second

  1. Create message class DelayMessage
  • A message class is defined here, including the message id, message content, and expiration time (message execution time). The code is as follows
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessage implements Serializable {<!-- -->

    /**
     * Remember to instantiate
     */
    private static final long serialVersionUID = -7671756385477179547L;

    /**
     * message id
     */
    private String id;

    /**
     * Message content
     */
    private String content;

    /**
     * Message expiration time
     */
    private long expireTime;

}
  1. Create a delay queue class DelayQueue
  • Create a delay queue class that provides methods for adding messages, deleting messages, and obtaining messages. The specific code is as follows
@Component
public class DelayQueue {<!-- -->

    /**
     * The intranet IP of the current machine is spliced after the key: used to distinguish clusters and solve concurrency problems in clusters.
     */
    private static final String KEY = "delay_queue:" + getHostAddress();

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Add messages to the delay queue
     */
    public void put(DelayMessage message) {<!-- -->
        redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime());
    }

    /**
     * Delete messages from the delay queue
     */
    public Long remove(DelayMessage message) {<!-- -->
        Long remove = redisTemplate.opsForZSet().remove(KEY, message);
        return remove;
    }

    /**
     * Get expired messages in the delay queue
     */
    public List<DelayMessage> getExpiredMessages() {<!-- -->
// 1: Get the start time
        long minScore = 0;
// 2: Get the end time
        long maxScore = System.currentTimeMillis();
// 3: Get the data list in the specified range
        Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore);
        if (messages == null || messages.isEmpty()) {<!-- -->
            return Collections.emptyList();
        }
// 4: Encapsulate the object and return
        List<DelayMessage> result = new ArrayList<>();
        for (Object message : messages) {<!-- -->
            DelayMessage delayMessage = JSONObject.parseObject(message.toString(), DelayMessage.class);
            result.add(delayMessage);
        }
        return result;
    }

    /**
     * Get the address (intranet address of the server) (intranet ip)
     *
     * @return
     */
    public static String getHostAddress() {<!-- -->
        InetAddress localHost = null;
        try {<!-- -->
            localHost = InetAddress.getLocalHost();
        } catch (
                UnknownHostException e) {<!-- -->
            e.printStackTrace();
        }
        return localHost.getHostAddress();
    }
}
  1. Create DelayMessageHandler message processing class
  • Create a message processing task, add a message processing expired message, write a scheduled task, and poll the expired tasks in the delay queue at intervals of 1s. If it cannot be obtained, it will be empty.
    Then the message processing logic will not be performed, otherwise polling will continue.
@Component
public class DelayMessageHandler {<!-- -->
    
    public static SimpleDateFormat dateTimeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private DelayQueue delayQueue;

    /**
     * Process expired messages (polling)
     */
    @Scheduled(fixedDelay = 1000)
    public void handleExpiredMessages() {<!-- -->
        String currentTime = getCurrentTime();
// 1: Scan tasks and add tasks that need to be executed to the task queue
        List<DelayMessage> messages = delayQueue.getExpiredMessages();
        List<DelayMessage> messages_2 = delayQueue.getExpiredMessages();
        System.out.println(currentTime + "Number of messages to be processed: " + messages.size());
// 2: Start processing messages
        if (!messages.isEmpty()) {<!-- -->
            for (DelayMessage message : messages) {<!-- -->
                    System.out.println(message.getId() + " --> Message starts processing");
                    try {<!-- -->
// 2.1.1: Simulate sleep for 3 seconds, task processing time (actually it may be longer)
                        Thread.sleep(3000);
                    } catch (Exception e) {<!-- -->
                        e.printStackTrace();
                    }
                    System.out.println(message.getId() + " --> Message processing ends");
// 2.2: After processing the message, delete the message
                    delayQueue.remove(message);
            }
        }
    }

    /**
     * Get the current hour, minute and second
     *
     * @return
     */
    public static String getCurrentTime() {<!-- -->
        String format = dateTimeFormater.format(new Date());
        return format;
    }
}

Execution result: (We can see that the message is slowly being consumed)

2023-11-03 15:06:01 Number of messages to be processed: 0
2023-11-03 15:06:02 Number of messages to be processed: 0
2023-11-03 15:06:03 Number of messages to be processed: 0
2023-11-03 15:06:04 Number of messages to be processed: 0
# Start calling the interface here and add messages to the delay queue
2023-11-03 15:06:05 Number of messages to be processed: 4
2023-11-03 15:06:05 :1 --> Message processing begins
2023-11-03 15:06:05 :1 --> Message processing ends
2023-11-03 15:06:05 :13 --> Message processing begins
2023-11-03 15:06:05 :13 --> Message processing ends
2023-11-03 15:06:05 :5 --> Message processing begins
2023-11-03 15:06:05 :5 --> Message processing ends
2023-11-03 15:06:05 :9 --> Message processing begins
2023-11-03 15:06:05 :9 --> Message processing ends
2023-11-03 15:06:18 Number of messages to be processed: 12
2023-11-03 15:06:18 :10 --> Message processing begins
2023-11-03 15:06:18 :10 --> Message processing ends
2023-11-03 15:06:18 :14 --> Message processing begins
2023-11-03 15:06:18 :14 --> Message processing ends
2023-11-03 15:06:18 :2 --> Message processing begins
2023-11-03 15:06:18 :2 --> Message processing ends
2023-11-03 15:06:18 :6 --> Message processing begins

We will find a problem here. The @Scheduled annotation is executed in polling. If the previous task is not completed, the timer will wait for the last execution to be completed.
In other words, if the @Scheduled annotation indicates asynchronous execution, then a problem will arise. Each message processing will take 3 seconds.
Assume that there are two messages A and B, and the expiration time of the messages is the same, then the two messages will be taken out of the cache at the same time and prepared for consumption. Assume that message A starts to be consumed first.
Then for message B, you have to wait 3 seconds until the execution of message A is completed before starting to consume message B. Then there will be message accumulation and delayed consumption. The message that was supposed to be consumed at 14:00 will not be consumed until 14:10. Start consumption (maybe later),
If the message volume is large enough, problems will occur, such as memory leaks, message accumulation, delayed consumption, etc.

Solution: Open a thread for execution (use thread pool), use the following code, when we consume a message, we need to create a thread to consume in the background, which will solve the above problem.
(A thread pool is needed here. To be lazy, I simply simulated it)

 /**
     * Process expired messages (polling)
     */
    @Scheduled(fixedDelay = 1000)
    public void handleExpiredMessages() {<!-- -->
        String currentTime = getCurrentTime();
// 1: Scan tasks and add tasks that need to be executed to the task queue
        List<DelayMessage> messages = delayQueue.getExpiredMessages();
        System.out.println(currentTime + "Number of messages to be processed: " + messages.size());
// 2: Start processing messages
        if (!messages.isEmpty()) {<!-- -->
            for (DelayMessage message : messages) {<!-- -->
// 2.1: Enable threads to process messages asynchronously: Do not let the time of processing messages block the current thread
                new Thread(() -> {<!-- -->
                    System.out.println(currentTime + " :" + message.getId() + " --> Message starts processing");
                    try {<!-- -->
// 2.1.1: Simulate sleep for 3 seconds, task processing time (actually it may be longer)
                        Thread.sleep(3000);
                    } catch (Exception e) {<!-- -->
                        e.printStackTrace();
                    }
                    System.out.println(currentTime + " :" + message.getId() + " --> Message processing ends");
// 2.2: After processing the message, delete the message
                    delayQueue.remove(message);
                }).start();
            }
        }
    }

Execution result: Start thread asynchronous execution message

2023-11-03 15:18:33 Number of messages to be processed: 0
2023-11-03 15:18:34 Number of messages to be processed: 0
2023-11-03 15:18:35 Number of messages to be processed: 0
2023-11-03 15:18:36 Number of messages to be processed: 4
2023-11-03 15:18:36 :1 --> Message processing begins
2023-11-03 15:18:36 :13 --> Message processing begins
2023-11-03 15:18:36 :5 --> Message processing begins
2023-11-03 15:18:36 :9 --> Message processing begins
2023-11-03 15:18:37 Number of messages to be processed: 4
2023-11-03 15:18:37 :1 --> Message processing begins // Note: (This message has been consumed repeatedly)
2023-11-03 15:18:37 :13 --> Message processing begins
2023-11-03 15:18:37 :5 --> Message processing begins
2023-11-03 15:18:37 :9 --> Message processing begins
2023-11-03 15:18:38 Number of messages to be processed: 8
2023-11-03 15:18:38 :1 --> Message processing begins
2023-11-03 15:18:38 :5 --> Message processing begins
2023-11-03 15:18:38 :9 --> Message processing begins
2023-11-03 15:18:38 :13 --> Message processing begins
2023-11-03 15:18:38 :10 --> Message processing begins
2023-11-03 15:18:38 :6 --> Message processing begins
2023-11-03 15:18:38 :2 --> Message processing begins
2023-11-03 15:18:38 :14 --> Message processing begins
2023-11-03 15:18:36 :9 --> Message processing ends
2023-11-03 15:18:36 :5 --> Message processing ends
2023-11-03 15:18:36 :1 --> Message processing ends
2023-11-03 15:18:36 :13 --> Message processing ends

We used the method of starting a new thread to consume messages. The problem of message delay was solved, but a new problem appeared, and messages would be consumed repeatedly.

The cause of the problem: The first time we timed it, we took out 4 expired messages that met the conditions. We started 4 threads to execute them. When the second second passed, we got another message that met the conditions.
Because it takes time to execute the message obtained for the first time, when we get the message for the second time, we may also take out the four messages from the first time, and then open a thread to consume again, which will cause repeated consumption. situation

solution :

The reason for this problem is that the current thread does not know that this message is already being processed by other threads. As long as this problem is solved,
The current thread starts processing this message. It first determines whether the current message has been processed by other threads. If it is being processed, it will not be processed. If it is not processed, it will start processing.

We know that the remove() method of redis to delete elements has a return value indicating the status of deletion.
We can remove() the message before message processing. If remove() succeeds, it means that the current message has not been consumed. If remove() fails, it means that the message has been consumed.

 /**
     * Process expired messages (polling)
     */
    @Scheduled(fixedDelay = 1000)
    public void handleExpiredMessages() {<!-- -->
        String currentTime = getCurrentTime();
// 1: Scan tasks and add tasks that need to be executed to the task queue
        List<DelayMessage> messages = delayQueue.getExpiredMessages();
        System.out.println(currentTime + "Number of messages to be processed: " + messages.size());
// 2: Start processing messages
        if (!messages.isEmpty()) {<!-- -->
            for (DelayMessage message : messages) {<!-- -->
// 2.1: Process the message: delete the message first and get whether the current message has been consumed by others
                Long remove = delayQueue.remove(message);
                if (remove > 0) {<!-- -->
// 2.2: Enable threads to process messages asynchronously: Do not let the time of processing messages block the current thread
                    new Thread(() -> {<!-- -->
                        System.out.println(currentTime + " :" + message.getId() + " --> Message starts processing");
                        try {<!-- -->
// 2.1.1: Simulate sleep for 3 seconds, task processing time (actually it may be longer)
                            Thread.sleep(3000);
                        } catch (Exception e) {<!-- -->
                            e.printStackTrace();
                        }
                        System.out.println(currentTime + " :" + message.getId() + " --> Message processing ends");
                    }).start();
                }
            }
        }
    }
  • Execution results: We will find that the problem of repeated consumption is solved
2023-11-03 15:31:36 Number of messages to be processed: 4
2023-11-03 15:31:36 :1 --> Message processing begins
2023-11-03 15:31:36 :13 --> Message processing begins
2023-11-03 15:31:36 :5 --> Message processing begins
2023-11-03 15:31:36 :9 --> Message processing begins
2023-11-03 15:31:37 Number of messages to be processed: 0
2023-11-03 15:31:38 Number of messages to be processed: 4
2023-11-03 15:31:38 :10 --> Message processing begins
2023-11-03 15:31:38 :14 --> Message processing begins
2023-11-03 15:31:38 :2 --> Message processing begins
2023-11-03 15:31:38 :6 --> Message processing begins
2023-11-03 15:31:36 :9 --> Message processing ends
2023-11-03 15:31:36 :5 --> Message processing ends
2023-11-03 15:31:36 :13 --> Message processing ends
2023-11-03 15:31:36 :1 --> Message processing ends
2023-11-03 15:31:39 Number of messages to be processed: 0
2023-11-03 15:31:40 Number of messages to be processed: 0
2023-11-03 15:31:38 :10 --> Message processing ends
2023-11-03 15:31:38 :2 --> Message processing ends
2023-11-03 15:31:38 :6 --> Message processing ends
2023-11-03 15:31:38 :14 --> Message processing ends
2023-11-03 15:31:41 Number of messages to be processed: 4
2023-11-03 15:31:41 :11 --> Message processing begins
2023-11-03 15:31:41 :15 --> Message processing begins
2023-11-03 15:31:41 :3 --> Message processing begins
2023-11-03 15:31:41 :7 --> Message processing begins
2023-11-03 15:31:42 Number of messages to be processed: 0
2023-11-03 15:31:43 Number of messages to be processed: 0
2023-11-03 15:31:41 :7 --> Message processing ends
2023-11-03 15:31:41 :11 --> Message processing ends
2023-11-03 15:31:41 :3 --> Message processing ends
2023-11-03 15:31:41 :15 --> Message processing ends

But there will still be problems. If the service is restarted or the service goes down, the messages currently being executed will be lost the next time the service is started.

The solution I gave is: create a temporary data table. When the message starts to be consumed, add a record to the table. When the message is consumed successfully, delete the record in the temporary table.
When the service is restarted, the records in the temporary table are read into the delay queue, which solves the problem of message loss.

key point

  1. Using the cached key with intranet IP solves all the problems that may arise in clusters and multiple machines.
  2. Using background threads and thread pools solves the problems of message accumulation and delayed consumption.
  3. Use the method of deleting the key first to solve the problem of repeated consumption of messages.
  4. Persistence of the currently processed message solves the problem of message loss.

This is just the solution I gave, and it is not perfect. If you want to implement a message queue, it is best to use RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, MetaMq, etc.