Implementing delay queue decoupling business based on redisson

Foreword

What I share with you today is a delay queue implemented based on redisson. There is a first version of the encapsulation tool. Users only need to care about the processing of the data obtained when the delay time is up (or process it before, and only do the remaining business when the time is up). , without further ado, let’s go straight to the goods.

1. Business scenario

Here is a data simulation report for IoT devices. Let’s take a look at the requirements interface after the prototype is converted.

2. Implementation ideas

1. There are actually many solutions for implementation:

  1. Use timer to implement
  2. Implemented using the queue provided by java
  3. redis implementation
  4. redission implementation

The simplest method can be done directly using timer. I thought that this delay queue will be used in other scenarios in the future, so that other development partners can only care about the business, so I implemented it based on redisson and encapsulated the delay queue tool class.

2. Business flow chart

A simple flow chart I drew myself:

3. Core code

1.redisson introduction and configuration

I have written about this before, so I won’t repeat it here.

2. Delay queue tool

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Delay queue implemented by redisson
 *
 *
 * @author zwmac
 */
@Slf4j
@Component
public class RedissonDelayQueue {<!-- -->
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Add tasks to the delay queue
     *
     * @param queueName Queue name
     * @param data data
     * @param delayTime delay time, unit seconds
     */
    public void addTaskToDelayQueue(String queueName,JSONObject data,Long delayTime) {<!-- -->
        if(StringUtils.isNotBlank(queueName)){<!-- -->
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(data, delayTime, TimeUnit.SECONDS);

        }
    }

    /**
     * Delete delay queue
     * @param queueName Queue name
     */
    public void delDelayQueue(String queueName) {<!-- -->
        if(StringUtils.isNotBlank(queueName)){<!-- -->
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

            blockingDeque.clear();
            blockingDeque.delete();
            delayedQueue.clear();
            delayedQueue.destroy();

        }
    }

    /**
     * Determine whether the queue exists
     * @param queueName Queue name
     * @return true exists, false does not exist
     */
    public boolean hasQueue(String queueName) {<!-- -->
        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        if (blockingDeque.isExists() & amp; & amp; delayedQueue.isExists() & amp; & amp; !delayedQueue.isEmpty()){<!-- -->
            return true;
        }
        return false;
    }

    /**
     * Queue consumer
     * @param consumer consumer
     * @param queueName Queue name
     */
    public void queueConsumer( Consumer consumer, String queueName){<!-- -->
        new Thread(() -> {<!-- -->
            while (true){<!-- -->
                try {<!-- -->
                    JSONObject data = this.takeFromDelayQueue(queueName);
                    if (data != null){<!-- -->
                        //Consumption interface
                        consumer.accept(data);
                        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
                        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
                        if (blockingDeque.isEmpty() & amp; & amp; delayedQueue.isEmpty()){<!-- -->
                            //All data has been rotated, delete the queue
                            this.delDelayQueue(queueName);
                            //End thread
                            log.info("Queue name: {}, consumption of delayed elements is completed, and the release thread exits",queueName);
                            break;
                        }
                    }
                } catch (Exception e) {<!-- -->
                    //e.printStackTrace();
                    //Exit and release the thread
                    log.info("Queue name: {}, exiting thread release, reason: {}",queueName,e.getMessage());
                    break;
                }

            }

        },queueName + "-Customer").start();
    }

    /**
     * Get data from the delay queue
     * @param queueName Queue name
     * @return queue element json object
     * @throws Exception exception
     */
    public JSONObject takeFromDelayQueue(String queueName) throws Exception {<!-- -->
        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        JSONObject jsonObject = null;
        try {<!-- -->
            //log.info("--Queue name: {}, blockingDeque number: {}, delayedQueue number: {}", queueName, blockingDeque.size(), delayedQueue.size());
            if (blockingDeque.isExists()){<!-- -->
                log.info("--Before dequeuing--queue name: {}, current queue size: {}",queueName,blockingDeque.size());
                jsonObject = blockingDeque.take();
                log.info("--after dequeueing--queue name: {}, current queue size: {}",queueName,blockingDeque.size());
            }
            /** The processing here is too early and it is destroyed before it is consumed, which will cause the consumption data to be one off.
             if (blockingDeque.isEmpty() & amp; & amp; delayedQueue.isEmpty()){
                //All data has been rotated, delete the queue
                this.delDelayQueue(queueName);
                //End thread
                //Thread.currentThread().interrupt();
                throw new RuntimeException("All data has been rotated, delete the queue");
            }**/
        } catch (InterruptedException e) {<!-- -->
            throw new RuntimeException(e);
        }
        return jsonObject;
    }
}

</code><img class="look-more-preCode contentImg-no-view" src="//i2.wp.com/csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreBlack.png" alt ="" title="">

There is a comment about thread destruction. If you are interested, you can see why the destruction is not handled there. Of course, I also wrote the reason in the comment.

2.Use

@Resource
  private RedissonDelayQueue redissonDelayQueue;

private MessageInfo historyReceive(JSONObject jsonObject, String identify) {<!-- -->
    //Query the historical data of the device from ES
    List<JSONObject> historyData = searchHisFromEs(nbDeviceId, startTime, endTime,logMarkId);

    //Query whether the device has a replay queue executing
    String hisRetryQueueKey = "hisRetryQueueKey-" + nbDeviceId;
    if(redissonDelayQueue.hasQueue(hisRetryQueueKey)){<!-- -->
      //There is a replay queue executing, delete the original queue
      redissonDelayQueue.delDelayQueue(hisRetryQueueKey);
    }

    //Put it in the delay queue
    if (CollectionUtil.isNotEmpty(historyData)) {<!-- -->
      queueConsumer(redissonDelayQueue,nbDeviceId,logMarkId,identify,hisRetryQueueKey);
      for (int i = 0; i < historyData.size(); i + + ) {<!-- -->
        JSONObject data = historyData.get(i);
        Long interval = 2L;
        if (i > 0){<!-- -->
          interval = Long.valueOf(intervalTime * i) + interval;
        }
        redissonDelayQueue.addTaskToDelayQueue(hisRetryQueueKey,data,interval);
      }
    }

    return new MessageInfo(0, "success");
  }
/** Delayed data business processing
**/
private void queueConsumer(RedissonDelayQueue redissonDelayQueue, String nbDeviceId, String logMarkId, String identify, String hisRetryQueueKey) {<!-- -->
    //Consume delayed queue data
    redissonDelayQueue.queueConsumer(data -> {<!-- -->
      //After replaying the data and reorganizing the data, it is directly placed in the queue where the parsing is completed.
      log.info("Time: {}---Replay data: {}", DateUtil.now(),data);
      //Business processing
      

    },hisRetryQueueKey);
  }
</code><img class="look-more-preCode contentImg-no-view" src="//i2.wp.com/csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreBlack.png" alt ="" title="">

Here I am doing some business operations after getting elements from the delay queue. If there are no downstream cascade operations, I can actually do them in the for loop that is put into the queue. When the time is really up, I can do some simple business again. Can.
It can be seen that now you only need to process the for loop and put it into the delay queue, and the queueConsumer consumes and processes the delayed expired business.

3. Effect

Summary

  1. Decoupling, allowing development to only focus on business
  2. Based on redisson, we don’t need to pay too much attention to the underlying implementation of redis. It can be understood that there are two queues, an unexpired queue and an expired queue. As time goes by, redisson helps us move data from unexpired to expired. We only use Manage the operation of retrieving data from expiration
  3. The packaging is still rough and there is still room for improvement.
    Just share it here, hope it can help everyone, uping!