A delay queue is a queue in which a message is not consumed immediately after it is sent; instead, the consumer receives it only after a specified delay has elapsed. Typical use cases include:
- Orders that are not paid in time are canceled after 30 minutes.
- Users with low activity are sent a push message after N days to increase engagement.
- Newly registered members are sent a welcome email after a few minutes.
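Before turning to RabbitMQ, the delay semantics themselves can be illustrated in memory with the JDK's java.util.concurrent.DelayQueue. This is only a sketch of the idea: RabbitMQ is not involved, and the class and method names (DelayQueueSketch, DelayedMessage, sendAndMeasure) are made up for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// In-memory illustration only; RabbitMQ is not involved here.
class DelayQueueSketch {

    // A message that becomes available to consumers once its delay has elapsed.
    static final class DelayedMessage implements Delayed {
        final String body;
        final long readyAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Sends one message with the given delay and returns how long the consumer waited (ms).
    static long sendAndMeasure(String body, long delayMillis) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new DelayedMessage(body, delayMillis));
        try {
            DelayedMessage received = queue.take(); // blocks until the delay has expired
            long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println("Received '" + received.body + "' after about " + waited + " ms");
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the message", e);
        }
    }

    public static void main(String[] args) {
        sendAndMeasure("cancel unpaid order", 200);
    }
}
```

The consumer's take() call does not return until the delay has passed, which is the same behavior the RabbitMQ setup in the rest of this article provides across processes.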
1. How to implement a delay queue?
There are two ways to implement delay queues:
- Let the message expire so that it enters the dead letter exchange, which then forwards it to the queue that the delayed consumer listens on;
- Use the officially provided delay plug-in.
In the early days, most companies used the first method. Since the delay plug-in was released for RabbitMQ 3.5.7 (at the end of 2015), the plug-in has become the generally adopted way to implement a delay queue because it is simpler and more convenient to use, so this article covers only the second method.
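For comparison only, the first method relies on three standard RabbitMQ queue arguments: x-message-ttl, x-dead-letter-exchange, and x-dead-letter-routing-key. The sketch below builds that argument map for a "waiting" queue that has no consumer. The class name, method name, and the exchange and routing key values are hypothetical, and how the map is passed to the broker depends on the client library in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DeadLetterDelayArgs {

    // Arguments for a "waiting" queue without consumers: messages stay in it until
    // the TTL expires and are then dead-lettered to the given exchange.
    static Map<String, Object> waitingQueueArgs(int ttlMillis,
                                                String deadLetterExchange,
                                                String deadLetterRoutingKey) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return args;
    }

    public static void main(String[] unused) {
        // Hypothetical names: a 30-minute wait before an order-timeout consumer sees the message.
        System.out.println(waitingQueueArgs(30 * 60 * 1000, "order.dlx", "order.timeout"));
    }
}
```

With this approach the delay is a property of the queue, so each distinct delay typically needs its own waiting queue, which is one reason the plug-in is more convenient.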
2. Implementing a delay queue
2.1 Install and enable the delay plug-in
2.1.1 Download the delay plug-in
github.com/rabbitmq/ra…[1]
Note: Choose the plug-in version that matches your RabbitMQ server version. The server version is shown in the RabbitMQ console.
2.1.2 Place the plug-in in the plug-in directory
Next, put the plug-in downloaded in the previous step into the plugins directory of the RabbitMQ server installation. If RabbitMQ runs in Docker, copy the file into the container with the following command:
docker cp <host file> <container name or ID>:<container directory>
After that, enter the Docker container and check whether the delay plug-in appears in the plug-in list:
docker exec -it <container name or ID> /bin/bash
rabbitmq-plugins list
2.1.3 Enable the plug-in
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.1.4 Restart the RabbitMQ service
After installing the RabbitMQ plug-in, you need to restart the RabbitMQ service for it to take effect. If you are using Docker, you only need to restart the Docker container:
docker restart <container name or ID>
2.1.5 Verify the result
In the RabbitMQ console, check whether the x-delayed-message type is offered when you create a new exchange. If it is, the delayed message plug-in is running normally.
2.1.6 Manually create a delayed exchange (optional)
This step is optional: in some versions, creating the delayed exchange from code may cause an error. If that happens, create the delayed exchange manually in the console, with the type x-delayed-message and the argument x-delayed-type set to the underlying exchange type (for example, direct).
2.2 Write delayed message implementation code
2.2.1 Configuring the exchange and queue
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Delayed exchange, queue, and the binding between them.
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        // "x-delayed-type" tells the plug-in how to route a message once its delay
        // has expired, so it is an argument of the exchange, not of the queue.
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayedQueue() {
        // An ordinary durable queue; the delay is handled by the exchange.
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        // Use the injected beans rather than calling the @Bean methods again.
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
    }
}
2.2.2 Define the message sending method
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Called by the controller. A method that takes a parameter cannot be
    // annotated with @Scheduled, so no scheduling annotation is used here.
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(
                DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                msg -> {
                    // Delay in milliseconds: deliver the message 10 seconds after it is sent.
                    msg.getMessageProperties().setDelay(10000);
                    return msg;
                });
    }
}
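The producer above hard-codes the delay as 10000 milliseconds. For longer delays, such as the 30-minute order timeout mentioned earlier, it can be less error-prone to start from a java.time.Duration. The helper below is a hypothetical sketch (DelayMillis and toDelayMillis are not part of Spring AMQP) that converts a Duration into the int millisecond value that setDelay accepts and fails loudly if the value does not fit.

```java
import java.time.Duration;

class DelayMillis {

    // Converts a Duration to whole milliseconds as an int.
    // Throws IllegalArgumentException for negative delays and
    // ArithmeticException if the value does not fit in an int (about 24.8 days).
    static int toDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        return Math.toIntExact(delay.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofMinutes(30))); // prints 1800000
    }
}
```

In the producer, the result could then be passed as setDelay(DelayMillis.toDelayMillis(Duration.ofMinutes(30))).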
2.2.3 Sending delayed messages
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {
        delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange: " + message;
    }
}
2.2.4 Receiving delayed messages
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageConsumer {
    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}
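One simple way to check that the delay really takes effect is to put the send time into the message body and compare it with the receive time in the consumer. The helper below is a hypothetical sketch: the DelayStamp class and its "timestamp|body" format are assumptions made for illustration, not something the plug-in or Spring AMQP defines.

```java
class DelayStamp {

    // Prefixes the body with the send time so the consumer can measure the delay.
    static String stamp(String body, long sentAtMillis) {
        return sentAtMillis + "|" + body;
    }

    // Returns the original body without the timestamp prefix.
    static String body(String stamped) {
        return stamped.substring(separatorIndex(stamped) + 1);
    }

    // Returns how many milliseconds passed between sending and receiving.
    static long elapsedMillis(String stamped, long receivedAtMillis) {
        long sentAt = Long.parseLong(stamped.substring(0, separatorIndex(stamped)));
        return receivedAtMillis - sentAt;
    }

    private static int separatorIndex(String stamped) {
        int sep = stamped.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("missing send timestamp: " + stamped);
        }
        return sep;
    }

    public static void main(String[] args) {
        // Fixed timestamps are used here so the example is deterministic.
        String stamped = stamp("hello", 1_000L);
        System.out.println(body(stamped) + " arrived after " + elapsedMillis(stamped, 11_000L) + " ms");
    }
}
```

In practice the producer would call stamp(message, System.currentTimeMillis()) before sending, and the consumer would log elapsedMillis(message, System.currentTimeMillis()), which should be close to the configured 10000 ms.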
Summary
The mainstream way to implement a RabbitMQ delay queue today is the officially provided delay plug-in. To use it, download the plug-in, place it in the plug-in directory, enable it, and restart the RabbitMQ service; after that, a delay queue can be implemented in code with a delayed exchange, a producer that sets the delay on each message, and an ordinary consumer.
This article has been included in my interview site www.javacn.site[2], which covers Redis, JVM, concurrency, MySQL, Spring, Spring MVC, Spring Boot, Spring Cloud, MyBatis, design patterns, message queues, and other topics.
Reference materials
[1] https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
[2] https://www.javacn.site