Arbitrary-delay message queue built on RocketMQ

Background:

RocketMQ supports delayed messages only at fixed intervals. By default there are 19 levels (counting level 0, which means no delay): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h. Our requirement is to send the user an order reminder email 48 or 72 hours after the order is placed. The default delay levels cannot express such a delay, so we extended the scheme as described below.

Implementation principle:

Put simply, we build on RocketMQ's fixed delay levels: by re-sending the delayed message several times, each time at the largest level that does not exceed the remaining time, any total delay can be composed from the fixed levels. When the delay finally expires, the business callback is instantiated and invoked through reflection.

The source code is as follows:

/*
 * Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.
 */
package com.example.xxx.utils;

import com.vevor.bmp.crm.common.constants.MQConstants;
import lombok.Data;
import lombok. SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * Arbitrary-delay message utility built on top of RocketMQ's fixed delay levels.
 *
 * <p>RocketMQ only offers a fixed table of delay levels. To reach an arbitrary delay, this
 * component publishes a message that carries an absolute end time, using the largest delay
 * level that does not overshoot that end time. Because the component also listens on the same
 * topic, it receives the message again when the level expires and then either re-publishes it
 * for the remaining time or, once less than the shortest level remains, invokes the configured
 * {@link DelayMessageHandler}.
 *
 * @author : Murphy ZhangSun
 * @version: 1.8.0
 * @description : Arbitrary delay time tool based on Rocket MQ
 * @program: user-growth
 * @date : Created in 2023/5/22 3:35 pm
 * @since: 1.8.0
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,
        topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,
        // message consumption mode
        consumeMode = ConsumeMode.CONCURRENTLY,
        // The maximum number of repeated message consumption
        maxReconsumeTimes = 3)
public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {

    /**
     * Rocket MQ client
     */
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Timeout, in milliseconds, passed to every synchronous send.
     */
    private static final long SEND_TIMEOUT_MILLIS = 3000L;

    /**
     * Duration in milliseconds of each delay level, indexed by level number.
     *
     * <p>Index 0 means "no delay"; indexes 1..18 are
     * 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h.
     * NOTE(review): this table has to mirror the delay levels configured on the broker;
     * confirm it against the broker configuration if that has been customised.
     */
    private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,
            30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,
            480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};

    /**
     * Highest usable delay level, derived from the table so the two can never disagree.
     */
    private static final int MAX_DELAY_LEVEL = TIME_DELAY_LEVEL.length - 1;

    /**
     * Consumes one hop of a delayed message.
     *
     * <p>If at least the shortest delay level still remains until the end time, the message is
     * re-published with updated hop bookkeeping. Otherwise the delay is considered over and the
     * handler class carried by the message (if any) is instantiated through its no-argument
     * constructor and invoked with the message content.
     *
     * @param message the delayed message received from the topic
     */
    @SneakyThrows
    @Override
    public void onMessage(DelayTable<Object> message) {
        int delayLevel = getDelayLevel(message.getEndTime());

        // Time left: schedule the next hop and stop here.
        if (delayLevel != 0) {
            message.setCurrentDelayCount(message.getCurrentDelayCount() + 1);
            message.setCurrentDelayLevel(delayLevel);
            message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);
            this.sendDelayMessage(message);
            return;
        }

        // Delay is over: run the business callback.
        log.info("delay message end! start to process business...");
        Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();
        if (messageHandler != null) {
            // Class#newInstance() is deprecated; go through the no-arg constructor explicitly.
            DelayMessageHandler delayMessageHandler = messageHandler.getDeclaredConstructor().newInstance();
            // The default implementation of handle(Object) falls back to handle(), so handlers
            // that only implement handle() behave exactly as before.
            delayMessageHandler.handle(message.getContent());
        }
    }

    /**
     * Delay message body
     *
     * @param <E> message type
     */
    @Data
    public static class DelayTable<E> implements Serializable {

        private static final long serialVersionUID = 2405172041950251807L;

        /**
         * Business payload supplied by the caller of {@code delay}
         */
        private E content;

        /**
         * Absolute point in time at which the delay ends
         */
        private Date endTime;

        /**
         * Total requested delay, expressed in {@link #totalDelayTimeUnit} (not necessarily milliseconds)
         */
        private long totalDelayTime;

        /**
         * Unit of {@link #totalDelayTime}
         */
        private TimeUnit totalDelayTimeUnit;

        /**
         * Number of hops sent so far (1 for the initial send)
         */
        private int currentDelayCount;

        /**
         * Delay level used for the current hop
         */
        private int currentDelayLevel;

        /**
         * Duration of the current hop in milliseconds
         */
        private long currentDelayMillis;

        /**
         * Handler class to instantiate and invoke once the delay is over; may be {@code null}
         */
        private Class<? extends DelayMessageHandler> messageHandler;
    }

    /**
     * Send delayed message
     *
     * <p>The end time is computed once, here, as "now + delay"; every later hop is scheduled
     * against that absolute end time. A delay shorter than the shortest level (including zero
     * or negative values) is sent with delay level 0.
     *
     * @param message message body
     * @param delay delay time
     * @param timeUnit delay time unit
     * @param handler After the delay time is up, the logic that needs to be processed
     * @param <E> delay message type
     */
    public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {
        long totalDelayMillis = timeUnit.toMillis(delay);

        // Plain long arithmetic: Calendar#add takes an int and would overflow past ~24.8 days.
        Date endTime = new Date(System.currentTimeMillis() + totalDelayMillis);

        // Pick the first hop from the requested duration itself rather than re-reading the
        // clock, so an exact request (e.g. 10s) reliably maps to its matching level.
        int delayLevel = levelForRemaining(totalDelayMillis);

        DelayTable<E> delayTable = new DelayTable<>();
        // global data
        delayTable.setContent(message);
        delayTable.setMessageHandler(handler);
        delayTable.setEndTime(endTime);
        delayTable.setTotalDelayTime(delay);
        delayTable.setTotalDelayTimeUnit(timeUnit);

        // data of the current hop
        delayTable.setCurrentDelayCount(1);
        delayTable.setCurrentDelayLevel(delayLevel);
        delayTable.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);
        this.sendDelayMessage(delayTable);
    }

    /**
     * Compute latency level
     *
     * @param targetTime delay deadline
     * @return the largest delay level whose duration does not exceed the time remaining until
     *         {@code targetTime}; 0 when the deadline has passed or less than the shortest
     *         level remains
     */
    private static int getDelayLevel(Date targetTime) {
        return levelForRemaining(targetTime.getTime() - System.currentTimeMillis());
    }

    /**
     * Maps a remaining duration to a delay level.
     *
     * @param remainingMillis time still to wait, in milliseconds; may be zero or negative
     * @return the largest level {@code i} with {@code TIME_DELAY_LEVEL[i] <= remainingMillis},
     *         or 0 if there is none
     */
    private static int levelForRemaining(long remainingMillis) {
        // The table is ascending, so scanning from the top finds the largest fitting level.
        for (int level = MAX_DELAY_LEVEL; level > 0; level--) {
            if (remainingMillis >= TIME_DELAY_LEVEL[level]) {
                return level;
            }
        }
        return 0;
    }

    /**
     * Send delayed message
     *
     * @param delayTable delay object, can be recycled; its current delay level is used for the send
     */
    @SneakyThrows
    private <E> void sendDelayMessage(DelayTable<E> delayTable) {
        // wrap the payload in a Spring message
        Message<DelayTable<E>> message = MessageBuilder.withPayload(delayTable).build();

        // send with the delay level of the current hop
        int delayLevel = delayTable.getCurrentDelayLevel();
        rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, SEND_TIMEOUT_MILLIS, delayLevel);

        log.debug("delay count: {}, delay level: {}, time: {} milliseconds",
                delayTable.getCurrentDelayCount(), delayLevel, TIME_DELAY_LEVEL[delayLevel]);
    }

    /**
     * Delay callback interface
     *
     * <p>The callback logic must implement the {@link #handle()} method of this interface. The
     * implementing class is instantiated by reflection after the delay is over, so it needs an
     * accessible no-argument constructor.
     */
    public interface DelayMessageHandler extends Serializable {
        long serialVersionUID = 2405172041950251807L;

        /**
         * Callback
         */
        void handle();

        /**
         * Callback that also receives the content carried by the delayed message.
         *
         * <p>This is the method invoked when the delay is over. The default implementation
         * ignores the content and delegates to {@link #handle()}; override it when the business
         * logic needs the payload.
         *
         * @param content the content as received in the consumed message; may be {@code null}
         */
        default void handle(Object content) {
            handle();
        }
    }

}

