11 Ways to Implement Automatic Order Cancellation (Part 2)

1. Listening for Redis expired keys

Redis provides a publish-subscribe (pub/sub) mechanism.

When sending a message, the producer specifies which channel to send it to, and a consumer receives the message by subscribing to that channel. A channel can be thought of as a topic in MQ.

Redis also has a number of built-in channels. The producer for these channels is not code we write but Redis itself. One of them is __keyevent@<db>__:expired, where <db> is the index of the Redis database.

When a Redis key expires, Redis publishes an event to the __keyevent@<db>__:expired channel. As long as you listen on this channel, you can get the expired key. (These keyspace notifications are disabled by default in Redis and are controlled by the notify-keyspace-events setting.)

Therefore, delayed tasks based on listening for expired Redis keys work as follows:

  • Use the delayed task as the key, and set its expiration time to the delay
  • Listen on the __keyevent@<db>__:expired channel; once the key reaches its expiration time (the delay), you receive the task

1. A demo

Spring already implements listening on the __keyevent@*__:expired channel. The * in __keyevent@*__:expired is a wildcard, so it listens on all databases.

So the demo is simple to write and takes just four steps.

Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

Configuration file

spring:
  redis:
    host: 192.168.200.144
    port: 6379

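Configuration class

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisConfiguration {

    // Container that manages the pub/sub subscriptions to Redis
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    // Listener that subscribes to the key expiration channel
    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }
}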

KeyExpirationEventMessageListener implements listening on the __keyevent@*__:expired channel.

When KeyExpirationEventMessageListener receives an expired-key message published by Redis, it publishes a RedisKeyExpiredEvent.

So we only need to listen for RedisKeyExpiredEvent to get the key of the expired message, that is, the delayed message.

Implement MyRedisKeyExpiredEventListener to listen for RedisKeyExpiredEvent

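import java.nio.charset.StandardCharsets;

import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisKeyExpiredEvent;
import org.springframework.stereotype.Component;

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        // The event source is the expired key as raw bytes
        byte[] body = event.getSource();
        System.out.println("Get delayed message:" + new String(body, StandardCharsets.UTF_8));
    }

}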

After writing the code, start the application.

Then, instead of sending the message from code, I set it directly with Redis commands. The key of the message is sanyou, the value is task (the value does not matter), and the expiration time is 5s.

set sanyou task
expire sanyou 5

The delayed task is obtained successfully.

Although this approach can implement delayed tasks, it has many pitfalls.

Tasks may be delayed

Redis does not publish the expiration event at the moment the key reaches its expiration time. It publishes the event when the key is actually removed, which happens at some point after the expiration time.

Redis removes expired keys using two strategies:

  • Lazy cleanup: an expired key is removed when it is accessed
  • Periodic cleanup: a background job periodically checks some keys and removes any that have expired

So even after a key has expired, Redis may not send the expiration event right away. As a result, a delayed task may not be received at the moment its delay elapses.
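To make the timing gap concrete, here is a minimal in-memory sketch of the two cleanup strategies. It is a toy model rather than Redis's actual implementation: the class ExpirySketch and its methods are hypothetical, the clock is passed in explicitly, and the periodic pass checks every key, whereas the article only says Redis checks some keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy key store imitating lazy and periodic cleanup; not how Redis is implemented.
class ExpirySketch {

    private final Map<String, Long> expireAtByKey = new HashMap<>();

    // "Expired" notifications emitted so far, in order.
    final List<String> events = new ArrayList<>();

    void setWithExpiry(String key, long expireAtMillis) {
        expireAtByKey.put(key, expireAtMillis);
    }

    // Lazy cleanup: an expired key is removed, and its event emitted, only when it is accessed.
    boolean exists(String key, long nowMillis) {
        Long expireAt = expireAtByKey.get(key);
        if (expireAt == null) {
            return false;
        }
        if (expireAt <= nowMillis) {
            expireAtByKey.remove(key);
            events.add(key);
            return false;
        }
        return true;
    }

    // Periodic cleanup: a background pass removes the expired keys it checks.
    // Simplification: this pass checks every key.
    void sweep(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = expireAtByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                String key = entry.getKey();
                it.remove();
                events.add(key);
            }
        }
    }

    public static void main(String[] args) {
        ExpirySketch store = new ExpirySketch();
        store.setWithExpiry("sanyou", 5_000);
        // The key is due at t=5000, but no event exists until something cleans it up.
        System.out.println("events before cleanup: " + store.events);
        store.sweep(8_000);
        System.out.println("events after cleanup at t=8000: " + store.events);
    }
}
```

In this model the event for sanyou only appears when the cleanup runs at t=8000, even though the key was due at t=5000. That gap is the delay described above.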

Messages are easily lost

Redis pub/sub has no persistence mechanism for messages. If no client is subscribed to a channel when a message is published to it, the message is lost. MQ, by contrast, persists the message so that consumers can still consume it once they subscribe.

So if a producer, or Redis itself, publishes a message to a channel while the service is restarting and therefore not listening on that channel, the message is lost.

Message consumption only has broadcast mode

Redis pub/sub supports only one consumption mode: broadcast.

In broadcast mode, when multiple consumers subscribe to the same channel, every consumer receives all the messages published to that channel.

For example, if the producer publishes a message with the content sanyou, two consumers subscribed to the channel both receive the sanyou message.

Therefore, if you obtain delayed tasks by listening on the channel and run multiple service instances, you must ensure that a message is not processed more than once, which means extra code to write.
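One common way to avoid duplicate processing is to have each instance claim a task before handling it, so that only the first claim wins. The sketch below simulates that idea inside a single JVM. The class ExpiredKeyDeduplicator and its tryClaim method are hypothetical names, and across real service instances the set of claimed keys would have to live in a shared store with an atomic set-if-absent operation, which is an assumption not shown here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: simulates "first claim wins" de-duplication in one JVM.
// Across real service instances the claimed set would need to be a shared store.
class ExpiredKeyDeduplicator {

    private final Set<String> claimedKeys = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that claims the given key.
    boolean tryClaim(String expiredKey) {
        return claimedKeys.add(expiredKey);
    }

    public static void main(String[] args) {
        ExpiredKeyDeduplicator deduplicator = new ExpiredKeyDeduplicator();
        // Two "instances" receive the same broadcast message.
        for (String instance : new String[] {"instance-A", "instance-B"}) {
            if (deduplicator.tryClaim("sanyou")) {
                System.out.println(instance + " processes sanyou");
            } else {
                System.out.println(instance + " skips sanyou (already claimed)");
            }
        }
    }
}
```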

Events for all keys are received

This is not a problem with Redis pub/sub but with Redis's own event notifications.

When you listen on the __keyevent@<db>__:expired channel, the consumer is notified whenever any Redis key expires, regardless of whether it is a key the consumer cares about.

So if you only want to consume the keys of a certain type of message, you have to mark them yourself, for example by adding a prefix to the message key. When consuming, treat only keys with that prefix as tasks to be processed.
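The prefix check described above can be sketched as a small helper. The class name DelayTaskKeyFilter and the prefix delay:order: are assumptions made for illustration; any naming convention agreed between producer and consumer would do.

```java
import java.util.Optional;

// Hypothetical helper: keeps only expired keys that carry an agreed prefix.
class DelayTaskKeyFilter {

    // Assumed naming convention for delayed-task keys; not part of the original demo.
    static final String TASK_PREFIX = "delay:order:";

    // Returns the task id when the key belongs to us, otherwise an empty Optional.
    static Optional<String> extractTaskId(String expiredKey) {
        if (expiredKey == null || !expiredKey.startsWith(TASK_PREFIX)) {
            return Optional.empty();
        }
        return Optional.of(expiredKey.substring(TASK_PREFIX.length()));
    }

    public static void main(String[] args) {
        for (String key : new String[] {"delay:order:1001", "session:abc"}) {
            Optional<String> taskId = extractTaskId(key);
            if (taskId.isPresent()) {
                System.out.println("Handle delayed task " + taskId.get());
            } else {
                System.out.println("Ignore unrelated key " + key);
            }
        }
    }
}
```

In the listener from the demo, the decoded key would be passed through a check like this before any business logic runs.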

2. Redisson’s RDelayedQueue

Redisson is a Redis client that implements many features on top of Redis. The most commonly used is its distributed lock, but it also provides a delay queue.

1. A demo

Add the dependency to the pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

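Wrap it in a RedissonDelayQueue class

import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        // The blocking queue is the target queue; the delayed queue wraps it
        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Blocks until a task whose delay has elapsed is available
                    String task = blockingQueue.take();
                    log.info("Delayed task received: {}", task);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop exits
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.error("Failed to consume delayed task", e);
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("Add delay task: {} delay time: {}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}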

When this class is created, it initializes the delay queue: it creates a RedissonClient object and then obtains the RDelayedQueue and RBlockingQueue objects from it. The queue name passed in is SANYOU; the name itself does not matter.

After the delay queue is created, a consumer thread is started. This thread blocks on the take method of RBlockingQueue, waiting for delayed tasks.

Tasks are added through the offer method of RDelayedQueue.

The controller class adds tasks through an HTTP endpoint, with a delay of 5s

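import javax.annotation.Resource;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    // Adds a task that becomes consumable after 5 seconds
    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}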

Start the project and open the following URL in a browser to add a task: http://localhost:8080/add?task=sanyou
After waiting 5 seconds, the task is received successfully.

2. Implementation principle

The following describes how Redisson's delay queue is implemented.

In the names below, the prefix before SANYOU is fixed; Redisson adds it when creating the queue.

  • redisson_delay_queue_timeout:SANYOU, a sorted set that stores all delayed tasks, sorted by each task's expiration timestamp (the timestamp when the task was submitted + the delay). The first element is therefore the task that is due earliest in the entire delay queue. This point is very important.
  • redisson_delay_queue:SANYOU, a list that also stores all the tasks, although after some investigation it does not seem to play a role here.
  • SANYOU, a list called the target queue. The tasks stored in this queue have reached their delay time and can be fetched by consumers. The take method of RBlockingQueue in the demo above fetches tasks from this target queue.
  • redisson_delay_queue_channel:SANYOU, a channel used to notify the client to start a delayed task

When a task is submitted, Redisson puts it in redisson_delay_queue_timeout:SANYOU with a score equal to the submission timestamp + the delay, which is the expiration timestamp of the delayed task.

By listening on the redisson_delay_queue_channel:SANYOU channel, the Redisson client internally submits a delayed task of its own. This delayed task ensures that the tasks in redisson_delay_queue_timeout:SANYOU that have reached their delay time are removed from it and stored in the target queue SANYOU.

Consumers can then fetch the delayed tasks from the target queue SANYOU.

So Redisson's delayed tasks work the same way as the MQ implementation mentioned earlier. A task is first placed in an intermediate location, here redisson_delay_queue_timeout:SANYOU, and something similar to a scheduled task checks whether the messages in that intermediate location have reached their delay time. Once they have, they are moved to the final target queue for consumers to consume.

This implementation is more reliable than listening for expired Redis keys: because messages are stored in list and sorted set data types, they are rarely lost.
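The flow above (store by expiration timestamp, then move due tasks to the target queue) can be imitated in memory with plain JDK collections. This is only a sketch of the idea and does not use Redisson or Redis: the class DelayQueueSketch and its methods are hypothetical, a PriorityQueue stands in for the sorted set, and the clock is passed in explicitly.

```java
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory imitation of the flow described above; it does not use Redisson or Redis.
class DelayQueueSketch {

    // One delayed task together with its expiration timestamp (the "score").
    private static final class Entry {
        final long expireAtMillis;
        final String task;

        Entry(long expireAtMillis, String task) {
            this.expireAtMillis = expireAtMillis;
            this.task = task;
        }
    }

    // Plays the role of redisson_delay_queue_timeout:SANYOU (ordered by expiration time).
    private final PriorityQueue<Entry> timeoutSet =
            new PriorityQueue<>((a, b) -> Long.compare(a.expireAtMillis, b.expireAtMillis));

    // Plays the role of the target queue SANYOU that consumers read from.
    final BlockingQueue<String> targetQueue = new LinkedBlockingQueue<>();

    // Score = submission timestamp + delay.
    synchronized void offer(String task, long delayMillis, long nowMillis) {
        timeoutSet.add(new Entry(nowMillis + delayMillis, task));
    }

    // Moves every task whose expiration time has been reached into the target queue.
    synchronized int transferExpired(long nowMillis) {
        int moved = 0;
        while (!timeoutSet.isEmpty() && timeoutSet.peek().expireAtMillis <= nowMillis) {
            targetQueue.add(timeoutSet.poll().task);
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        DelayQueueSketch queue = new DelayQueueSketch();
        queue.offer("late", 5000, 0);
        queue.offer("early", 1000, 0);
        System.out.println("moved at t=500: " + queue.transferExpired(500));
        System.out.println("moved at t=1000: " + queue.transferExpired(1000));
        System.out.println("consumer takes: " + queue.targetQueue.poll());
    }
}
```

Because the earliest task is always at the head, the transfer step only has to look at the head until it finds a task that is not yet due. This is why the ordering of the sorted set matters.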

3. Netty’s HashedWheelTimer

1. A demo

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyHashedWheelTimerDemo {

    public static void main(String[] args) {
        // 100ms per tick, 8 slots in the wheel
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
        timer.start();

        log.info("submit delayed task");
        timer.newTimeout(timeout -> log.info("Execute delayed task"), 5, TimeUnit.SECONDS);
    }

}

2. Implementation principle


The time wheel is divided into a number of grids (the 8 in the demo above means 8 grids), and each grid represents a span of time (the 100 in the demo above means each grid is 100ms). So in the demo above, one full rotation takes 800ms.

When a task is submitted, its due time is hashed with a modulo operation to work out which grid the task's execution time falls in, and the task is added to that grid. If a grid holds multiple tasks, they are stored in a linked list. Adding a task is therefore somewhat like how HashMap stores elements.

Internally, HashedWheelTimer starts a thread that polls each grid, finds the tasks that have reached their delay time, and executes them.

Because HashedWheelTimer, like Timer, processes tasks on a single thread, a long-running task delays the processing of other tasks.

The client-side delayed task in Redisson mentioned above is implemented with Netty’s HashedWheelTimer.
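The grid calculation can be illustrated with a few lines of arithmetic. This is a simplified sketch, not Netty's actual code: the class TimeWheelSlotSketch and its methods are hypothetical, and it assumes the wheel pointer is at grid 0 when the task is submitted. With the demo's numbers (100ms per grid, 8 grids, 5s delay) the task is 50 ticks away, so it lands in grid 2 and becomes due after 6 full rotations.

```java
// Simplified slot arithmetic for a hashed time wheel; not Netty's actual code.
class TimeWheelSlotSketch {

    // Index of the grid a task lands in, assuming the wheel starts at grid 0.
    static int slotIndex(long delayMillis, long tickMillis, int wheelSize) {
        long ticks = delayMillis / tickMillis;
        return (int) (ticks % wheelSize);
    }

    // Number of full rotations the wheel must complete before the task is due.
    static long remainingRounds(long delayMillis, long tickMillis, int wheelSize) {
        long ticks = delayMillis / tickMillis;
        return ticks / wheelSize;
    }

    public static void main(String[] args) {
        // Same numbers as the demo: 100 ms per grid, 8 grids, 5 s delay.
        System.out.println("slot = " + slotIndex(5000, 100, 8));
        System.out.println("rounds = " + remainingRounds(5000, 100, 8));
    }
}
```

The rounds counter is what lets a wheel that rotates every 800ms hold a task that is due in 5 seconds: the polling thread skips the task on each pass until its remaining rounds reach zero.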

4. Hutool’s SystemTimer

The Hutool utility library also provides a delayed task implementation, SystemTimer.

1. A demo

import cn.hutool.cron.timingwheel.SystemTimer;
import cn.hutool.cron.timingwheel.TimerTask;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SystemTimerDemo {

    public static void main(String[] args) {
        SystemTimer systemTimer = new SystemTimer();
        systemTimer.start();

        log.info("submit delayed task");
        // The second argument is the delay in milliseconds
        systemTimer.addTask(new TimerTask(() -> log.info("Execute delayed task"), 5000));
    }

}

Under the hood, Hutool's SystemTimer also uses a time wheel.

5. Quartz

Quartz is an open source job scheduling framework. Its API can also be used to implement delayed tasks.

1. A demo

Dependency

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

SanYouJob implements the Job interface. When the task reaches its execution time, Quartz calls the execute implementation, and the content of the task can be read from the context.

import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

@Slf4j
public class SanYouJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // Read the task content that was attached when the job was built
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        log.info("Get delayed task: {}", jobDataMap.get("delayTask"));
    }
}

Test class

import java.util.Date;

import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

@Slf4j
public class QuartzDemo {

    public static void main(String[] args) throws SchedulerException {
        // 1. Create a Scheduler factory
        SchedulerFactory sf = new StdSchedulerFactory();
        // 2. Get the scheduler instance from the factory
        Scheduler scheduler = sf.getScheduler();

        // 3. Start the scheduler
        scheduler.start();

        // 4. Create JobDetail, the Job type is the SanYouJob mentioned above
        JobDetail jb = JobBuilder.newJob(SanYouJob.class)
                .usingJobData("delayTask", "This is a delayed task")
                .build();

        // 5. Create Trigger
        Trigger t = TriggerBuilder.newTrigger()
                // The trigger time is the moment the delay elapses
                .startAt(DateUtil.offsetSecond(new Date(), 5))
                .build();

        // 6. Register the job and its trigger
        log.info("submit delayed task");
        scheduler.scheduleJob(jb, t);
    }
}

2. Implementation principle

Core components

  • Job: represents a task; the implementation of the execute method is the task's execution logic
  • JobDetail: the details of the task, where you can set the parameters the task needs and other information
  • Trigger: triggers the execution of the task. For example, a trigger set to fire after 5 seconds causes the task to run after 5 seconds.
  • Scheduler: the scheduler, which registers tasks together with their triggers and then schedules the execution of the tasks

On startup, a QuartzSchedulerThread scheduling thread is started. This thread checks whether a task's execution time has been reached and, if so, hands the task over to the task thread pool for execution.
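The split between a scheduling thread and a worker pool can be sketched with plain JDK classes. This is a toy imitation that does not use Quartz and does not reflect its internals: the class SchedulerSketch, its methods, and the pool size of 2 are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy imitation of "scheduling thread + worker pool"; it does not use Quartz.
class SchedulerSketch {

    // A job paired with the time at which it should fire.
    private static final class ScheduledJob {
        final Runnable job;
        final long fireAtMillis;

        ScheduledJob(Runnable job, long fireAtMillis) {
            this.job = job;
            this.fireAtMillis = fireAtMillis;
        }
    }

    private final List<ScheduledJob> pending = new ArrayList<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    synchronized void schedule(Runnable job, long fireAtMillis) {
        pending.add(new ScheduledJob(job, fireAtMillis));
    }

    // One pass of the scheduling loop: hand every due job to the worker pool.
    synchronized int dispatchDue(long nowMillis) {
        int dispatched = 0;
        Iterator<ScheduledJob> it = pending.iterator();
        while (it.hasNext()) {
            ScheduledJob scheduled = it.next();
            if (scheduled.fireAtMillis <= nowMillis) {
                it.remove();
                workers.submit(scheduled.job);
                dispatched++;
            }
        }
        return dispatched;
    }

    // Stops accepting work and waits briefly for running jobs to finish.
    void shutdown() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SchedulerSketch scheduler = new SchedulerSketch();
        scheduler.schedule(() -> System.out.println("Job executed on "
                + Thread.currentThread().getName()), System.currentTimeMillis() + 200);
        // The "scheduling thread": keep checking until the job has been dispatched.
        while (scheduler.dispatchDue(System.currentTimeMillis()) == 0) {
            Thread.sleep(50);
        }
        scheduler.shutdown();
    }
}
```

Keeping the time check on one thread and the job execution on a pool means a slow job does not hold up the check for the next one, unlike the single-threaded time wheel above.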

6. Infinite polling for delayed tasks

Infinite polling means starting a thread that continuously polls the tasks and executes each one once it reaches its delay time.

1. A demo

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PollingTaskDemo {

    // CopyOnWriteArrayList iterates over a snapshot, so removing inside the loop is safe
    private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Every pass walks the whole list looking for due tasks
                    for (DelayTask delayTask : DELAY_TASK_LIST) {
                        if (delayTask.triggerTime <= System.currentTimeMillis()) {
                            log.info("Delay task processing: {}", delayTask.taskContent);
                            DELAY_TASK_LIST.remove(delayTask);
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop exits
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.error("Failed to process delayed task", e);
                }
            }
        }).start();

        log.info("submit delayed task");
        DELAY_TASK_LIST.add(new DelayTask("Yao Road Source Code", 5L));
    }

    public static class DelayTask {

        private final String taskContent;

        // Absolute time in milliseconds at which the task becomes due
        private final long triggerTime;

        public DelayTask(String taskContent, long delaySeconds) {
            this.taskContent = taskContent;
            this.triggerTime = System.currentTimeMillis() + delaySeconds * 1000;
        }
    }

}

The tasks can be stored in a database or in memory, depending on your requirements. Here I keep them in memory for simplicity.

This approach is simple but inefficient, because every pass has to traverse all the tasks.

7. Finally

All the sample code for this article is available at:

https://github.com/sanyou3/delay-task-demo.git