Background:
The delay queue time supported by Rocket MQ is a fixed interval. The default is 19 levels (including level 0): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. Our requirement is to send an order forcing email to the user 48 hours or 72 hours after the user places an order. This function cannot be realized by using the default delayed message, so the scheme has been modified.
Implementation principle:
To put it simply, on the basis of the fixed time interval of the Rocket MQ delay queue, by sending delay messages multiple times, any combination of delay times can be calculated. Through reflection, the call of delay business logic is realized.
The source code is as follows:
/* * Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved. */ package com.example.xxx.utils; import com.vevor.bmp.crm.common.constants.MQConstants; import lombok.Data; import lombok. SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.Serializable; import java.util.Calendar; import java.util.Date; import java.util.concurrent.TimeUnit; /** * @author : Murphy ZhangSun * @version: 1.8.0 * @description : Arbitrary delay time tool based on Rocket MQ * @program: user-growth * @date : Created in 2023/5/22 3:35 pm * @since: 1.8.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP, topic = MQConstants.CRM_DELAY_QUEUE_TOPIC, // message consumption order consumeMode = ConsumeMode. CONCURRENTLY, // The maximum number of repeated message consumption maxReconsumeTimes = 3) public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> { /** * Rocket MQ client */ @Resource private RocketMQTemplate rocketMQTemplate; /** * MQ default latency level */ private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L, 30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L, 480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L}; @SneakyThrows @Override public void onMessage(DelayTable<Object> message) { Date endTime = message. getEndTime(); int delayLevel = getDelayLevel(endTime); // continue to delay if (delayLevel != 0) { int currentDelayCount = message. getCurrentDelayCount(); currentDelayCount++; message.setCurrentDelayCount(currentDelayCount); message.setCurrentDelayLevel(delayLevel); message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]); this. sendDelayMessage(message); return; } // perform business log.info("delay message end! start to process business..."); Class<? extends DelayMessageHandler> messageHandler = message. getMessageHandler(); if (messageHandler != null) { DelayMessageHandler delayMessageHandler = messageHandler. newInstance(); delayMessageHandler. handle(); } } /** * Delay message body * * @param <E> message type */ @Data public static class DelayTable<E> implements Serializable { private static final long serialVersionUID = 2405172041950251807L; /** * Delay message body */ private E content; /** * message delay end time */ private Date endTime; /** * total delay in milliseconds */ private long totalDelayTime; /** * total delay time unit */ private TimeUnit totalDelayTimeUnit; /** * Current delay times */ private int currentDelayCount; /** * current latency level */ private int currentDelayLevel; /** * Current delay in milliseconds */ private long currentDelayMillis; /** * Delay processing logic */ private Class<? extends DelayMessageHandler> messageHandler; } /** * Send delayed message * * @param message message body * @param delay delay time * @param timeUnit delay time unit * @param handler After the delay time is up, the logic that needs to be processed * @param <E> delay message type */ public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) { // Convert delay time to timestamp (milliseconds) long totalDelayMillis = timeUnit.toMillis(delay); // Calculate the end time based on the delay time Calendar instance = Calendar. getInstance(); instance.add(Calendar.MILLISECOND, (int)totalDelayMills); Date endTime = instance. getTime(); // Match the delay level according to the delay time (delay level) int delayLevel = getDelayLevel(endTime); long delayMillis = TIME_DELAY_LEVEL[delayLevel]; // Send a message DelayTable<E> delayTable = new DelayTable<>(); // global data delayTable.setContent(message); delayTable.setMessageHandler(handler); delayTable.setEndTime(endTime); delayTable.setTotalDelayTime(delay); delayTable.setTotalDelayTimeUnit(timeUnit); // current latency level data delayTable.setCurrentDelayCount(1); delayTable.setCurrentDelayLevel(delayLevel); delayTable.setCurrentDelayMillis(delayMillis); this.sendDelayMessage(delayTable); } /** * Compute latency level * * @param targetTime delay deadline * @return Rocket MQ delay message level */ private static int getDelayLevel(Date targetTime) { long currentTime = System. currentTimeMillis(); long delayMillis = targetTime.getTime() - currentTime; if (delayMillis <= 0) { // No delay, that is, the delay level is 0 return 0; } // Determine which delay level you are in // 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h for (int i = 1; i <= 18; i ++ ) { long delayLevelTime = TIME_DELAY_LEVEL[i]; if (delayMillis < delayLevelTime) { return i - 1; } else if (delayMillis == delayLevelTime) { return i; } } // Maximum delay level is 18 return 18; } /** * Send delayed message * * @param delayTable delay object, can be recycled */ @SneakyThrows private <E> void sendDelayMessage(DelayTable<E> delayTable) { // message serialization Message<DelayTable<E>> message = MessageBuilder .withPayload(delayTable) .build(); // set\send delay message int delayLevel = delayTable. getCurrentDelayLevel(); rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message , 3000, delayLevel); log.debug("delay count: {}, delay level: {}, time: {} milliseconds", delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]); } /** * Delay callback interface * * The callback logic must implement the #hander() method of this interface, which will be called by reflection after the delay is over */ public interface DelayMessageHandler extends Serializable { long serialVersionUID = 2405172041950251807L; /** * Callback */ void handle(); } }
Test code:
/* * Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved. */ package com.vevor.bmp.crm.io.controller; import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils; import com.vevor.common.pojo.vo.ResponseResult; import lombok.Data; import lombok. SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** * @author : Murphy ZhangSun * @version: 1.8.0 * @description: delay queue test * @program: user-growth * @date : Created in 2023/5/22 4:54 pm * @since: 1.8.0 */ @Slf4j @RestController public class DelayQueueController { @Resource private RocketMQDelayQueueUtils rocketMQDelayQueueUtils; @GetMapping("/mq/delay") @SneakyThrows public ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) { // get delay queue rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class); return ResponseResult. success(); } /** * @author : Murphy ZhangSun * @version: * @description: * @program: user-growth * @date : Created in 2023/5/23 2:11 pm * @since: */ @Data public static class CallBack implements RocketMQDelayQueueUtils. DelayMessageHandler { /** * Callback */ @Override public void handle() { log.info("i am business logical! {}", System.currentTimeMillis()); } } }
Pros and cons:
Advantages: Compared with the scheduled task framework, the method of delaying messages has the advantages of high real-time performance, support for distributed, lightweight, high concurrency, etc.
Disadvantages: The accuracy of the message is not reliable. Under normal circumstances, the accuracy is at the second level, but when the MQ service has a message accumulation, the time of the message will deviate greatly, so the accuracy depends on the stability of the MQ service.