Test code:

/*
 * Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.
 */
package com.vevor.bmp.crm.io.controller;

import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
import com.vevor.common.pojo.vo.ResponseResult;
import lombok.Data;
import lombok. SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * Demo controller used to exercise {@link RocketMQDelayQueueUtils} by hand.
 *
 * @author : Murphy ZhangSun
 * @version: 1.8.0
 * @description: delay queue test
 * @program: user-growth
 * @date : Created in 2023/5/22 4:54 pm
 * @since: 1.8.0
 */
@Slf4j
@RestController
public class DelayQueueController {

    /**
     * Delay utility under test.
     */
    @Resource
    private RocketMQDelayQueueUtils rocketMQDelayQueueUtils;

    /**
     * Schedules {@code task} to trigger {@link CallBack} after {@code delay} seconds.
     *
     * @param delay delay in seconds
     * @param task  payload to attach to the delayed message
     * @return the result of {@code ResponseResult.success()}
     */
    @SneakyThrows
    @GetMapping("/mq/delay")
    public ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {
        // The request parameter is interpreted as a number of seconds.
        final TimeUnit unit = TimeUnit.SECONDS;
        rocketMQDelayQueueUtils.delay(task, delay, unit, CallBack.class);
        return ResponseResult.success();
    }

    /**
     * Sample handler: logs the current timestamp when it is invoked.
     *
     * @author : Murphy ZhangSun
     * @date : Created in 2023/5/23 2:11 pm
     */
    @Data
    public static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {

        /**
         * Logs a marker line together with the current time in milliseconds.
         */
        @Override
        public void handle() {
            final long now = System.currentTimeMillis();
            log.info("i am business logical! {}", now);
        }
    }
}

Pros and cons:

Advantages: compared with a scheduled-task framework, delayed messages offer better timeliness, distributed operation out of the box, a lightweight footprint, and high concurrency.

Disadvantages: the delivery time is not exact. Under normal conditions the accuracy is on the order of seconds, but when messages pile up on the MQ service the actual delivery time can deviate considerably, so the accuracy depends on the stability of the MQ service.