An Implementation of an Asynchronous Delay Queue

1. Application scenarios

Many functions in the system currently need delayed processing: payment-timeout cancellation, queuing timeout, delayed sending of SMS, WeChat, and other reminders, token refresh, membership-card expiration, and so on. Delayed processing saves considerable system resources, because tasks no longer have to be found by polling the database.

At present, most of these functions are implemented with scheduled tasks, of which there are two kinds: quartz and xxljob. With a short polling interval, such as once per second, polling puts pressure on the database and still has an error of up to 1 second. With a long polling interval, such as once every 30 minutes, the delay can be large. Suppose a record is inserted at 03:01 and should expire at 03:31. The poll at 03:30 covers 03:00 to 03:30, so the record due at 03:31 is not yet picked up. It is only scanned at 04:00, which amounts to a delay of 29 minutes.
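The 29-minute figure can be checked with a little arithmetic. The sketch below assumes the poller fires on fixed multiples of the interval counted from midnight and only picks up records that are already due; the class and method names are made up for this illustration.

```java
import java.time.Duration;
import java.time.LocalTime;

final class PollingDelay {
    // First poll tick at or after dueTime, assuming ticks fall on multiples of
    // the interval counted from midnight (an assumption of this sketch).
    // Ticks that would wrap past midnight are not handled.
    static LocalTime firstPollAtOrAfter(LocalTime dueTime, Duration interval) {
        long intervalSec = interval.getSeconds();
        long dueSec = dueTime.toSecondOfDay();
        long ticks = (dueSec + intervalSec - 1) / intervalSec; // ceiling division
        return LocalTime.ofSecondOfDay(ticks * intervalSec);
    }

    // Extra waiting time caused purely by the polling interval.
    static Duration pollingDelay(LocalTime dueTime, Duration interval) {
        return Duration.between(dueTime, firstPollAtOrAfter(dueTime, interval));
    }
}
```

For a record due at 03:31 and a 30-minute interval, `firstPollAtOrAfter` lands on 04:00 and `pollingDelay` is 29 minutes.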

2. Research on delay processing methods

1. DelayQueue

1. Implementation method:

DelayQueue is the delayed blocking queue provided by the JDK. It orders tasks with different delay times in a priority queue and blocks on a condition: the consuming thread sleeps for the remaining delay of the head task and then takes it.

When a new task is added, the queue checks whether it has become the first task to be executed. If so, the sleeping thread is woken up, so that a newly added task that is due earlier is not missed by the execution thread.
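A minimal sketch of this behavior with `java.util.concurrent.DelayQueue` follows. The `DelayedTask` and `DelayQueueDemo` classes are illustrative names, not part of the component described in this article.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayedTask implements Delayed {
    final String name;
    private final long fireAtNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.fireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    // Remaining delay; the queue releases the task once this is <= 0.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders the internal priority queue: the shortest remaining delay comes first.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueDemo {
    // Adds the slow task first, then the fast one; take() still returns "fast" first.
    static String[] takeTwo() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 200));
        queue.put(new DelayedTask("fast", 50));
        try {
            return new String[] { queue.take().name, queue.take().name };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Each `take()` blocks until the head task is due, so no thread spins while the queue is idle.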

2. Existing problems:

  • Runs on a single machine; after the system goes down, tasks cannot be effectively retried
  • No recording or backup
  • No retry mechanism
  • Tasks are cleared when the system restarts
  • Sharded consumption is not supported

3. Advantages: simple to implement, blocks when there are no tasks (which saves resources), and accurate execution time

2. Delay queue mq

Implementation method: rely on an MQ and achieve delayed consumption by setting a delayed consumption time. Both RabbitMQ and jmq support this. RabbitMQ implements it by setting an expiration time on the message; once the message expires, it is routed to a dead letter queue, where it is consumed.
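As a rough sketch of the RabbitMQ variant, the queue-level settings are passed as an arguments map when the queue is declared (in the RabbitMQ Java client, the last parameter of `Channel.queueDeclare`). The argument keys below are RabbitMQ's own; the helper class and the exchange and routing key values used with it are hypothetical, and no broker connection is made here.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayQueueArguments {
    // Builds the arguments for a "waiting" queue: messages sit there until the
    // TTL expires and are then dead-lettered to the exchange that feeds the
    // real consumer queue.
    static Map<String, Object> forDelay(long delayMillis, String deadLetterExchange, String routingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", delayMillis);                // expiration time of messages in this queue
        args.put("x-dead-letter-exchange", deadLetterExchange); // where expired messages are routed
        args.put("x-dead-letter-routing-key", routingKey);      // routing key used when dead-lettering
        return args;
    }
}
```

A call such as `DelayQueueArguments.forDelay(5000L, "order.dlx", "order.timeout")` describes one queue with one fixed delay.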

Existing problems:

1. The delay time is not flexible. Each queue has a fixed expiration time, so every new delay duration requires creating a new message queue.

Advantages: Relying on jmq provides effective monitoring, consumption records, and retries. Multiple machines can consume at the same time, and the queue tolerates downtime.

3. Scheduled tasks

Poll for qualifying data with scheduled tasks.

Disadvantages:

  • The business database has to be read, which puts a certain pressure on it.
  • There is a delay.
  • When too much data is scanned at one time, too many system resources are occupied.
  • Sharded consumption is not supported.

Advantages:

  • After a consumption failure, the task can be consumed again on the next run, so retries come for free.
  • Stable consumption throughput

4. redis

Tasks are stored in redis, and a redis zset sorts them by score. A thread in the program continuously fetches due entries from the zset and consumes them, which implements the delay queue.
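The idea can be sketched without a redis server. The class below is an in-memory stand-in written for this illustration; its three methods mimic the semantics of the redis commands ZADD, ZRANGEBYSCORE, and ZREM, with the execution timestamp used as the score.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class InMemoryZset {
    private final Map<String, Long> scores = new HashMap<>();
    private final TreeMap<Long, TreeSet<String>> byScore = new TreeMap<>();

    // Like ZADD: insert a member, or move it to a new score.
    synchronized void add(String member, long score) {
        remove(member);
        scores.put(member, score);
        byScore.computeIfAbsent(score, s -> new TreeSet<>()).add(member);
    }

    // Like ZRANGEBYSCORE key min max: members with min <= score <= max, in score order.
    synchronized List<String> rangeByScore(long min, long max) {
        List<String> result = new ArrayList<>();
        for (TreeSet<String> members : byScore.subMap(min, true, max, true).values()) {
            result.addAll(members);
        }
        return result;
    }

    // Like ZREM: returns true only for the caller that actually removed the member,
    // which lets competing consumers decide who owns a due task.
    synchronized boolean remove(String member) {
        Long score = scores.remove(member);
        if (score == null) {
            return false;
        }
        TreeSet<String> members = byScore.get(score);
        members.remove(member);
        if (members.isEmpty()) {
            byScore.remove(score);
        }
        return true;
    }
}
```

A consumer loop would call `rangeByScore(0, now)` and process only the ids for which `remove` returns true.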

Advantages:

  • Querying redis is faster than querying the database. Even when the zset is very long, lookups go through its skip list structure, which is highly efficient
  • redis sorts by timestamp, so only tasks whose score falls within the current timestamp need to be queried
  • Tasks survive a machine restart
  • Distributed consumption

Disadvantages:

  • Limited by redis performance: concurrency of roughly 100,000
  • Multiple commands are not guaranteed to be atomic. Using lua scripts requires all the data involved to be on one redis shard.

5. Time wheel

The time wheel also executes delayed tasks on a single JVM. For example, kafka and netty both implement time wheels, and redisson’s watchdog is also implemented with netty’s time wheel.

Disadvantages: not suitable for distributed services, and tasks are lost after downtime.
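For readers unfamiliar with the structure, here is a deliberately simplified single-level time wheel. It is not the kafka or netty implementation: it is advanced by calling `tick()` by hand instead of by a worker thread, it is not thread-safe, and all names are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SimpleTimeWheel {
    private static final class Entry {
        final Runnable task;
        long remainingRounds; // full turns of the wheel still to wait

        Entry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private long currentTick = 0;

    SimpleTimeWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    // Schedule a task to run after delayTicks ticks (must be >= 1).
    void schedule(Runnable task, long delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int slot = (int) ((currentTick + delayTicks) % slots.size());
        long rounds = (delayTicks - 1) / slots.size();
        slots.get(slot).add(new Entry(task, rounds));
    }

    // Advance the wheel by one tick and run the tasks that are due in the new slot.
    void tick() {
        currentTick++;
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get((int) (currentTick % slots.size())).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        // Run after iterating so that a task may safely schedule new tasks.
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

Everything lives in the heap of one process, which is why tasks vanish when that process goes down.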

3. Implementation goals

Stay compatible with the asynchronous event components currently in use, and provide a delay component that is more reliable, retryable, recorded, monitored with alarms, and high-performance.

  • Message delivery reliability: after a message enters the delay queue, it is guaranteed to be consumed at least once.
  • Rich client support: multiple languages are supported.
  • High availability: multi-instance deployment is supported. After one instance goes down, a backup instance continues to provide service.
  • Real-time behavior: a certain time error is allowed.
  • Message deletion: business users can delete a specified message at any time.
  • Consumption queries
  • Manual retry
  • Monitoring of the execution of the current asynchronous events

4. Architecture design

5. Delay component implementation

1. Implementation principle

At present, we use jimdb and implement the delay function with a zset: the task id is stored as the member and its execution time as the score. The zset is sorted by score by default, and each poll fetches the task ids whose score lies between 0 and the current time.

When a delayed task is sent, a unique id is generated from timestamp + machine ip + queueName + sequence. The message body is then constructed, encrypted, and put into the zset queue.
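One way such an id could be assembled is sketched below. The article only names the four ingredients; the `:` separator, the field order, and the class itself are assumptions made for illustration, and the encryption step is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

final class JobIdGenerator {
    private final String machineIp;
    // Per-process counter, so two ids created in the same millisecond still differ.
    private final AtomicLong sequence = new AtomicLong();

    JobIdGenerator(String machineIp) {
        this.machineIp = machineIp;
    }

    // Layout (hypothetical): timestamp:ip:queueName:sequence
    String nextId(String queueName, long timestampMillis) {
        return timestampMillis + ":" + machineIp + ":" + queueName + ":" + sequence.incrementAndGet();
    }
}
```

The ip separates machines, the sequence separates calls on one machine, and the timestamp keeps ids unique across restarts as long as the clock moves forward.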

A mover thread moves tasks that have reached their execution time to the release queue, where they wait for a consumer to fetch them.
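One pass of such a mover can be sketched with in-memory stand-ins: a priority queue plays the role of the delay zset and a blocking queue plays the role of the release queue. The real component works against jimdb; the class and method names here are hypothetical.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class DelayMover {
    private static final class Pending {
        final String jobId;
        final long executeAtMillis;

        Pending(String jobId, long executeAtMillis) {
            this.jobId = jobId;
            this.executeAtMillis = executeAtMillis;
        }
    }

    // Stand-in for the delay zset: ordered by execution time.
    private final PriorityQueue<Pending> delayed =
            new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.executeAtMillis));
    // Stand-in for the release queue that consumers read from.
    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>();

    synchronized void addDelayed(String jobId, long executeAtMillis) {
        delayed.add(new Pending(jobId, executeAtMillis));
    }

    // One pass of the mover: transfer every job whose execution time has been reached.
    synchronized int moveDue(long nowMillis) {
        int moved = 0;
        while (!delayed.isEmpty() && delayed.peek().executeAtMillis <= nowMillis) {
            ready.add(delayed.poll().jobId);
            moved++;
        }
        return moved;
    }

    // Non-blocking fetch for consumers; returns null when nothing is ready.
    String pollReady() {
        return ready.poll();
    }
}
```

A background thread would call `moveDue(System.currentTimeMillis())` in a loop, while consumers only ever touch the release queue.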

Monitoring is integrated through ump.

Consumption records are completed through redis backup + database persistence.

The cache-based approach is only one implementation. Which implementation is used can be controlled through parameters, and new ones can be added freely through spi.

2. Message structure

Each Job must contain the following attributes:

  • Topic: Job type, that is, the QueueName.
  • Id: the unique identifier of the Job. Used to retrieve and delete the specified Job information.
  • Delay: how long the Job needs to be delayed. Unit: seconds. (The server converts it to an absolute time.)
  • Body: the content of the Job, for consumers to do specific business processing, stored in json format.
  • traceId: the traceId of the sending thread. Once pfinder supports setting the traceId, the consumer can share the same traceId with the sending thread, which is convenient for log tracking.

The specific structure is shown in the figure below:

TTR is designed to ensure the reliability of message transmission.
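The attributes listed above can be pictured as a plain Java class. This is only an illustrative shape, not the component's actual message class; the field types are assumptions, and TTR is left out because the text does not specify it.

```java
final class Job {
    final String topic;      // job type, i.e. the queue name
    final String id;         // unique identifier, used to look up or delete the job
    final long delaySeconds; // relative delay requested by the sender
    final String body;       // JSON payload for the consumer
    final String traceId;    // traceId of the sending thread

    Job(String topic, String id, long delaySeconds, String body, String traceId) {
        this.topic = topic;
        this.id = id;
        this.delaySeconds = delaySeconds;
        this.body = body;
        this.traceId = traceId;
    }

    // Server-side conversion of the relative delay into an absolute time,
    // which is the value that would be used as the zset score.
    long executeAtMillis(long receivedAtMillis) {
        return receivedAtMillis + delaySeconds * 1000L;
    }
}
```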

3. Data flow and flow chart

Publishing and consuming are based on a redis-disruptor approach, which can be used as a message queue. Consumers use the lock-free disruptor queue of the original asynchronous event component, and there are no locks between different applications or different queues.

1. Support applications that only publish and do not consume, which gives the function of a message queue.

2. Support bucketing. To deal with large keys when there are many events, the number of buckets for the delay queue and the task queue can be configured, which reduces the redis blocking caused by large keys.

3. Tune behavior through ducc configuration. Currently, only enabling and disabling consumption is supported.

4. Support a timeout configuration to prevent consumer threads from executing for too long.
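Bucketing as described in item 2 usually comes down to hashing the job id onto one of N keys. A minimal sketch follows; the `queueName:bucket` key format and the class name are assumptions, not the component's actual naming scheme.

```java
final class BucketRouter {
    // Maps a job id to one of bucketNum redis keys so that no single key grows too large.
    static String bucketKey(String queueName, String jobId, int bucketNum) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int bucket = Math.floorMod(jobId.hashCode(), bucketNum);
        return queueName + ":" + bucket;
    }
}
```

With `bucketNum = 1` every job lands on the same key, which matches an unbucketed queue.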

Bottleneck: when consumption is slow and production is fast, the ringbuffer queue fills up. When the current application is both a producer and a consumer, the producer sleeps, so performance depends on the consumption speed. Scaling out machines horizontally improves performance directly. Monitor the length of the redis queue; if it keeps growing, consider adding consumers.

Possible situation: an application shares one disruptor with 64 consumer threads. If one event type is consumed too slowly, all 64 threads may end up consuming that event, leaving no consumer thread for other events. The producer thread is then blocked as well, so the consumption of all events stalls.

If this bottleneck is observed later, each queue can be given its own consumer thread pool.
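The per-queue thread pool idea could look roughly like this. It is a sketch of the proposed mitigation using plain `java.util.concurrent` executors, not code from the component, and the names are invented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerQueueExecutors {
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
    private final int threadsPerQueue;

    PerQueueExecutors(int threadsPerQueue) {
        this.threadsPerQueue = threadsPerQueue;
    }

    // Each queue lazily gets its own fixed-size pool, so a slow handler on one
    // queue can only exhaust that queue's threads.
    Future<?> submit(String queueName, Runnable handler) {
        return pools
                .computeIfAbsent(queueName, q -> Executors.newFixedThreadPool(threadsPerQueue))
                .submit(handler);
    }

    // Stops all pools and interrupts handlers that are still running.
    void shutdown() {
        pools.values().forEach(ExecutorService::shutdownNow);
    }
}
```

The trade-off is more threads overall, so the pool size per queue would need tuning against the number of queues.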

6. Demo example

Add configuration file

Whether the component is enabled is determined by jd.event.enable:true

<dependency>
    <groupId>com.jd.car</groupId>
    <artifactId>senna-event</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

Configuration

jd:
 senna:
  event:
   enable: true
  queue:
   retryEventQueue:
    bucketNum: 1
    handleBean: retryHandle

Consumption Code:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author zhangluyao
 * @description
 * @create 2022-02-21-9:54 PM
 */
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {

    @Override
    protected void onHandle(String key, String eventType) {
        log.info("Handler starts to consume: {}", key);
    }

    @Override
    protected void onDelayHandle(String key, String eventType) {
        log.info("delayHandler starts consumption: {}", key);
    }
}

Annotation form:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
 * @author zhangluyao
 * @description
 * @create 2022-02-21-9:54 PM
 */
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5, delayBucketNum = 5, delayEnable = true)
public class TestQueueEvent extends EventHandler {

    @Override
    protected void onHandle(String key, String eventType) {
        log.info("Handler starts to consume: {}", key);
    }

    @Override
    protected void onDelayHandle(String key, String eventType) {
        log.info("delayHandler starts consumption: {}", key);
    }
}

Send Code

package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @author zly
 */
@RestController
@Slf4j
public class DemoController {

    @Lazy
    @Resource(name = "testQueue")
    private IEventQueue eventQueue;

    // Publish a message for immediate consumption.
    @ResponseBody
    @GetMapping("/api/v1/demo")
    public String demo() {
        log.info("Send message without delay");
        eventQueue.push("no delay 5000 milliseconds message 3");
        return "ok";
    }

    // Publish a message with a relative delay, given in milliseconds.
    @ResponseBody
    @GetMapping("/api/v1/demo1")
    public String demo1() {
        log.info("Send a message with a delay of 5 seconds");
        eventQueue.push(" delay 5000 milliseconds message,name", 1000 * 5L);
        return "ok";
    }

    // Publish a message to be consumed at an absolute time.
    @ResponseBody
    @GetMapping("/api/v1/demo2")
    public String demo2() {
        log.info("Send a message delayed until 2022-04-02 00:00:00");
        // The L suffix is required: the epoch-millisecond value does not fit in an int.
        eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000L));
        return "ok";
    }

}

Reference: Youzan design: https://tech.youzan.com/queuing_delay/

7. Current application:

1. Yunxiu: in-store queue entries are automatically canceled after 24 hours

2. Meituan: token refresh requests are sent periodically

3. Warranty cards are generated within 24 hours

5. Delayed statement generation

6. Delayed SMS sending