Using a database to implement a local blocking queue

I learned the technology when I was doing development at Huawei. I was doing content review in the content management team and found that they used local queues. At that time, I asked the project manager of the company why they didn’t use mq. His answer was “One application can only use 5 Queues, we have a lot of application queues, and we can’t apply for them at all, so we can only use local queues.”

design interface

import com.hncu.ex.dbqueue.base.domain.QueueTask;

/**
 * Task queue definition
 */
public interface TaskQueue {

    /**
     * forward news
     *
     * @param queueTask task
     * @return success
     */
    boolean push(QueueTask queueTask);

    /**
     * Process messages
     *
     * @param queueTask task
     */
    void queueHandler(QueueTask queueTask);

}

abstract method 1

import cn.hutool.extra.spring.SpringUtil;
import com.hncu.ex.common.base.util.IpUtils;
import com.hncu.ex.dbqueue.base.TaskQueue;
import com.hncu.ex.dbqueue.base.entity.Q3QueueConfig;
import com.hncu.ex.dbqueue.base.service.IQ3QueueConfigService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;

/**
 * Abstract task queue for framework design
 */
@Data
@Slf4j
@DependsOn({"springUtil", "queueConfigService"}) // It does not take effect as a parent class, it must be used together with the @Component annotation to take effect
public abstract class AbstractTaskQueue implements TaskQueue {
    private String taskName;// task name
    private String localIp;// current running server address
    private int taskLimit;// read quantity each time
    private boolean isStart;//whether to start the thread
    private IQ3QueueConfigService q3QueueConfigService = SpringUtil.getBean("queueConfigService");

    public AbstractTaskQueue() {
        init();
    }

    // Initialize queue configuration information
    protected void init() {
        // get task name
        this.taskName = getTaskQueueName();
        this.localIp = IpUtils.getLocalIp();
        this.taskLimit = 3;// default 3
        this.isStart = true;// Whether to start the thread
        // Load task configuration
        Q3QueueConfig q3QueueConfig = q3QueueConfigService.queryByTaskName(taskName);
        Integer taskLimit = q3QueueConfig.getTaskLimit();
        if (taskLimit != null) {
            this.taskLimit = taskLimit;
        }
        String isIniUse = q3QueueConfig.getIsIniUse();
        if (StringUtils.isEmpty(isIniUse) || "1".equals(isIniUse)) {
            this.isStart = true;
        } else {
            this.isStart = false;
        }
        log.info("Queue [{}], the configuration is initialized", taskName);
    }

    /**
     * Get the task queue name
     *
     * @return queue name
     */
    protected abstract String getTaskQueueName();

}

abstract method 2

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.hncu.ex.common.base.id.IdGenerator;
import com.hncu.ex.dbqueue.base.domain.QueueTask;
import com.hncu.ex.dbqueue.base.entity.Q3QueueTask;
import com.hncu.ex.dbqueue.base.service.IQ3QueueTaskHistoryService;
import com.hncu.ex.dbqueue.base.service.IQ3QueueTaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Asynchronous blocking queue implementation
 */
@Slf4j
@DependsOn({"springUtil", "queueTaskService", "queueTaskHistoryService"}) // It is not valid as a parent class and must be used with @Component annotation to take effect
public abstract class AsyncAbstractTaskQueue extends AbstractTaskQueue implements Runnable {
    public static final int QUEUE_SIZE = 128;// asynchronous queue size

    private BlockingQueue<QueueTask> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
    private ReentrantLock mainLock = new ReentrantLock(true);//Using a fair lock, whoever gets it first can access it first; here is an ordinary object
    private Thread asyncThread;// asynchronous thread
    private IdGenerator idGenerator = SpringUtil. getBean("snowFlakeService");
    private IQ3QueueTaskService queueTaskService = SpringUtil.getBean("queueTaskService");
    private IQ3QueueTaskHistoryService queueTaskHistoryService = SpringUtil.getBean("queueTaskHistoryService");

    public AsyncAbstractTaskQueue() {
        ThreadGroup group = new ThreadGroup("edu_group");
        String threadName = "lq_thread_" + getTaskName();
        this.asyncThread = new Thread(group, this, threadName);
        // start directly
        this.asyncThread.start();
        log.info("Queue [{}], thread started successfully", getTaskName());
    }

