An in-depth introduction to RabbitMQ: sequential consumption, dead letter queue and delay queue

1. RabbitMQ

1.1 Core Components

RabbitMQ is an open source message middleware that implements the Advanced Message Queuing Protocol (AMQP) and provides various important components to support the production, transmission and consumption of messages.

Picture

  1. Producer: The producer is the sender of messages and is responsible for publishing messages to the RabbitMQ server. Messages can contain anything, such as tasks, logs, notifications, etc.

  2. Channel: The channel used when pushing and receiving messages.

  3. Exchange: An exchange is a messaging hub that receives messages from producers and routes them to one or more queues. Different types of switches, such as fanout, direct, topic, headers, support different routing rules.

  4. Queue (queue): A queue is a buffer for messages. Messages are stored in the queue before being sent to the consumer. The consumer obtains the message from the queue and processes it.

  5. Consumer: A consumer is the receiver of messages. It gets messages from the queue and processes them. There can be multiple consumers, and they can run on different applications or servers.

1.2 Workflow

The way RabbitMQ works is based on collaboration between producers, exchanges, and queues. This is a simple messaging process:

  1. Bind the queue to the switch (Binding) and define the routing rules for messages;

  2. Producers publish messages to the exchange, and the exchange routes the messages to one or more queues based on binding rules;

  3. Consumers get messages from the queue and process them.

This model is highly flexible and can easily handle large volumes of messages while ensuring reliable delivery of messages.

1.3 Features

When it comes to message middleware, many people first think of Kafka, but RabbitMQ is also the first choice for many financial or Internet companies to build reliable, scalable and high-performance systems.

Why is this?

Let’s start with the features of RabbitMQ. There are two main features: one is powerful and the other is reliability!

RabbitMQ focuses on the reliability and flexibility of messages and is suitable for task queuing and message delivery. Kafka is a distributed streaming platform that focuses on log storage and data distribution.

Sequential consumption is also a type of reliability. RabbitMQ can use a single queue or multiple single queues to ensure sequential consumption.

In addition, RabbitMQ also provides persistent queues and messages to ensure that messages are not lost after the RabbitMQ server goes down. In addition, producers can use the publish confirmation mechanism to confirm whether messages have been received.

RabbitMQ is more reliable than kafka, and data is less likely to be lost. For some data-sensitive businesses, the former is obviously more suitable.

Moreover, RabbitMQ natively supports Dead Letter Queue, which can better handle unfinished business messages and implement features such as Delay Queue. We will introduce them one by one next.

2. Ensure sequential consumption

RabbitMQ provides multiple queue models to ensure sequential consumption of messages. This is important for certain applications, such as order processing, payments, and inventory management.

Scenes of confusing message consumption

Picture

As shown in the figure above, there are three business messages, which are delete, add and modify operations, but Consumer is not consumed in order, and the final storage order is add, modify and deletion, data corruption will occur.

For the problem of message orderliness, RabbitMQ’s solution is to ensure it in three stages.

1. Send message: put into queue

When sending messages, business is required to ensure orderliness, that is, to ensure that the order in which producers are added to the queue is in order.

In a distributed scenario, if it is difficult to ensure the order in which each server joins the queue, it can be solved by adding a distributed lock. Or include Message increment ID and the timestamp of message generation in the message from the business producer.

2. Messages in the queue

Messages in RabbitMQ will be stored in the queue (Queue). Messages in the same queue are first in, first out (FIFO). This is done by RabbitMQ to help us ensure the order.

RabbitMQ cannot guarantee the order of messages in different queues. Just like when we are serving meals in a canteen, if we stand in different queues, we cannot guarantee that we will receive meals before people in other queues.

Picture

3. Consume messages: Dequeue

Generally speaking, the order consumption after dequeuing is left to the consumer to ensure. When we talk about guaranteed consumption order, we usually refer to the order in which consumers consume messages.

When there are multiple consumers, the order of messages is usually not guaranteed.

This is equivalent to when we are queuing up to cook, there are multiple cooking aunties, but the speed of each aunt’s cooking is not consistent, which corresponds to the consumption power of our consumers.

Therefore, in order to ensure the order of messages, we can only use one consumer to receive business messages.

It’s just like if there is only one aunt preparing meals, if you come early, you will be able to get meals early. But obviously, this is not very efficient, so we need to weigh the pros and cons when using it: Depends on whether the business requires sequence or consumption efficiency.

Priority Queue

When ensuring sequential consumption, another roundabout strategy is to use priority queue (Priority Queue).

