I learned the technology when I was doing development at Huawei. I was doing content review in the content management team and found that they used local queues. At that time, I asked the project manager of the company why they didn’t use mq. His answer was “One application can only use 5 Queues, we have a lot of application queues, and we can’t apply for them at all, so we can only use local queues.”
design interface
import com.hncu.ex.dbqueue.base.domain.QueueTask; /** * Task queue definition */ public interface TaskQueue { /** * forward news * * @param queueTask task * @return success */ boolean push(QueueTask queueTask); /** * Process messages * * @param queueTask task */ void queueHandler(QueueTask queueTask); }
abstract method 1
import cn.hutool.extra.spring.SpringUtil; import com.hncu.ex.common.base.util.IpUtils; import com.hncu.ex.dbqueue.base.TaskQueue; import com.hncu.ex.dbqueue.base.entity.Q3QueueConfig; import com.hncu.ex.dbqueue.base.service.IQ3QueueConfigService; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.DependsOn; /** * Abstract task queue for framework design */ @Data @Slf4j @DependsOn({"springUtil", "queueConfigService"}) // It does not take effect as a parent class, it must be used together with the @Component annotation to take effect public abstract class AbstractTaskQueue implements TaskQueue { private String taskName;// task name private String localIp;// current running server address private int taskLimit;// read quantity each time private boolean isStart;//whether to start the thread private IQ3QueueConfigService q3QueueConfigService = SpringUtil.getBean("queueConfigService"); public AbstractTaskQueue() { init(); } // Initialize queue configuration information protected void init() { // get task name this.taskName = getTaskQueueName(); this.localIp = IpUtils.getLocalIp(); this.taskLimit = 3;// default 3 this.isStart = true;// Whether to start the thread // Load task configuration Q3QueueConfig q3QueueConfig = q3QueueConfigService.queryByTaskName(taskName); Integer taskLimit = q3QueueConfig.getTaskLimit(); if (taskLimit != null) { this.taskLimit = taskLimit; } String isIniUse = q3QueueConfig.getIsIniUse(); if (StringUtils.isEmpty(isIniUse) || "1".equals(isIniUse)) { this.isStart = true; } else { this.isStart = false; } log.info("Queue [{}], the configuration is initialized", taskName); } /** * Get the task queue name * * @return queue name */ protected abstract String getTaskQueueName(); }
abstract method 2
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; import cn.hutool.extra.spring.SpringUtil; import com.hncu.ex.common.base.id.IdGenerator; import com.hncu.ex.dbqueue.base.domain.QueueTask; import com.hncu.ex.dbqueue.base.entity.Q3QueueTask; import com.hncu.ex.dbqueue.base.service.IQ3QueueTaskHistoryService; import com.hncu.ex.dbqueue.base.service.IQ3QueueTaskService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.DependsOn; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantLock; /** * Asynchronous blocking queue implementation */ @Slf4j @DependsOn({"springUtil", "queueTaskService", "queueTaskHistoryService"}) // It is not valid as a parent class and must be used with @Component annotation to take effect public abstract class AsyncAbstractTaskQueue extends AbstractTaskQueue implements Runnable { public static final int QUEUE_SIZE = 128;// asynchronous queue size private BlockingQueue<QueueTask> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private ReentrantLock mainLock = new ReentrantLock(true);//Using a fair lock, whoever gets it first can access it first; here is an ordinary object private Thread asyncThread;// asynchronous thread private IdGenerator idGenerator = SpringUtil. getBean("snowFlakeService"); private IQ3QueueTaskService queueTaskService = SpringUtil.getBean("queueTaskService"); private IQ3QueueTaskHistoryService queueTaskHistoryService = SpringUtil.getBean("queueTaskHistoryService"); public AsyncAbstractTaskQueue() { ThreadGroup group = new ThreadGroup("edu_group"); String threadName = "lq_thread_" + getTaskName(); this.asyncThread = new Thread(group, this, threadName); // start directly this.asyncThread.start(); log.info("Queue [{}], thread started successfully", getTaskName()); } @Override public boolean push(QueueTask queueTask) { if (StringUtils. isEmpty(queueTask. getObjectId())) { return false; } // save a task String taskId = idGenerator. nextId(); String serverIp = getLocalIp(); String curUser = "sys_designer"; String curDate = DateUtil.format(new Date(), DatePattern.PURE_DATETIME_MS_FORMAT); Q3QueueTask q3QueueTask = new Q3QueueTask(); q3QueueTask.setTaskId(taskId); q3QueueTask.setTaskName(queueTask.getTaskName()); q3QueueTask.setObjectId(queueTask.getObjectId()); q3QueueTask.setServerIp(serverIp); q3QueueTask.setCreateUser(curUser); q3QueueTask.setCreateDate(curDate); queueTaskService. save(q3QueueTask); // Activate asynchronous task processing return activeTask(); } // Activate asynchronous task processing public boolean activeTask() { // thread wake up boolean alive = this.asyncThread.isAlive(); // WAITING true Thread.State state = this.asyncThread.getState(); if (alive) { log.info("Queue [{}], task added successfully, check status ({}), thread wakes up", getTaskName(), state); // Must be locked, otherwise java.lang.IllegalMonitorStateException will be reported synchronized(mainLock) { mainLock.notify();// wake up a thread } return true; } else { log.info("queue [{}], failed to add task, check status ({}), thread shutdown", getTaskName(), state); // interrupt thread this.asyncThread.interrupt(); return false; } } @Override public void run() { log.info("Queue [{}], the thread run method runs successfully", getTaskName()); QueueTask queueTask = new QueueTask(); queueTask.setTaskName(getTaskName()); while (this. isStart()) { // queue processing queueHandler(queueTask); } } /** * asynchronous call * * @param queueTask task */ @Override public void queueHandler(QueueTask queueTask) { try { // 1. Pre-call processing prefixHandler(); List<String> taskIds = new ArrayList<>(QUEUE_SIZE); // 2. Client processing while (!blockingQueue. isEmpty()) { QueueTask oneTask = blockingQueue. poll(); // asynchronous task handler asyncHandler(oneTask); // Add tasks that need to be deleted taskIds.add(oneTask.getTaskId()); } // 3. Post-call processing suffixHandler(taskIds); } catch (InterruptedException e) { log.info("queue blocking failed", e); } finally { } } /** * Asynchronous task handler * * @param queueTask task */ protected abstract void asyncHandler(QueueTask queueTask); // pre-call processing private void prefixHandler() throws InterruptedException { log.info("Queue [{}], read content", getTaskName()); Thread.State state = this.asyncThread.getState(); // Sort and query 3 items of data in the queue task table in order List<Q3QueueTask> q3QueueTasks = queueTaskService.queryByPageSize(getTaskLimit()); if (CollectionUtil. isEmpty(q3QueueTasks)) { log.info("Queue [{}], the read content is empty, the thread enters the blocked state ({})", getTaskName(), state); // Must be locked, otherwise java.lang.IllegalMonitorStateException will be reported synchronized(mainLock) { mainLock.wait();//block a thread log.info("Queue[{}], the read content is empty, thread blocking state->>>running state", getTaskName()); } } else { log.info("Queue [{}], the read content is not empty, [{}] tasks were successfully read", getTaskName(), q3QueueTasks.size()); for (Q3QueueTask q3QueueTask : q3QueueTasks) { QueueTask queueTask = new QueueTask(); queueTask.setTaskId(q3QueueTask.getTaskId()); queueTask.setTaskName(q3QueueTask.getTaskName()); queueTask.setObjectId(q3QueueTask.getObjectId()); blockingQueue.offer(queueTask);//Enter the blocking queue } } } // post-call processing private void suffixHandler(List<String> taskIds) { if (CollectionUtil. isEmpty(taskIds)) { return; } // 1. Copy the data to the history table boolean flag = queueTaskHistoryService. copySave(taskIds); // 2. Delete the task in the task table if (flag) { boolean cnt = queueTaskService. removeByIds(taskIds); log.info("Queue [{}], the task in the task table was successfully deleted, [{}] tasks were successfully deleted", getTaskName(), cnt); } else { log.info("queue [{}], failed to delete tasks in the task table, [{}] tasks were successfully deleted", getTaskName(), taskIds.size()); } } }
Case 1
import com.hncu.ex.dbqueue.base.biz.AsyncAbstractTaskQueue; import com.hncu.ex.dbqueue.base.domain.QueueTask; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.DependsOn; import java.util.Random; import java.util.concurrent.TimeUnit; /** * Asynchronous queue: simple case implementation */ @Slf4j @DependsOn({"springUtil", "queueConfigService"}) // It does not take effect as a parent class, it must be used together with the @Component annotation to take effect public class SimpleTaskQueue extends AsyncAbstractTaskQueue { public static final String TASK_QUEUE_NAME = "simpleTaskQueue"; protected Random random = new Random(); /** * Get the task queue name * TODO needs to be developed to override this method * * @return queue name */ @Override protected String getTaskQueueName() { return TASK_QUEUE_NAME; } /** * Push tasks (subclasses can directly access) * * @param objectId task object * @return success */ public boolean pushTask(String objectId) { QueueTask queueTask = new QueueTask(); queueTask.setTaskName(getTaskName()); queueTask.setObjectId(objectId); // can write business logic return this.push(queueTask); } /** * Asynchronous task handler * TODO needs to be developed to override this method * * @param queueTask task */ @Override protected void asyncHandler(QueueTask queueTask) { String objectId = queueTask. getObjectId(); log.info("query business data-{}", objectId); // random sleep try { int sleepTime = random.nextInt(15);// 15 seconds TimeUnit. SECONDS. sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } }
case two
import com.hncu.ex.dbqueue.base.example.SimpleTaskQueue; import com.hncu.ex.dbqueue.base.domain.QueueTask; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * Audit queue */ @Slf4j @Component @DependsOn({"springUtil", "queueConfigService"}) public class AuditTaskQueue extends SimpleTaskQueue { public static final String QUEUE_NAME = "auditTaskQueue"; @Override protected String getTaskQueueName() { return QUEUE_NAME; } @Override protected void asyncHandler(QueueTask queueTask) { String objectId = queueTask. getObjectId(); log.info("Call the third-party content review service interface-{}", objectId); // random sleep try { int sleepTime = this.random.nextInt(20);// 20 TimeUnit. SECONDS. sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } }
The audit service accepts requests, local peak elimination, and asynchronous processing (this is how I called it when I was developing at Huawei)
mysql script
create database ex_async; -- Task queue configuration table drop table if exists q3_queue_config; create table q3_queue_config ( task_name varchar(80) not null COMMENT 'task name', task_desc varchar(200) not null COMMENT 'task description', task_limit tinyint COMMENT 'Number of tasks to read', is_ini_use varchar(32) COMMENT 'whether the task is enabled', sort_no varchar(32) COMMENT 'sort number', create_user varchar(40) COMMENT 'creator', create_date varchar(20) COMMENT 'Creation time', update_user varchar(40) COMMENT 'modifier', update_date varchar(20) COMMENT 'modification time', primary key (task_name) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Queue task list drop table if exists q3_queue_task; create table q3_queue_task ( task_id varchar(32) not null COMMENT 'task primary key', task_name varchar(80) COMMENT 'task name', task_content varchar(200) COMMENT 'task content', object_id varchar(32) COMMENT 'task object', server_ip varchar(32) COMMENT 'task processing server', create_user varchar(40) COMMENT 'creator', create_date varchar(20) COMMENT 'Creation time', primary key (task_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Queue task history table drop table if exists q3_queue_task_history; create table q3_queue_task_history ( task_id varchar(32) not null COMMENT 'task primary key', task_name varchar(80) COMMENT 'task name', task_content varchar(200) COMMENT 'task content', object_id varchar(32) COMMENT 'task object', server_ip varchar(32) COMMENT 'task processing server', create_user varchar(40) COMMENT 'creator', create_date varchar(20) COMMENT 'Creation time', history_user varchar(40) COMMENT 'history creator', history_date varchar(20) COMMENT 'history creation time', primary key (task_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; use ex_async; delete from `q3_queue_config` where 1=1; INSERT INTO `q3_queue_config` (`task_name`, `task_desc`, `task_limit`, `is_ini_use`, `sort_no`, `create_user`, `create_date`, `update_user`, `update_date`) VALUES ('simpleTaskQueue', 'simple queue implementation', '3', '1', '1', 'sys_designer', '20230325', NULL, NULL); INSERT INTO `q3_queue_config` (`task_name`, `task_desc`, `task_limit`, `is_ini_use`, `sort_no`, `create_user`, `create_date`, `update_user`, `update_date`) VALUES ('auditTaskQueue', 'Audit Queue', '3', '1', '1', 'sys_designer', '20230325', NULL, NULL);
as shown in the picture
project structure
Warehouse Address
https://gitee.com/jadenJunLanLiu/study3.git