    @Override
    public boolean push(QueueTask queueTask) {
        if (StringUtils. isEmpty(queueTask. getObjectId())) {
            return false;
        }
        // save a task
        String taskId = idGenerator. nextId();
        String serverIp = getLocalIp();
        String curUser = "sys_designer";
        String curDate = DateUtil.format(new Date(), DatePattern.PURE_DATETIME_MS_FORMAT);
        Q3QueueTask q3QueueTask = new Q3QueueTask();
        q3QueueTask.setTaskId(taskId);
        q3QueueTask.setTaskName(queueTask.getTaskName());
        q3QueueTask.setObjectId(queueTask.getObjectId());
        q3QueueTask.setServerIp(serverIp);
        q3QueueTask.setCreateUser(curUser);
        q3QueueTask.setCreateDate(curDate);
        queueTaskService. save(q3QueueTask);

        // Activate asynchronous task processing
        return activeTask();
    }

    // Activate asynchronous task processing
    public boolean activeTask() {
        // thread wake up
        boolean alive = this.asyncThread.isAlive();
        // WAITING true
        Thread.State state = this.asyncThread.getState();
        if (alive) {
            log.info("Queue [{}], task added successfully, check status ({}), thread wakes up", getTaskName(), state);
            // Must be locked, otherwise java.lang.IllegalMonitorStateException will be reported
            synchronized(mainLock) {
                mainLock.notify();// wake up a thread
            }
            return true;
        } else {
            log.info("queue [{}], failed to add task, check status ({}), thread shutdown", getTaskName(), state);
            // interrupt thread
            this.asyncThread.interrupt();
            return false;
        }
    }

    @Override
    public void run() {
        log.info("Queue [{}], the thread run method runs successfully", getTaskName());
        QueueTask queueTask = new QueueTask();
        queueTask.setTaskName(getTaskName());
        while (this. isStart()) {
            // queue processing
            queueHandler(queueTask);
        }
    }

    /**
     * asynchronous call
     *
     * @param queueTask task
     */
    @Override
    public void queueHandler(QueueTask queueTask) {
        try {
            // 1. Pre-call processing
            prefixHandler();

            List<String> taskIds = new ArrayList<>(QUEUE_SIZE);
            // 2. Client processing
            while (!blockingQueue. isEmpty()) {
                QueueTask oneTask = blockingQueue. poll();
                // asynchronous task handler
                asyncHandler(oneTask);
                // Add tasks that need to be deleted
                taskIds.add(oneTask.getTaskId());
            }

            // 3. Post-call processing
            suffixHandler(taskIds);
        } catch (InterruptedException e) {
            log.info("queue blocking failed", e);
        } finally {
        }
    }

    /**
     * Asynchronous task handler
     *
     * @param queueTask task
     */
    protected abstract void asyncHandler(QueueTask queueTask);

    // pre-call processing
    private void prefixHandler() throws InterruptedException {
        log.info("Queue [{}], read content", getTaskName());
        Thread.State state = this.asyncThread.getState();
        // Sort and query 3 items of data in the queue task table in order
        List<Q3QueueTask> q3QueueTasks = queueTaskService.queryByPageSize(getTaskLimit());
        if (CollectionUtil. isEmpty(q3QueueTasks)) {
            log.info("Queue [{}], the read content is empty, the thread enters the blocked state ({})", getTaskName(), state);
            // Must be locked, otherwise java.lang.IllegalMonitorStateException will be reported
            synchronized(mainLock) {
                mainLock.wait();//block a thread
                log.info("Queue[{}], the read content is empty, thread blocking state->>>running state", getTaskName());
            }
        } else {
            log.info("Queue [{}], the read content is not empty, [{}] tasks were successfully read", getTaskName(), q3QueueTasks.size());
            for (Q3QueueTask q3QueueTask : q3QueueTasks) {
                QueueTask queueTask = new QueueTask();
                queueTask.setTaskId(q3QueueTask.getTaskId());
                queueTask.setTaskName(q3QueueTask.getTaskName());
                queueTask.setObjectId(q3QueueTask.getObjectId());
                blockingQueue.offer(queueTask);//Enter the blocking queue
            }
        }
    }

    // post-call processing
    private void suffixHandler(List<String> taskIds) {
        if (CollectionUtil. isEmpty(taskIds)) {
            return;
        }
        // 1. Copy the data to the history table
        boolean flag = queueTaskHistoryService. copySave(taskIds);
        // 2. Delete the task in the task table
        if (flag) {
            boolean cnt = queueTaskService. removeByIds(taskIds);
            log.info("Queue [{}], the task in the task table was successfully deleted, [{}] tasks were successfully deleted", getTaskName(), cnt);
        } else {
            log.info("queue [{}], failed to delete tasks in the task table, [{}] tasks were successfully deleted", getTaskName(), taskIds.size());
        }
    }


}

Case 1

import com.hncu.ex.dbqueue.base.biz.AsyncAbstractTaskQueue;
import com.hncu.ex.dbqueue.base.domain.QueueTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.DependsOn;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Asynchronous queue: simple case implementation
 */
