RabbitMQ delayed messages: dead letter exchange

Table of Contents

1. Dead letter exchange

2. Delayed message plug-in

Declare delay exchange

Send delayed message


Usage scenarios

In e-commerce payment flows, products with limited inventory usually have their stock deducted as soon as the user places an order, which gives a better user experience. For example, when buying movie tickets or high-speed rail tickets, the seat is locked once the order is placed, and nobody else can buy it.

But there is a problem: if a user places an order and never pays, the inventory stays occupied, other customers cannot buy it, and the merchant ultimately loses revenue.

Therefore, the usual practice in e-commerce is: if an order has not been paid within a certain period, cancel it and release the occupied inventory.

For example, if the payment timeout is 30 minutes, we check the payment status 30 minutes after the user places the order. If the order is still unpaid, we cancel it immediately and release the inventory.
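The 30-minute rule can be written as a small check. This is a minimal sketch in plain Java; the class name OrderTimeoutCheck and its method are hypothetical and only illustrate the decision that runs when the delayed check fires.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not from the article) that applies the payment timeout rule. */
final class OrderTimeoutCheck {
    /** Payment timeout used in the example above. */
    static final Duration PAYMENT_TIMEOUT = Duration.ofMinutes(30);

    private OrderTimeoutCheck() {
    }

    /**
     * Returns true when the order is still unpaid at or after the deadline,
     * meaning it should be canceled and its inventory released.
     */
    static boolean shouldCancel(Instant placedAt, boolean paid, Instant now) {
        if (paid) {
            return false; // paid orders are never canceled by the timeout
        }
        Instant deadline = placedAt.plus(PAYMENT_TIMEOUT);
        return !now.isBefore(deadline);
    }
}
```

The check only decides what to do; something still has to trigger it 30 minutes after the order is placed, which is the job of a delayed message.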

Delayed message: the producer specifies a delay when sending a message. The consumer does not receive the message immediately, but only after the specified time has passed.

Delayed task: a task that is set to run after a certain period of time.
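To make the semantics concrete, here is an in-process analogy using the JDK's java.util.concurrent.DelayQueue. It is only a sketch of the "not visible until the delay has passed" behavior, not how RabbitMQ implements it; the class names and the 300 ms delay are assumptions chosen for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical delayed item: an order that should be checked after a delay. */
final class DelayedOrderCheck implements Delayed {
    final String orderId;
    private final long readyAtNanos;

    DelayedOrderCheck(String orderId, long delayMillis) {
        this.orderId = orderId;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the item may be taken
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedOrderCheck> queue = new DelayQueue<>();
        queue.put(new DelayedOrderCheck("order-1", 300));

        // poll() returns null while the delay has not elapsed yet
        System.out.println("immediately: " + queue.poll());

        // take() blocks until the delay has elapsed
        DelayedOrderCheck ready = queue.take();
        System.out.println("after delay: " + ready.orderId);
    }
}
```

An in-memory queue like this is lost when the process restarts, which is one reason to push the delay into the message broker instead.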

1. Dead letter exchange

A message in a queue becomes a dead letter when it meets any of the following conditions:

  • The consumer rejects it with basic.reject or basic.nack, and the requeue parameter is set to false.
  • The message expires: its TTL elapses before any consumer consumes it.
  • The queue is full, so the message cannot be stored in it.

If a queue specifies an exchange through its dead-letter-exchange attribute, the dead letters in that queue are delivered to that exchange. This exchange is called the Dead Letter Exchange (DLX).
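The rules above can be summarized as a small decision function. This is a simplified model in plain Java, not broker code; the enum values and the class name DeadLetterRules are hypothetical labels for the three conditions.

```java
/** Simplified, hypothetical model of the dead letter rules described above. */
final class DeadLetterRules {
    /** What happened to a message while it sat in a queue. */
    enum Event {
        ACKED,
        REJECTED_WITH_REQUEUE,
        REJECTED_WITHOUT_REQUEUE,
        TTL_EXPIRED,
        QUEUE_OVERFLOW
    }

    private DeadLetterRules() {
    }

    /** A message is a dead letter if it was rejected without requeue, expired, or overflowed. */
    static boolean isDeadLetter(Event event) {
        switch (event) {
            case REJECTED_WITHOUT_REQUEUE:
            case TTL_EXPIRED:
            case QUEUE_OVERFLOW:
                return true;
            default:
                return false;
        }
    }

    /**
     * Describes where the message goes. deadLetterExchange is the exchange configured
     * on the queue, or null when the queue has none.
     */
    static String fate(Event event, String deadLetterExchange) {
        if (!isDeadLetter(event)) {
            return "not a dead letter";
        }
        return deadLetterExchange == null ? "discarded" : "routed to " + deadLetterExchange;
    }
}
```

For example, fate(Event.TTL_EXPIRED, "dlx.direct") describes the case used in the rest of this section: an expired message is handed to the dead letter exchange.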

The role of a dead letter exchange

  1. Collect messages that were rejected due to processing failure
  2. Collect messages that were rejected because the queue was full
  3. Collect messages that expired because their TTL (time to live) elapsed

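Set up the queues and exchanges as follows:

  1. Create a queue and specify a dead letter exchange for it.
  2. Create an exchange (simple.direct in the code below) and bind the queue to it.
  3. Create a second queue (dlx.queue in the code below) together with the dead letter exchange, and bind that queue to the dead letter exchange.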
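The same setup can also be declared in code. The following is a minimal sketch with Spring AMQP beans, assuming a Spring AMQP version that provides QueueBuilder.deadLetterExchange. The names simple.queue and dlx.direct and the class name DeadLetterConfig are assumptions; simple.direct, dlx.queue, and the routing key hello are taken from the producer and consumer code in this article.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    // Exchange the producer publishes to
    @Bean
    public DirectExchange simpleExchange() {
        return new DirectExchange("simple.direct");
    }

    // Queue with no consumer; expired messages are handed to the dead letter exchange.
    // "simple.queue" and "dlx.direct" are assumed names.
    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable("simple.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }

    @Bean
    public Binding simpleBinding() {
        return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("hello");
    }

    // Dead letter exchange and the queue that finally receives the expired messages
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.direct");
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("dlx.queue");
    }

    // No dead letter routing key is set on simple.queue, so the message keeps
    // its original routing key ("hello") when it is dead-lettered.
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("hello");
    }
}
```

Note that no listener should consume from simple.queue in this setup; otherwise the message is consumed before it can expire.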

Producer sends message

@Test
public void sendTtlMessage() {
    // Exchange name (the first argument of convertAndSend is the exchange, not the queue)
    String exchangeName = "simple.direct";
    // Build a message that expires after 10 seconds
    Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
            .setExpiration("10000") // TTL in milliseconds
            .build();
    // Send the message with routing key "hello"
    rabbitTemplate.convertAndSend(exchangeName, "hello", message);
}

Consumer receives message

@RabbitListener(queues = "dlx.queue")
public void listenDlxQueueMessage(Message message) {
    // The producer sent raw UTF-8 bytes, so decode the body explicitly
    String body = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("Consumer received dlx.queue message: [" + body + "]");
}

Because nothing consumes the message from the first queue, it expires after ten seconds and is then delivered to the dead letter queue.

2. Delayed message plug-in

Delayed messages can be implemented with dead letter queues, but the setup is cumbersome. The RabbitMQ community therefore provides a delayed message plug-in that achieves the same effect.

Official documentation: Scheduling Messages with RabbitMQ | RabbitMQ – Blog

Plug-in download address: GitHub – rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

Since the MQ we installed is version 3.8, we download version 3.8.17 of the plug-in here.

Because we installed RabbitMQ with Docker, we first need to find the data volume that backs the RabbitMQ plug-in directory:

docker volume inspect mq-plugins

In this setup the plug-in directory is mounted at /var/lib/docker/volumes/mq-plugins/_data, so we upload the plug-in file to that directory.

Enable the plug-in

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Declare delay exchange

Annotation-based:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("Delay message received from delay.queue: {}", msg);
}

Based on @Bean:

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // Specify the exchange type and name
                .delayed() // Set the delayed attribute to true
                .durable(true) // Make the exchange durable
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}
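For reference, the delayed attribute corresponds to how the plug-in expects delayed exchanges to be declared: an exchange of type x-delayed-message with an x-delayed-type argument that names the underlying routing type. A sketch of that declaration using Spring AMQP's CustomExchange follows. It is an alternative to the delayExchange bean above, not an addition to it, and the class name RawDelayExchangeConfig is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RawDelayExchangeConfig {

    @Bean
    public CustomExchange rawDelayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        // Routing behaviour after the delay has elapsed: behave like a direct exchange
        arguments.put("x-delayed-type", "direct");
        // name, type, durable, autoDelete, arguments
        return new CustomExchange("delay.direct", "x-delayed-message", true, false, arguments);
    }
}
```

In everyday use the delayed() builder method or delayed = "true" annotation attribute is shorter; the explicit form is mainly useful for seeing what ends up declared on the broker.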

Send delayed message

When sending a message, the delay must be set through the x-delay message header, in milliseconds. In Spring AMQP, setDelay on the message properties sets this header:

@Test
void testPublisherDelayMessage() {
    // 1. Create message
    String message = "hello, delayed message";
    // 2. Send a message and use the message post-processor to add message headers
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //Add delayed message attribute
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}
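Since the setDelay call above takes the delay as an integer number of milliseconds, a small conversion helper keeps business timeouts readable. This is a sketch in plain Java; the class DelayMillis is hypothetical, and the range check only reflects the integer parameter used above, not any limit documented by the plug-in.

```java
import java.time.Duration;

/** Hypothetical helper that converts a Duration into an int millisecond delay. */
final class DelayMillis {
    private DelayMillis() {
    }

    /**
     * Converts the delay to milliseconds and rejects values that are not
     * positive or do not fit into an int.
     */
    static int of(Duration delay) {
        long millis = delay.toMillis();
        if (millis <= 0 || millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Delay out of range: " + delay);
        }
        return (int) millis;
    }
}
```

With this helper, the 30-minute order timeout from the usage scenario could be passed as setDelay(DelayMillis.of(Duration.ofMinutes(30))), which is 1,800,000 milliseconds.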