After RabbitMQ3.5, when the number of consumers is small and the server detects that the consumer cannot consume messages in time, the priority queue will take effect.

There are two specific priority strategies:

  1. Set queue priority

  2. Set message priority

When declaring a queue, we can set the maximum priority of the queue through the x-max-priority attribute, or set the priority of the message through the Priority attribute, from 1~ 10.

The Golang implementation code is as follows:

//Queue properties
props := make(map[string]interface{})
//Set the maximum priority of the queue
props["x-max-priority"] = 10

ch.Publish(
   "tizi365", // switch
   "", // Routing parameters
   false,
   false,
   amqp.Publishing{
       Priority:5, //Set message priority
       DeliveryMode:2, // Message delivery mode, 1 represents non-persistent, 2 represents persistent,
       ContentType: "text/plain",
       Body: []byte(body),
  })

When priority queue consumption takes effect, messages with higher priority in the high-priority queue will be consumed first to achieve sequential consumption.

However, it should be noted that the conditions for triggering the priority queue are relatively harsh. It is best not to use it when the order of business messages needs to be strictly guaranteed!

3. Dead letter queue

In RabbitMQ, when a message becomes a dead letter in the queue (a message that the consumer cannot process normally), it will be re-delivered to an exchange (i.e. a dead letter switch), a dead letter The consumer queue bound to the switch is the dead letter queue.

Picture

The generation of dead letters

The following conditions need to be met to generate a dead letter:

  1. The message was manually rejected by the consumer and the requeue (re-queue) policy was False;

  2. The message has expired (TTL);

  3. The queue has reached its maximum length and the message cannot be loaded.

Dead letter processing steps

When dead letters are generated, if we define a dead letter switch (actually an ordinary switch, just used to process dead letters, so it is called a dead letter switch), then bind A queue is defined (called the Dead Letter Queue).

Finally, if there is a consumer monitoring the dead letter queue, the processing of the dead letter message will be the same as normal business messages, from the switch to the queue, and then by Dead letter consumer (monitoring the consumption of the dead letter queue) (or) normal consumption.

4. Delay queue

RabbitMQ itself does not support delayed queues, but we can implement it through the RabbitMQ plug-in rabbitmq-delayed-message-exchange, or by using dead letter queue + message expiration.

4.1 Application scenarios

When we shop in e-commerce or buy tickets at 12306, we will probably encounter such a scenario: every time after placing an order, there is a period of product lock-in time between the time of payment and the order. After the time is exceeded, unpaid orders will be Close.

The state transition diagram is as follows:

Picture

4.2 Plug-in implementation

1. Install plug-in

Github Address:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

From the assets of the release page of github, download the rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez file and put the file into the rabbitmq plug-in directory (plugins directory)

Tip: The version number may be different from this tutorial. If your rabbitmq is the latest version, just select the latest version of the plug-in.

2. Activate plug-in
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3. Define switches

Set custom switch properties via x-delayed-type to support sending delayed messages:

 props := make(map[string]interface{})
   //Key parameters, support sending delayed messages
   props["x-delayed-type"] = "direct"

   //Declare the switch
   err = ch.ExchangeDeclare(
       "delay.queue", // switch name
       "fanout", // switch type
       true, // Whether to persist
       false,
       false,
       false,
       props, // Set properties
  )
4.Send delayed message

Set the message delay time through the message header (x-delay).

 msgHeaders := make(map[string]interface{})
       // Set the message delay time through the message header, in milliseconds
       msgHeaders["x-delay"] = 6000

       err = ch.Publish(
           "delay.queue", // switch name
           "", // Routing parameters
           false,
           false,
           amqp.Publishing{
               Headers:msgHeaders, // Set message headers
               ContentType: "text/plain",
               Body: []byte(body),
          })

4.3 Dead letter queue + message expiration solution

The core idea of this solution is to first create a dead-letter switch, queue, and consumer to monitor dead-letter messages.

Then create a regularly expired message. For example, if the order payment time is 30 minutes, set the TTL (maximum survival time) of the message to 30 minutes, and place the message in a queue without consumer consumption. , when the message expires, it becomes a dead letter.

The dead letter message is resent to the dead letter exchange, and then we consume the message in the dead letter queue and determine whether the item has been paid based on the item ID.

If payment is not made, cancel the order and modify the order status to Pending order. If payment has been made, change the item status to Completed and discard the dead letter message.

5. Summary

RabbitMQ is a powerful messaging middleware that plays a key role in many Internet applications, such as monitoring image data reporting by Huawei camera SDK, asynchronous consumption in most e-commerce systems, and so on.