@Slf4j
@DependsOn({"springUtil", "queueConfigService"}) // It does not take effect as a parent class, it must be used together with the @Component annotation to take effect
public class SimpleTaskQueue extends AsyncAbstractTaskQueue {
    public static final String TASK_QUEUE_NAME = "simpleTaskQueue";
    protected Random random = new Random();

    /**
     * Get the task queue name
     * TODO needs to be developed to override this method
     *
     * @return queue name
     */
    @Override
    protected String getTaskQueueName() {
        return TASK_QUEUE_NAME;
    }

    /**
     * Push tasks (subclasses can directly access)
     *
     * @param objectId task object
     * @return success
     */
    public boolean pushTask(String objectId) {
        QueueTask queueTask = new QueueTask();
        queueTask.setTaskName(getTaskName());
        queueTask.setObjectId(objectId);
        // can write business logic
        return this.push(queueTask);
    }

    /**
     * Asynchronous task handler
     * TODO needs to be developed to override this method
     *
     * @param queueTask task
     */
    @Override
    protected void asyncHandler(QueueTask queueTask) {
        String objectId = queueTask. getObjectId();
        log.info("query business data-{}", objectId);
        // random sleep
        try {
            int sleepTime = random.nextInt(15);// 15 seconds
            TimeUnit. SECONDS. sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

case two

import com.hncu.ex.dbqueue.base.example.SimpleTaskQueue;
import com.hncu.ex.dbqueue.base.domain.QueueTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * Audit queue
 */
@Slf4j
@Component
@DependsOn({"springUtil", "queueConfigService"})
public class AuditTaskQueue extends SimpleTaskQueue {
    public static final String QUEUE_NAME = "auditTaskQueue";

    @Override
    protected String getTaskQueueName() {
        return QUEUE_NAME;
    }

    @Override
    protected void asyncHandler(QueueTask queueTask) {
        String objectId = queueTask. getObjectId();
        log.info("Call the third-party content review service interface-{}", objectId);
        // random sleep
        try {
            int sleepTime = this.random.nextInt(20);// 20
            TimeUnit. SECONDS. sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

The audit service accepts requests, local peak elimination, and asynchronous processing (this is how I called it when I was developing at Huawei)

mysql script

create database ex_async;

-- Task queue configuration table
drop table if exists q3_queue_config;
create table q3_queue_config (
task_name varchar(80) not null COMMENT 'task name',
task_desc varchar(200) not null COMMENT 'task description',
task_limit tinyint COMMENT 'Number of tasks to read',
is_ini_use varchar(32) COMMENT 'whether the task is enabled',
sort_no varchar(32) COMMENT 'sort number',
create_user varchar(40) COMMENT 'creator',
create_date varchar(20) COMMENT 'Creation time',
update_user varchar(40) COMMENT 'modifier',
update_date varchar(20) COMMENT 'modification time',
primary key (task_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- Queue task list
drop table if exists q3_queue_task;
create table q3_queue_task (
task_id varchar(32) not null COMMENT 'task primary key',
task_name varchar(80) COMMENT 'task name',
task_content varchar(200) COMMENT 'task content',
object_id varchar(32) COMMENT 'task object',
server_ip varchar(32) COMMENT 'task processing server',
create_user varchar(40) COMMENT 'creator',
create_date varchar(20) COMMENT 'Creation time',
primary key (task_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- Queue task history table
drop table if exists q3_queue_task_history;
create table q3_queue_task_history (
task_id varchar(32) not null COMMENT 'task primary key',
task_name varchar(80) COMMENT 'task name',
task_content varchar(200) COMMENT 'task content',
object_id varchar(32) COMMENT 'task object',
server_ip varchar(32) COMMENT 'task processing server',
create_user varchar(40) COMMENT 'creator',
create_date varchar(20) COMMENT 'Creation time',
history_user varchar(40) COMMENT 'history creator',
history_date varchar(20) COMMENT 'history creation time',
primary key (task_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

use ex_async;

delete from `q3_queue_config` where 1=1;
INSERT INTO `q3_queue_config` (`task_name`, `task_desc`, `task_limit`, `is_ini_use`, `sort_no`, `create_user`, `create_date`, `update_user`, `update_date`)
VALUES ('simpleTaskQueue', 'simple queue implementation', '3', '1', '1', 'sys_designer', '20230325', NULL, NULL);
INSERT INTO `q3_queue_config` (`task_name`, `task_desc`, `task_limit`, `is_ini_use`, `sort_no`, `create_user`, `create_date`, `update_user`, `update_date`)
VALUES ('auditTaskQueue', 'Audit Queue', '3', '1', '1', 'sys_designer', '20230325', NULL, NULL);

as shown in the picture

project structure

Warehouse Address

https://gitee.com/jadenJunLanLiu/study3.git