Table of Contents
1. Dead letter exchange
2. Delayed message plug-in
Declare a delayed exchange
Send delayed message
Usage scenarios
In e-commerce payment flows, the stock of a limited-inventory product is usually deducted as soon as the user places an order, which gives a better user experience. For example, when buying movie or high-speed rail tickets, the seat is locked once the order is placed and nobody else can buy it.
But there is a problem. If a user places an order and never pays, the order keeps occupying inventory, other customers cannot buy the product, and the merchant ultimately loses revenue.
Therefore, the usual practice in e-commerce is: when an order has remained unpaid beyond a fixed time limit, cancel the order immediately and release the inventory it occupies.
For example, if the order payment timeout is 30 minutes, we should check the order payment status 30 minutes after the user places the order. If it is found that the payment has not been made, the order should be canceled immediately and the inventory released.
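The timeout check described above can be sketched in plain Java. This is a minimal illustration only: the class `OrderTimeoutCheck`, the method `shouldCancel`, and the idea of passing the order's creation time and payment flag as parameters are assumptions made for the example, not part of any real order service.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: decides whether an unpaid order has timed out. */
final class OrderTimeoutCheck {

    /** Payment window taken from the 30-minute example in the text. */
    static final Duration PAYMENT_TIMEOUT = Duration.ofMinutes(30);

    /**
     * Returns true when the order is still unpaid and the payment window
     * has fully elapsed at the moment of the check.
     */
    static boolean shouldCancel(Instant createdAt, boolean paid, Instant now) {
        if (paid) {
            return false; // a paid order is never cancelled by the timeout check
        }
        // Cancel once "now" has reached or passed the payment deadline.
        return !now.isBefore(createdAt.plus(PAYMENT_TIMEOUT));
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.parse("2024-01-01T10:00:00Z");
        // Checked after 10 minutes, unpaid: still inside the window -> false
        System.out.println(shouldCancel(createdAt, false, createdAt.plus(Duration.ofMinutes(10))));
        // Checked after 30 minutes, unpaid: window elapsed -> true
        System.out.println(shouldCancel(createdAt, false, createdAt.plus(Duration.ofMinutes(30))));
        // Checked after 45 minutes, paid: -> false
        System.out.println(shouldCancel(createdAt, true, createdAt.plus(Duration.ofMinutes(45))));
    }
}
```

In a real system this check would be triggered by the delayed message itself, so that it runs once per order at the deadline instead of polling all orders.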
Delayed message: The producer specifies a delay when sending a message. The consumer does not receive the message immediately, but only after the specified time has passed.
Delayed task: A task that is set to execute after a certain period of time.
1. Dead letter exchange
When a message in a queue meets one of the following conditions, it can become a dead letter:
- The consumer uses basic.reject or basic.nack to declare consumption failure, and the requeue parameter of the message is set to false
- The message has expired and nobody consumed it before the timeout
- The queue the message is delivered to is full and cannot accept it
If a message in a queue becomes a dead letter and the queue specifies an exchange through its dead-letter-exchange attribute, the dead letter is delivered to that exchange. This exchange is called the Dead Letter Exchange (DLX).
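The rules above can be summarized as a small decision function. The sketch below is a simplified model written for illustration, not RabbitMQ client code: the names `DeadLetterRules`, `Event`, and `Outcome` are hypothetical, and the model only captures whether a message is requeued, dead-lettered, or discarded.

```java
/** Simplified, hypothetical model of the dead-letter rules; not RabbitMQ client code. */
final class DeadLetterRules {

    /** What happened to the message in its original queue. */
    enum Event { REJECTED_REQUEUE, REJECTED_NO_REQUEUE, EXPIRED, QUEUE_FULL }

    /** Where the message ends up. */
    enum Outcome { REQUEUED, DEAD_LETTERED, DISCARDED }

    /**
     * A message rejected with requeue = true goes back to its queue.
     * In the other three cases it becomes a dead letter: it is routed to the
     * dead letter exchange if the queue declares one, otherwise it is dropped.
     */
    static Outcome handle(Event event, boolean queueHasDeadLetterExchange) {
        if (event == Event.REJECTED_REQUEUE) {
            return Outcome.REQUEUED;
        }
        return queueHasDeadLetterExchange ? Outcome.DEAD_LETTERED : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(handle(Event.REJECTED_NO_REQUEUE, true)); // DEAD_LETTERED
        System.out.println(handle(Event.EXPIRED, true));             // DEAD_LETTERED
        System.out.println(handle(Event.QUEUE_FULL, false));         // DISCARDED
        System.out.println(handle(Event.REJECTED_REQUEUE, true));    // REQUEUED
    }
}
```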
The role of a dead letter exchange
- Collect messages that were rejected due to processing failure
- Collect messages that were rejected because the queue was full
- Collect messages that have expired due to TTL (validity period)
First, create a queue that specifies a dead letter exchange
Create an exchange and bind the queue to it
Bind the queue
Create another queue and exchange
Bind the queue
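The same topology can also be declared in code rather than by hand. The following is a sketch using Spring AMQP's `QueueBuilder` and `BindingBuilder`. The names `simple.direct`, `dlx.queue`, and the routing key `hello` come from the producer and consumer code in this section; the names `simple.queue`, `dlx.direct`, and the class `DeadLetterConfig` are assumptions introduced for the example.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: "simple.queue" and "dlx.direct" are assumed names.
@Configuration
public class DeadLetterConfig {

    // Normal exchange that the producer publishes to
    @Bean
    public DirectExchange simpleExchange() {
        return new DirectExchange("simple.direct");
    }

    // Normal queue; it has no consumer, and its dead letters go to dlx.direct
    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable("simple.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }

    @Bean
    public Binding simpleBinding() {
        return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("hello");
    }

    // Dead letter exchange and the queue that the consumer listens on
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.direct");
    }

    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable("dlx.queue").build();
    }

    // No dead-letter routing key is set, so a dead letter keeps its original key "hello"
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("hello");
    }
}
```

Because the normal queue has no consumer, a message with a TTL simply waits there until it expires, and only then reaches the queue bound to the dead letter exchange.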
Producer sends message
    @Test
    public void SendTTLMessage() {
        // Exchange name (the three-argument convertAndSend takes exchange, routing key, message)
        String exchangeName = "simple.direct";
        // Build the message with a per-message TTL
        Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setExpiration("10000") // unit: milliseconds
                .build();
        // Send the message with routing key "hello"
        rabbitTemplate.convertAndSend(exchangeName, "hello", message);
    }
Consumer receives message
    @RabbitListener(queues = "dlx.queue")
    public void listenDlxQueueMessage(Message msg) {
        // The producer sends raw UTF-8 bytes, so decode the body directly
        String body = new String(msg.getBody(), StandardCharsets.UTF_8);
        System.out.println("Consumer received dlx.queue message: [" + body + "]");
    }
Ten seconds after it is sent, the message expires and is delivered to the dead letter queue.
2. Delayed message plug-in
Although delayed messages can be implemented with dead letter queues, that approach is cumbersome. The RabbitMQ community therefore provides a delayed message plug-in that achieves the same effect.
Official documentation: Scheduling Messages with RabbitMQ | RabbitMQ – Blog
Plug-in download address: GitHub – rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
Since the MQ we installed is version 3.8, download plug-in version 3.8.17 here.
Because we are installing based on Docker, we need to first check the data volume corresponding to the RabbitMQ plug-in directory.
docker volume inspect mq-plugins
The plug-in directory is mounted to the /var/lib/docker/volumes/mq-plugins/_data directory. We upload the plug-in to this directory.
Enable plug-in
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Declare a delayed exchange
Based on annotation:
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayMessage(String msg) {
        log.info("Delay message received from delay.queue: {}", msg);
    }
Based on @Bean:
    package com.itheima.consumer.config;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Slf4j
    @Configuration
    public class DelayExchangeConfig {

        @Bean
        public DirectExchange delayExchange() {
            return ExchangeBuilder
                    .directExchange("delay.direct") // specify the exchange type and name
                    .delayed() // set the delayed attribute to true
                    .durable(true) // persistent
                    .build();
        }

        @Bean
        public Queue delayedQueue() {
            return new Queue("delay.queue");
        }

        @Bean
        public Binding delayQueueBinding() {
            return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
        }
    }
Send delayed message
When sending a message, the delay time must be set, in milliseconds, through the x-delay message header:
    @Test
    void testPublisherDelayMessage() {
        // 1. Create the message
        String message = "hello, delayed message";
        // 2. Send the message, using a message post-processor to add the delay header
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Add the delayed message attribute (5000 ms)
                message.getMessageProperties().setDelay(5000);
                return message;
            }
        });
    }
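Since the delay is passed as a plain integer number of milliseconds, business-level timeouts such as "30 minutes" need to be converted first. The helper below is a small sketch of that conversion in plain Java. The class `DelayMillis` and the method `toDelayMillis` are hypothetical names, and the example assumes that the setDelay method in use accepts an int-sized value, as in the test above.

```java
import java.time.Duration;

/** Hypothetical helper: converts a Duration into an int millisecond delay. */
final class DelayMillis {

    /**
     * Returns the delay in milliseconds.
     * Throws IllegalArgumentException for a negative delay and
     * ArithmeticException when the value does not fit into an int.
     */
    static int toDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        // toIntExact fails loudly instead of silently wrapping around on overflow
        return Math.toIntExact(delay.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(5)));  // 5000
        System.out.println(toDelayMillis(Duration.ofMinutes(30))); // 1800000
    }
}
```

With such a helper, the post-processor could call `setDelay(DelayMillis.toDelayMillis(Duration.ofMinutes(30)))` for the 30-minute order timeout, and an overly long delay would be rejected before the message is sent.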