How does RabbitMQ implement a delay queue?

A delay queue holds a message for a specified time after it is sent, and only then delivers it to the consumer for processing. Typical use cases include:

  1. Cancel an order automatically if it has not been paid within 30 minutes.

  2. Push a message to low-activity users after N days to re-engage them.

  3. Send a welcome email to newly registered members a few minutes after sign-up.
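
The delay semantics described above can be illustrated in-process before any broker is involved. The following is only an analogy, not how RabbitMQ works internally: it uses the JDK's java.util.concurrent.DelayQueue, and the class and method names (DelaySemanticsDemo, DelayedMessage, publishAndAwait) are made up for this sketch.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// In-process analogy only: shows "publish now, consume later" semantics.
final class DelaySemanticsDemo {

    /** A message that becomes visible to the consumer only after its deadline. */
    static final class DelayedMessage implements Delayed {
        final String body;
        final long readyAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the message may be taken from the queue
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Publishes one message, blocks until it is delivered, and returns the elapsed milliseconds. */
    static long publishAndAwait(String body, long delayMillis) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new DelayedMessage(body, delayMillis));
        try {
            queue.take(); // blocks until the delay has elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the message", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("Delivered after " + publishAndAwait("welcome email", 500) + " ms");
    }
}
```

With RabbitMQ the waiting happens on the broker rather than in the application, so delayed messages can outlive the process that sent them.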

1. How to implement a delay queue?

There are two ways to implement delay queues:

  1. Let a message expire (TTL) so that it is routed to a dead letter exchange, which then forwards it to the queue that consumers actually read, producing the delay;

  2. Use the officially provided delay plug-in to implement the delay function.
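
Although this article focuses on the plug-in, the first approach can be sketched briefly for comparison. The argument keys x-message-ttl, x-dead-letter-exchange, and x-dead-letter-routing-key are standard RabbitMQ queue arguments; the class name, method name, and the exchange and routing key values below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of approach 1 (TTL + dead letter exchange). All names are illustrative.
final class DeadLetterDelayArgs {

    /**
     * Builds the arguments for a "waiting" queue. No consumer listens on this queue,
     * so each message sits there until its TTL expires and is then dead-lettered
     * to the given exchange, which routes it to the queue that is actually consumed.
     */
    static Map<String, Object> waitingQueueArgs(long ttlMillis,
                                                String deadLetterExchange,
                                                String deadLetterRoutingKey) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttlMillis must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);                       // how long a message waits
        args.put("x-dead-letter-exchange", deadLetterExchange);      // where expired messages go
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey); // routing key used on expiry
        return args;
    }

    public static void main(String[] args) {
        // Hypothetical values for the 30-minute order cancellation scenario
        System.out.println(waitingQueueArgs(30 * 60 * 1000L, "order.dlx", "order.cancel"));
    }
}
```

A map like this would be passed as the arguments when declaring the waiting queue. The extra queue and exchange it requires are part of why the plug-in is considered simpler.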

In the early days, most companies used the first approach. Since the release of the delay plug-in with RabbitMQ 3.5.7 (at the end of 2015), the plug-in has become the generally adopted way to implement a delay queue because it is simpler and more convenient to use, so this article covers only the second approach.

2. Implement delay queue

2.1 Install and start the delay queue

2.1.1 Download delay plug-in

github.com/rabbitmq/ra…[1]

Note: Select the plug-in version that matches your RabbitMQ server version. The server version is shown in the RabbitMQ console.

2.1.2 Place the plug-in in the plug-in directory

Next, put the plug-in downloaded in the previous step into the plug-in directory of the RabbitMQ server installation. If RabbitMQ runs in Docker, copy the file with the following command:

docker cp <host file> <container name or ID>:<container directory>


After that, enter the Docker container and check whether the plug-in list includes the delay plug-in:

docker exec -it <container name or ID> /bin/bash
rabbitmq-plugins list


2.1.3 Enable the plug-in

rabbitmq-plugins enable rabbitmq_delayed_message_exchange


2.1.4 Restart the RabbitMQ service

After installing the RabbitMQ plug-in, you need to restart the RabbitMQ service for it to take effect. If you are using Docker, you only need to restart the Docker container:

docker restart <container name or ID>


2.1.5 Verify the result

In the RabbitMQ console, check whether a delayed message type (x-delayed-message) is offered when creating a new exchange. If it is, the delayed message plug-in is running normally.

2.1.6 Manually create a delayed exchange (optional)

This step is optional. In some versions, creating the delayed exchange from application code may cause an error. If that happens, create the delayed exchange manually in the console.

2.2 Write delayed message implementation code

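2.2.1 Configuring exchanges and queues
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;
import java.util.Map;

/**
 * Delayed exchange, queue, and binding
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        // "x-delayed-type" is an argument of the exchange (not the queue): it tells
        // the plug-in how to route a message once its delay has elapsed.
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME,
                "x-delayed-message", // exchange type provided by the plug-in
                true,                // durable
                false,               // autoDelete
                args);
    }

    @Bean
    public Queue delayedQueue() {
        // An ordinary durable queue; no delay-specific arguments are needed here
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        // Use the injected beans rather than calling the @Bean methods again
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
    }
}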
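
The plug-in expects the routing behavior to be given on the exchange through the x-delayed-type argument, while the exchange type itself is x-delayed-message. A minimal, framework-free sketch of building that argument map follows. The class and method names are hypothetical, and restricting the value to the four built-in exchange types is a simplification made for this sketch.

```java
import java.util.Map;
import java.util.Set;

// Sketch: arguments for declaring an exchange of type "x-delayed-message".
final class DelayedExchangeArgs {

    /** Exchange type registered by the delayed message plug-in. */
    static final String EXCHANGE_TYPE = "x-delayed-message";

    // Assumption of this sketch: only the built-in exchange types are accepted.
    private static final Set<String> BUILT_IN_TYPES = Set.of("direct", "topic", "fanout", "headers");

    /** Returns the exchange arguments that select how delayed messages are routed. */
    static Map<String, Object> forRoutingType(String routingType) {
        if (!BUILT_IN_TYPES.contains(routingType)) {
            throw new IllegalArgumentException("unsupported x-delayed-type: " + routingType);
        }
        return Map.of("x-delayed-type", routingType);
    }

    public static void main(String[] args) {
        System.out.println(EXCHANGE_TYPE + " " + forRoutingType("direct"));
    }
}
```

Failing early on an unexpected value keeps a typo in the routing type from surfacing only as a broker-side declaration error.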

2.2.2 Define the message sending method
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Called from the controller; not a @Scheduled task, because scheduled
    // methods cannot take parameters.
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                msg -> {
                    // Delay in milliseconds (10 seconds here)
                    msg.getMessageProperties().setDelay(10000);
                    return msg;
                });
    }
}
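
The value passed to setDelay is a number of milliseconds, which Spring AMQP sends to the broker as the x-delay message header. Hard-coded millisecond literals are easy to get wrong, so a small conversion helper can make the intent explicit. This is a sketch only; DelayValues and toDelayMillis are hypothetical names, not part of any library.

```java
import java.time.Duration;

// Sketch: convert a readable Duration into the millisecond value used for the delay.
final class DelayValues {

    /**
     * Converts a delay to whole milliseconds as an int.
     * Rejects negative delays; throws ArithmeticException if the value
     * does not fit in an int (roughly 24.8 days).
     */
    static int toDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        return Math.toIntExact(delay.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(10))); // 10000
        System.out.println(toDelayMillis(Duration.ofMinutes(30))); // 1800000
    }
}
```

For the order cancellation scenario, the producer could then call setDelay(DelayValues.toDelayMillis(Duration.ofMinutes(30))) instead of writing 1800000 by hand.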

2.2.3 Sending delayed messages
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {
        delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange: " + message;
    }
}

2.2.4 Receiving delayed messages
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}
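
To check that the consumer really receives the message after the configured delay, one option is to stamp the message body with its send time and compute the difference on receipt. The helper below is a hypothetical sketch (DelayProbe and its methods are invented here), and it assumes the producer and consumer clocks are reasonably in sync.

```java
// Sketch: measure the observed delay by embedding the send time in the message body.
final class DelayProbe {

    private static final char SEPARATOR = '|';

    /** Producer side: prefixes the body with the send time in epoch milliseconds. */
    static String stamp(String body, long sentAtMillis) {
        return sentAtMillis + String.valueOf(SEPARATOR) + body;
    }

    /** Consumer side: returns how many milliseconds passed between send and receipt. */
    static long observedDelayMillis(String stamped, long receivedAtMillis) {
        int idx = stamped.indexOf(SEPARATOR);
        if (idx <= 0) {
            throw new IllegalArgumentException("missing send timestamp: " + stamped);
        }
        long sentAtMillis = Long.parseLong(stamped.substring(0, idx));
        return receivedAtMillis - sentAtMillis;
    }

    /** Consumer side: returns the original body without the timestamp prefix. */
    static String body(String stamped) {
        return stamped.substring(stamped.indexOf(SEPARATOR) + 1);
    }

    public static void main(String[] args) {
        String stamped = stamp("hello", System.currentTimeMillis());
        System.out.println(body(stamped) + " observed after "
                + observedDelayMillis(stamped, System.currentTimeMillis()) + " ms");
    }
}
```

In the listener, calling observedDelayMillis(message, System.currentTimeMillis()) should report a value close to the 10000 ms set by the producer.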


Summary

The mainstream way to implement a RabbitMQ delay queue today is the officially provided delay plug-in. You download the plug-in, enable it, and restart the RabbitMQ service, and then write the code that declares the delayed exchange and sends messages with a delay.

This article is included in my interview site www.javacn.site[2], which covers Redis, JVM, concurrency, MySQL, Spring, Spring MVC, Spring Boot, Spring Cloud, MyBatis, design patterns, message queues, and other modules.

Reference materials

[1] https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

[2] https://www.javacn.site