Implementing eventually consistent distributed transactions for placing orders and reducing inventory based on RabbitMQ

The National Day holiday threw me off my routine, so updates have been slow. I am finally back on track, so let's continue.

Without further ado, let me show you the overall data flow diagram.

Text walkthrough

Step 1: The user places an order, which calls the order service. The order service calls the remote inventory service to lock the inventory and, once the lock succeeds, sends an order-created message to the order exchange in RabbitMQ.

Step 2: The order exchange routes the message to a delay queue (a queue with a TTL and dead-letter settings, which this article calls the dead letter queue). When the message expires after 30 minutes, RabbitMQ dead-letters it and forwards it to an ordinary queue.

Step 3: In parallel with step 2, once the inventory service's lock-inventory method succeeds, the inventory service sends its own message to the inventory exchange, which routes it to the inventory delay queue. After 60 minutes the message expires and is forwarded to another ordinary queue.

Step 4: Now the listeners on the ordinary queues do the business processing. When the listener on the order release queue receives a message, 30 minutes have passed since the order was created, so an order that is still unpaid has expired and must be closed: the business logic changes its status to canceled. Once that is done, the order service publishes another message (through the order exchange, bound to the inventory release queue) so that the inventory service unlocks the inventory it locked earlier.

Step 5: The listener on the inventory service's ordinary queue receives the unlock message sent by the order service in step 4 and unlocks the inventory.

Step 6: This step is the automatic inventory unlock, a second line of defense for the system. The five steps above already cover placing an order, locking the inventory, canceling the order, and releasing the inventory. However, a failure in any of those steps can leave inventory locked, so this step further improves the reliability of the system. The inventory delay queue's TTL is 60 minutes (it must be longer than the order's expiration time), so even if one of the earlier steps fails, every lock record is checked again after 60 minutes: the inventory service looks up the status of the order that the record belongs to and decides whether the inventory still needs to be unlocked.
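The timing constraint in step 6 can be written down as a small check. This is only a sketch in plain Java: the class and method names are made up for illustration, and only the 30-minute and 60-minute values come from the walkthrough above.

```java
import java.time.Duration;

/** Hypothetical helper that captures the two delay windows used in this article. */
final class DelayWindows {
    static final Duration ORDER_TTL = Duration.ofMinutes(30); // order close delay
    static final Duration STOCK_TTL = Duration.ofMinutes(60); // stock safety-net delay

    /**
     * Returns the stock TTL in milliseconds (the unit x-message-ttl expects) and
     * rejects a stock TTL that would fire before or together with the order TTL.
     */
    static long stockTtlMillis(Duration orderTtl, Duration stockTtl) {
        if (stockTtl.compareTo(orderTtl) <= 0) {
            throw new IllegalArgumentException("stock TTL must be longer than order TTL");
        }
        return stockTtl.toMillis();
    }
}
```

If the stock check ran first, it could see an order that is still pending and leave the inventory locked, which is why the stock window has to be the longer one.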

The core of the entire process is RabbitMQ’s dead letter queue

The main component is RabbitMQ. Its dead letter mechanism implements the timed expiration of unpaid orders, and the same mechanism drives the automatic inventory unlock that backs up the inventory locking.

RabbitMQ's dead letter queue is really just an ordinary queue declared with a few specific arguments, as follows:

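    @Bean
    public Queue orderDelayQueue() {
        HashMap<String, Object> args = new HashMap<>();
        // Exchange that receives messages once they expire (are dead-lettered)
        args.put("x-dead-letter-exchange", "order-event-exchange");
        // Routing key used when a message is dead-lettered
        args.put("x-dead-letter-routing-key", "order.release.order");
        // Message TTL in milliseconds (60000 = 1 minute here; the full configuration below uses 30 minutes)
        args.put("x-message-ttl", 60000);
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue("order.delay.queue", true, false, false, args);
    }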
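To make the behavior of such a queue concrete, here is a minimal in-memory model in plain Java. It is not RabbitMQ code: the class name, the explicit clock parameter, and the `deadLettered` list are assumptions made for illustration. The TTL plays the role of x-message-ttl, and the list plays the role of the queue that the dead-letter exchange routes to.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of a queue with a message TTL and a dead-letter target. */
final class DelayQueueSketch {
    private static final class Entry {
        final String body;
        final long expiresAt;
        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final long ttlMillis;                          // role of x-message-ttl
    private final Deque<Entry> waiting = new ArrayDeque<>();
    final List<String> deadLettered = new ArrayList<>();   // role of the release queue

    DelayQueueSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Nobody consumes from the delay queue, so a published message simply waits. */
    void publish(String body, long nowMillis) {
        waiting.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Moves every message whose TTL has elapsed from the head of the queue to the target. */
    void tick(long nowMillis) {
        while (!waiting.isEmpty() && waiting.peekFirst().expiresAt <= nowMillis) {
            deadLettered.add(waiting.pollFirst().body);
        }
    }
}
```

The key point is that no consumer listens on the delay queue itself. Listeners are attached only to the queue that receives the expired messages.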

If you are not yet familiar with the basic structure and usage of RabbitMQ, see my earlier article for a detailed introduction: http://t.csdnimg.cn/kdgzw

Next, let's walk through the complete process in code.

Step 1, create and configure exchanges and queues

1. MQ configuration of order service

@Configuration
public class MyRabbitMQConfig {
    /* Queues, exchanges, and bindings declared as beans are created automatically if they do not already exist in RabbitMQ */

    /**
     * Delay queue: messages wait here for 30 minutes and are then dead-lettered
     * @return
     */
    @Bean
    public Queue orderDelayQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "order-event-exchange");
        args.put("x-dead-letter-routing-key", "order.release.order");
        args.put("x-message-ttl", 1800000);
        return new Queue("order.delay.queue",true,false,false,args);
    }

    /**
     * Ordinary queue
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue",true,false,false);
    }

    /**
     * topic exchange
     * @return
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }

    /**
     * Dead letter binding relationship 1
     * @return
     */
    @Bean
    public Binding orderCreateBinding() {
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    /**
     * Normal queue binding relationship 2
     * @return
     */
    @Bean
    public Binding orderReleaseBinding() {
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
    }

    /**
     * Order release is directly bound to inventory release
     * @return
     */
    @Bean
    public Binding orderReleaseOtherBinding() {

        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    }


}

2. MQ configuration of inventory service

@Configuration
public class MyRabbitMQConfig {
    

    @Bean
    public Exchange stockEventExchange() {
        return new TopicExchange("stock-event-exchange", true, false);
    }

    /**
     * Ordinary queue
     * @return
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        return new Queue("stock.release.stock.queue",true,false,false);
    }


    /**
     * Delay queue
     * @return
     */
    @Bean
    public Queue stockDelay() {

        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        arguments.put("x-message-ttl", 3600000);

        Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);
        return queue;
    }

    /**
     * The exchange is bound to the ordinary queue
     * @return
     */
    @Bean
    public Binding stockLocked() {
        Binding binding = new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);

        return binding;
    }


    /**
     * The exchange is bound to the delay queue
     * @return
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }


}
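Two bindings above rely on the `#` wildcard of topic exchanges, which matches zero or more dot-separated words. That is why a message dead-lettered with routing key stock.release still reaches the queue bound with stock.release.#. The matcher below is a simplified illustration written for this article, not RabbitMQ's own implementation; the class name is hypothetical.

```java
/** Simplified topic routing-key matcher: '*' matches one word, '#' matches zero or more. */
final class TopicMatcher {
    static boolean matches(String bindingPattern, String routingKey) {
        return match(bindingPattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern consumed: the key must be consumed too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero, one, or many words; try every possible split
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }
}
```

With this in mind, stock.locked goes only to the delay queue, while stock.release and order.release.other both end up in stock.release.stock.queue.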

Step 2, order service

After the order is created, the order service calls the remote inventory service to lock the inventory and then sends a message to the order exchange.

    // A local transaction alone is not enough here; a distributed transaction is needed.
    @Override
    @Transactional(rollbackFor = Exception.class)
    public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) throws Exception {

        /*
         * Some of my own business logic has been removed here (including the code that builds
         * `order` and `responseVo`), so this method cannot be used as-is and must be adapted.
         * What remains is the core order and inventory logic, which is the part worth studying.
         */

        // Save the order
        saveOrder(order);

        // Lock inventory. This is the important part: any exception while locking rolls the transaction back.
        WareSkuLockVo lockVo = new WareSkuLockVo();
        lockVo.setOrderSn(order.getOrder().getOrderSn());

        //Item to be locked
        List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map(item -> {
            OrderItemVo orderItemVo = new OrderItemVo();
            orderItemVo.setSkuId(item.getSkuId());
            orderItemVo.setCount(item.getSkuQuantity());
            orderItemVo.setTitle(item.getSkuName());
            return orderItemVo;
        }).collect(Collectors.toList());
        lockVo.setLocks(orderItemVos);

        /*
         * Key point: RabbitMQ is used to achieve eventual consistency for the distributed
         * transaction, which also gives better concurrency performance.
         */
        // Remote call to lock the inventory
        try {
            R r = wmsFeignService.orderLockStock(lockVo);
            if (r.getCode() == 0) {
                // Lock successful
                responseVo.setOrder(order.getOrder());

                // Send a message to the order exchange so that the order is closed automatically if it times out
                String uuid = UUID.randomUUID().toString().replace("-", "");
                order.getOrder().setMqMessageId(uuid);
                CorrelationData correlationData = new CorrelationData(uuid);


                rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder(), correlationData);


                return responseVo;
            } else {
                String msg = (String) r.get("msg");
                throw new NoStockException(msg);
            }
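        } catch (NoStockException e) {
            // Insufficient stock: rethrow unchanged so the original message is kept and the transaction rolls back
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
            responseVo.setCode(3);
            throw new NoStockException("Remote inventory not responding");
        }
        // No trailing return: every path above returns or throws, so one would be an unreachable statement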
    }
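The control flow of this method can be reduced to a small sketch that runs without Spring. Everything here is a stand-in: `StockClient` and `DelayPublisher` are hypothetical interfaces replacing the Feign client and RabbitTemplate, and removing the order from a list stands in for the transaction rollback.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the remote inventory call. */
interface StockClient {
    boolean lock(String orderSn);
}

/** Hypothetical stand-in for sending a message to an exchange. */
interface DelayPublisher {
    void publish(String exchange, String routingKey, String orderSn);
}

final class SubmitOrderSketch {
    private final StockClient stock;
    private final DelayPublisher publisher;
    final List<String> savedOrders = new ArrayList<>();

    SubmitOrderSketch(StockClient stock, DelayPublisher publisher) {
        this.stock = stock;
        this.publisher = publisher;
    }

    /** Returns normally only if the stock was locked and the delayed close message was published. */
    void submit(String orderSn) {
        savedOrders.add(orderSn); // stands in for saveOrder(order)
        boolean locked;
        try {
            locked = stock.lock(orderSn);
        } catch (RuntimeException e) {
            savedOrders.remove(orderSn); // stands in for the transaction rollback
            throw new IllegalStateException("Remote inventory not responding", e);
        }
        if (!locked) {
            savedOrders.remove(orderSn); // stands in for the transaction rollback
            throw new IllegalStateException("Not enough stock for order " + orderSn);
        }
        // Only a successfully locked order starts the 30-minute countdown
        publisher.publish("order-event-exchange", "order.create.order", orderSn);
    }
}
```

The point of the sketch is the ordering: a failed lock leaves neither an order nor a delayed message behind, so there is nothing to clean up later.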

Step 3, inventory service locks inventory

Lock the inventory and send a message to MQ.

 @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean orderLockStock(WareSkuLockVo vo) {
        WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
        wareOrderTaskEntity.setOrderSn(vo.getOrderSn());
        wareOrderTaskEntity.setCreateTime(new Date());
        wareOrderTaskService.save(wareOrderTaskEntity);

        // Find inventory
        List<OrderItemVo> locks = vo.getLocks();

        List<SkuWareHasStock> collect = locks.stream().map(item -> {
            SkuWareHasStock skuWareHasStock = new SkuWareHasStock();
            skuWareHasStock.setSkuId(item.getSkuId());
            skuWareHasStock.setNum(item.getCount());
            // Find which warehouses have this product in stock
            List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(item.getSkuId());
            skuWareHasStock.setWareId(wareIdList);
            return skuWareHasStock;
        }).collect(Collectors.toList());

        // lock inventory
        for (SkuWareHasStock hasStock : collect) {
            boolean skuStocked = false;
            Long skuId = hasStock.getSkuId();
            List<Long> wareIds = hasStock.getWareId();

            if (CollectionUtils.isEmpty(wareIds)) {
                throw new NoStockException(skuId);
            }

            for (Long wareId:wareIds) {
                // Returns 1 if the lock succeeded and 0 if it failed. Locking only reserves stock; it is not the real deduction.
                Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());
                if (count == 1){
                    // Locked successfully in this warehouse
                    skuStocked = true;

                    WareOrderTaskDetailEntity taskDetail = new WareOrderTaskDetailEntity();
                    taskDetail.setSkuId(skuId);
                    taskDetail.setSkuName("");
                    taskDetail.setTaskId(wareOrderTaskEntity.getId());
                    taskDetail.setWareId(wareId);
                    taskDetail.setLockStatus(1);
                    taskDetail.setSkuNum(hasStock.getNum());
                    wareOrderTaskDetailService.save(taskDetail);

                    // Build the stock-locked message for MQ
                    StockLockedTo lockedTo = new StockLockedTo();
                    lockedTo.setId(wareOrderTaskEntity.getId());
                    StockDetailTo stockDetailTo = new StockDetailTo();
                    BeanUtils.copyProperties(taskDetail, stockDetailTo);
                    lockedTo.setDetail(stockDetailTo);
                    String uuid = UUID.randomUUID().toString().replace("-", "");
                    lockedTo.setMqMessageId(uuid);
                    CorrelationData correlationData = new CorrelationData(uuid); // Used by callback function

                    // Send the message to the inventory exchange; it is routed to the delay queue and records that this stock was locked.
                    rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo, correlationData);


                    break;
                }
            }
            if (!skuStocked) {
                // No warehouse could lock this product
                throw new NoStockException(skuId);
            }
        }

        // Every item was locked successfully.
        return true;
    }
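The "returns 1 or 0" convention of lockSkuStock suggests a conditional update that only succeeds when enough unlocked stock remains. The SQL is not shown in this article, so the following in-memory sketch is an assumption about how such a lock could behave; the class, its fields, and its method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory model of per-warehouse stock for a single SKU. */
final class WarehouseStockSketch {
    // wareId -> {total stock, locked stock}
    private final Map<Long, long[]> wares = new LinkedHashMap<>();

    void addWarehouse(long wareId, long stock) {
        wares.put(wareId, new long[] {stock, 0L});
    }

    /** Mirrors a conditional UPDATE: returns 1 if the row was changed, 0 otherwise. */
    int lockSkuStock(long wareId, long num) {
        long[] row = wares.get(wareId);
        if (row == null || row[0] - row[1] < num) {
            return 0; // not enough unlocked stock in this warehouse
        }
        row[1] += num; // reserve the stock; the total is not reduced yet
        return 1;
    }

    /** Tries the warehouses in order; returns the one that accepted the lock, or -1 if none did. */
    long lockInFirstAvailable(long num) {
        for (long wareId : wares.keySet()) {
            if (lockSkuStock(wareId, num) == 1) {
                return wareId;
            }
        }
        return -1L;
    }

    long locked(long wareId) {
        return wares.get(wareId)[1];
    }
}
```

In the real service, a return value of -1 corresponds to throwing NoStockException, which rolls back every lock taken earlier in the same transaction.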

Step 4, Order Expiration MQ Listener

When the order expires, the listener cancels it and then publishes a message (routing key order.release.other) that is delivered to the inventory release queue, telling the inventory service to unlock the inventory.

 @RabbitListener(queues = "order.release.order.queue")
    @RabbitHandler
    public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("Expired order, ready to close...");
        try {
            orderService.closeOrder(orderEntity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }catch (Exception e){
            e.printStackTrace();
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }

    // closeOrder in the order service implementation
    @Override
    public void closeOrder(OrderEntity orderEntity) {

        OrderEntity orderSn = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderEntity.getOrderSn()));
        if (orderSn != null && orderSn.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
            // Idempotent: only orders that are still pending payment are closed
            OrderEntity update = new OrderEntity();
            update.setId(orderEntity.getId());
            update.setStatus(OrderStatusEnum.CANCLED.getCode());
            this.updateById(update);

            OrderTo orderTo = new OrderTo();
            BeanUtils.copyProperties(orderSn, orderTo);
            //Send a message to mq to unlock the inventory
            try {

                // todo Every place where messages are sent must be recorded in the log table to prevent message loss.
                String uuid = UUID.randomUUID().toString().replace("-", "");
                orderTo.setMqMessageId(uuid);
                CorrelationData correlationData = new CorrelationData(uuid);

                rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo, correlationData);

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
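The status check in closeOrder is what makes the handler safe to run more than once, which matters because a rejected message is requeued and delivered again. A minimal sketch of that idea in plain Java follows. The class is hypothetical, the value 0 for a pending order is an assumption, and 4 is the canceled status that the inventory service checks for later in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the idempotent close-order logic. */
final class CloseOrderSketch {
    static final int CREATE_NEW = 0; // assumed code for "pending payment"
    static final int CANCELED = 4;   // code the inventory service treats as canceled

    final Map<String, Integer> statusByOrderSn = new HashMap<>();
    final List<String> unlockMessages = new ArrayList<>(); // stands in for messages sent to MQ

    void closeOrder(String orderSn) {
        Integer status = statusByOrderSn.get(orderSn);
        if (status == null || status != CREATE_NEW) {
            return; // unknown, paid, or already canceled: nothing to do
        }
        statusByOrderSn.put(orderSn, CANCELED);
        unlockMessages.add(orderSn); // tell the inventory service to unlock
    }
}
```

A redelivered expiration message therefore cancels the order and requests the unlock at most once.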

Step 5, Inventory Service Listener

The listener class has two handler methods: one handles the messages sent by the order service when an order is closed, and the other handles the messages that arrive when an inventory lock's delay expires.

/**
 * Automatically unlock inventory
 */
@Service
@RabbitListener(queues = "stock.release.stock.queue")
@Slf4j
public class StockReleaseListener {

    @Autowired
    private WareSkuService wareSkuService;

    @Autowired
    MqMessageService mqMessageService;


    /**
     * Automatic inventory unlock. Cases that lead here:
     * 1. The order was placed and the inventory was locked, but later business logic failed and the
     *    order was rolled back, so the inventory locked earlier must be released.
     * 2. The order failed because locking the inventory failed.
     * Whenever handling an unlock message fails, the broker must be told (reject and requeue)
     * instead of acknowledging the message.
     */
    @RabbitHandler
    public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
        log.info("****** time is up, automatically unlocked, received information about unlocking inventory******");
        try {
            // Unlock inventory
            wareSkuService.unLockStock(to);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }catch (Exception e) {
            e.printStackTrace();
            //Unlocking failed, the message is put back into the queue
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }



    @RabbitHandler
    public void handleOrderCloseRelease(OrderTo orderTo, Channel channel, Message message) throws IOException {
        log.info("Order canceled successfully, inventory unlocking started...");
        try{
            wareSkuService.unLockStock(orderTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }catch (Exception e){
            e.printStackTrace();
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

}

Business logic for automatically unlocking the inventory when the inventory message expires

@Override
    public void unLockStock(StockLockedTo to) {
        StockDetailTo detail = to.getDetail();
        Long detailId = detail.getId();

        /**
         * Unlock rules
         * Only touch lock records that belong to the inventory service, that is, the inventory work
         * order table and the work order detail table, then ask the order service for the order's status.
         * Unlock in either of these two cases:
         * 1. The order does not exist (order creation was rolled back)
         * 2. The order exists and has been canceled
         */

        // First check that the lock record exists, which means the inventory really was locked.
        WareOrderTaskDetailEntity taskDetail = wareOrderTaskDetailService.getById(detailId);
        if (taskDetail != null) {
            // Look up the inventory work order that this detail belongs to
            Long taskId = to.getId();
            WareOrderTaskEntity orderTask = wareOrderTaskService.getById(taskId);
            String orderSn = orderTask.getOrderSn();
            // Remote call to the order service to look up the order's status
            R orderStatus = orderFeignService.getOrderStatus(orderSn);
            if (orderStatus.getCode() == 0) {
                // success
                OrderVo orderInfo = (OrderVo) orderStatus.getData("data", new TypeReference<OrderVo>() {
                });
                // The following two states must be unlocked
                if (orderInfo == null ){ // There is no order, indicating that the order failed
                    if (taskDetail.getLockStatus() == 1) { // The inventory order task status must be locked before it can be unlocked.
                        unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);
                    }
                }else if (orderInfo.getStatus() == 4){ // Closed
                    if (taskDetail.getLockStatus() == 1) { // The inventory order task status must be locked before it can be unlocked.
                        unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);
                    }
                }

            }else {
                //Remote failure
                throw new RuntimeException("Remote service call failed.");
            }
        }
    }
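The nested conditions above boil down to one decision. The sketch below extracts it into a pure function so that each case can be checked by hand. The class and method names are hypothetical; the status values 4 (order canceled) and 1 (stock locked) are the ones used in the code above.

```java
/** Pure decision function extracted from the unlock logic, for illustration only. */
final class UnlockDecisionSketch {
    static final int ORDER_CANCELED = 4; // order status checked in the article's code
    static final int LOCKED = 1;         // lock_status of a detail that is still locked

    /**
     * @param orderStatus status returned by the order service, or null if the order does not exist
     * @param lockStatus  lock status of the inventory work order detail
     */
    static boolean shouldUnlock(Integer orderStatus, int lockStatus) {
        if (lockStatus != LOCKED) {
            return false; // already unlocked or deducted: never unlock twice
        }
        return orderStatus == null || orderStatus == ORDER_CANCELED;
    }
}
```

An order that exists and is still pending or already paid keeps its inventory locked. Only a missing or canceled order releases it.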

Business processing logic for order cancellation messages

 @Transactional
    @Override
    public void unLockStock(OrderTo orderTo) {
        String orderSn = orderTo.getOrderSn();
        WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));
        List<WareOrderTaskDetailEntity> taskDetailEntityList = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", orderTaskEntity.getId()).eq("lock_status",1));

        for (WareOrderTaskDetailEntity item : taskDetailEntityList) {
            unLockStock(item.getSkuId(), item.getWareId(), item.getSkuNum(), item.getId());
        }
    }
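Both unlock paths end in the four-argument unLockStock, whose body is not shown in this article. Because the order-cancel message and the 60-minute safety-net message can both arrive for the same lock record, that method has to be safe to call twice. The following sketch shows one way to get that property. It is an assumption, not the author's implementation, and the status value 2 for "unlocked" is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of unlocking each lock record at most once. */
final class UnlockOnceSketch {
    static final int LOCKED = 1;
    static final int UNLOCKED = 2; // assumed status code for an unlocked record

    long lockedStock; // total locked quantity for one SKU in one warehouse
    final Map<Long, Integer> lockStatusByDetailId = new HashMap<>();

    void lock(long detailId, long num) {
        lockedStock += num;
        lockStatusByDetailId.put(detailId, LOCKED);
    }

    /** Returns true if this call released the stock, false if there was nothing left to release. */
    boolean unlock(long detailId, long num) {
        Integer status = lockStatusByDetailId.get(detailId);
        if (status == null || status != LOCKED) {
            return false; // unknown record or already unlocked
        }
        lockedStock -= num;
        lockStatusByDetailId.put(detailId, UNLOCKED);
        return true;
    }
}
```

In a database-backed version, the stock update and the status change would need to happen in the same transaction so that they cannot drift apart.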

To sum up, this completes an eventually consistent distributed transaction based on RabbitMQ.

However, this process still has problems. The services call each other and exchange messages through MQ, and messages can be lost along the way for various reasons. This is a serious problem.

In the next article, I will show how to deal with RabbitMQ message loss.
