[RocketMQ Series 13] RocketMQ Core Concepts: Consumption Retry, the Dead-Letter Queue, and How Message Idempotence Problems Arise and Are Handled

Hello, I am Fei Ge (wei158556), a coder. Thank you for reading this article.

Table of contents

    • 1. Consumption retry
      • 1.1. Consumption retry application scenario
      • 1.2. Principle of consumption retry
        • 1.2.1. Conditions for triggering message retry
      • 1.3. Number of consumption retries
    • 2. Dead letter queue
    • 3. The emergence of message idempotence problem
    • 4. How to ensure idempotent consumption?

1. Consumption retry

Message retry means that after a consumer fails to consume a message, the RocketMQ server redelivers the message according to the retry policy so that it can be consumed again. If the message has still not been consumed successfully once the maximum number of retries is exceeded, it is no longer redelivered; instead, it is sent directly to the dead-letter queue.

1.1. Consumption retry application scenario

Message retry mainly addresses consumption integrity problems caused by occasional consumption failures. Such failures can stem from problems in the business processing logic or from network jitter.

There are two main consumption retry application scenarios:

  1. Business processing failed for a reason related to the content of the current message. For example, the transaction status that the message refers to cannot be obtained yet, but processing is expected to succeed after some time.
  2. The failure is sporadic, for example caused by network jitter or a consumer going down. A later retry of the same message will most likely succeed.

Do not use consumption failure as a branch condition in your business logic, and do not use it to throttle the processing rate.

1.2. Principle of consumption retry

The state machine of consumption retry is shown in the figure below. A message that is retried may pass through the following five states.

(Figure: Principle of message retry)

  1. Ready: the message is ready on the RocketMQ server and can be consumed by consumers.
  2. Inflight: the message has been fetched by a consumer and is being processed; the consumption result has not been returned yet.
  3. Commit: the consumer processed the message successfully and returned a success response. The consumption flow for this message ends and it is not retried.
  4. Wait Retry: a state specific to PushConsumer. When consumption fails or times out, the retry mechanism is triggered. If the message has not yet reached the maximum number of retries, it enters the waiting-for-retry state; after the retry interval it becomes Ready again and can be consumed again. The interval grows with each retry to avoid frequent retries that keep failing.
  5. DLQ (dead-letter queue): when consumption fails and the number of retries exceeds the maximum (16 by default), the RocketMQ server stops retrying the message and sends it directly to the dead-letter queue.
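To make the transitions concrete, here is a minimal Java sketch that models the five states as an enum and walks a message that always fails from Ready to DLQ. It is a simplified model, not RocketMQ's implementation: the class and method names are hypothetical, and `MAX_RETRIES = 16` simply mirrors the default stated above.

```java
import java.util.EnumSet;

public class RetryStateMachine {
    // The five states listed above (simplified model, names are hypothetical).
    enum State { READY, INFLIGHT, COMMIT, WAIT_RETRY, DLQ }

    static final int MAX_RETRIES = 16; // default maximum number of retries

    /** Next state once the consumer reports a result for an INFLIGHT message. */
    static State afterConsume(boolean success, int retriesSoFar) {
        if (success) {
            return State.COMMIT;
        }
        // Failure (or timeout): wait for another retry unless the limit is reached.
        return retriesSoFar < MAX_RETRIES ? State.WAIT_RETRY : State.DLQ;
    }

    /** A waiting message becomes READY again after its retry interval. */
    static State afterRetryInterval(State s) {
        if (s != State.WAIT_RETRY) {
            throw new IllegalStateException("not waiting for retry: " + s);
        }
        return State.READY;
    }

    static boolean isTerminal(State s) {
        return EnumSet.of(State.COMMIT, State.DLQ).contains(s);
    }

    public static void main(String[] args) {
        State s = State.READY;
        int retries = 0;
        while (!isTerminal(s)) {
            s = State.INFLIGHT;                // fetched by a consumer
            s = afterConsume(false, retries);  // this message always fails
            if (s == State.WAIT_RETRY) {
                retries++;
                s = afterRetryInterval(s);     // back to READY after the interval
            }
        }
        System.out.println("final state: " + s + " after " + retries + " retries");
    }
}
```

Running `main` prints `final state: DLQ after 16 retries`: the first delivery plus 16 retries all fail, so the message ends up in the dead-letter queue.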
1.2.1. Conditions for triggering message retry
  1. Consumption failed:

When message consumption fails, a consumption retry is triggered. In other words, if the consumer does not report successful consumption back to the RocketMQ server, the consumption is treated as failed and the message is retried.

In code, this means the listener does not return the CONSUME_SUCCESS status:

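// Imports needed by this snippet:
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// 4. Register the callback (message listener); `consumer` is the DefaultMQPushConsumer from the previous steps
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // 5. Process the messages
    for (MessageExt msg : msgs) {
        System.out.println(msg);
        System.out.println("Message content received: " + new String(msg.getBody(), StandardCharsets.UTF_8));
    }
    // Each of the following makes RocketMQ retry the messages:
    //   1. returning null
    //   2. returning ConsumeConcurrentlyStatus.RECONSUME_LATER
    //   3. throwing an exception from the listener
    // Only one return statement can be reached, so this example returns RECONSUME_LATER.
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
});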
  2. Message processing times out, including timeouts while the message is waiting in the PushConsumer's local queue.
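The two trigger conditions can be summarized in one decision function. The sketch below is a hypothetical, self-contained model: the local `Status` enum stands in for `ConsumeConcurrentlyStatus` so it runs without the RocketMQ client, and the timeout is passed in as a parameter rather than read from any real configuration. Unlike a real consumer, it only checks the elapsed time after the listener returns.

```java
import java.util.concurrent.Callable;

public class RetryTrigger {
    // Hypothetical stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /**
     * Returns true if the message should be retried: the listener did not return
     * CONSUME_SUCCESS (it returned null or RECONSUME_LATER, or threw), or it ran
     * longer than timeoutMillis.
     */
    static boolean shouldRetry(Callable<Status> listener, long timeoutMillis) {
        long start = System.nanoTime();
        Status status;
        try {
            status = listener.call();
        } catch (Exception e) {
            status = null; // an exception counts as a failed consumption
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        if (elapsedMillis > timeoutMillis) {
            return true;   // condition 2: processing timed out
        }
        return status != Status.CONSUME_SUCCESS; // condition 1: consumption failed
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(() -> Status.CONSUME_SUCCESS, 1000));
        System.out.println(shouldRetry(() -> Status.RECONSUME_LATER, 1000));
        System.out.println(shouldRetry(() -> null, 1000));
        System.out.println(shouldRetry(() -> { throw new RuntimeException("boom"); }, 1000));
    }
}
```

Only the first call returns `false`; the other three all lead to a retry, matching the three failure cases commented in the listener above.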

1.3. Number of consumption retries

RocketMQ creates a retry queue, whose topic is named “%RETRY% + consumerGroup”, for each consumer group (note that this retry topic belongs to the consumer group, not to each business topic). It temporarily holds messages whose consumption failed for any reason. Because recovering from a failure takes time, the retry queue has multiple retry levels, each with its own retry interval, and the interval grows as the number of retries increases.

RocketMQ does not put a retry message into the retry queue right away. It first stores the message in the delay queue whose topic is “SCHEDULE_TOPIC_XXXX”; a background scheduled task waits for the corresponding delay and then stores the message again in the “%RETRY% + consumerGroup” retry queue.
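The two-hop path can be illustrated with a small in-memory model. This is a sketch under stated assumptions: the `Msg` class and the property keys `REAL_TOPIC` and `DELAY_LEVEL` are hypothetical names chosen for the example, and only the topic names “SCHEDULE_TOPIC_XXXX” and “%RETRY% + consumerGroup” come from the text above.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryRouting {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    // Hypothetical, simplified message: a topic plus a property map.
    static class Msg {
        String topic;
        final Map<String, String> props = new HashMap<>();
        Msg(String topic) { this.topic = topic; }
    }

    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    /** Hop 1: park the failed message in the delay topic, remembering its real destination. */
    static void parkForRetry(Msg msg, String consumerGroup, int delayLevel) {
        msg.props.put("REAL_TOPIC", retryTopic(consumerGroup));
        msg.props.put("DELAY_LEVEL", String.valueOf(delayLevel));
        msg.topic = SCHEDULE_TOPIC;
    }

    /** Hop 2: after the delay, a background task moves the message to the retry topic. */
    static void deliverAfterDelay(Msg msg) {
        msg.topic = msg.props.remove("REAL_TOPIC");
        msg.props.remove("DELAY_LEVEL");
    }

    public static void main(String[] args) {
        Msg msg = new Msg("OrderTopic");                    // hypothetical business topic
        parkForRetry(msg, "order-service-group", 3);        // hypothetical consumer group
        System.out.println(msg.topic);                      // SCHEDULE_TOPIC_XXXX
        deliverAfterDelay(msg);
        System.out.println(msg.topic);                      // %RETRY%order-service-group
    }
}
```

Because the retry topic is derived from the consumer group, two groups subscribed to the same business topic get separate retry topics and retry independently.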

As with delayed messages, a message is retried up to 16 times by default, and the intervals before successive retries (1st to 16th) are:

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
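The table above lines up with the broker's delay levels shifted by two. The following sketch assumes the RocketMQ 4.x default `messageDelayLevel` list (`1s 5s 10s ... 1h 2h`, 18 levels) and that retry n uses delay level n + 2, which reproduces the 16 intervals listed; the class and method names are hypothetical. It also adds up the intervals to show how long a message that always fails takes to reach the dead-letter queue, ignoring processing time.

```java
import java.util.Arrays;
import java.util.List;

public class RetrySchedule {
    // Assumed default delay levels: level 1 = "1s" ... level 18 = "2h".
    static final List<String> DELAY_LEVELS = Arrays.asList(
            "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
            "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    static final int MAX_RECONSUME_TIMES = 16;

    /** Delay level for the n-th retry (n = 1..16); assumed to be n + 2, so retry 1 uses "10s". */
    static int delayLevelForRetry(int retry) {
        return 2 + retry;
    }

    /** Parses "10s", "5m" or "2h" into seconds. */
    static long toSeconds(String spec) {
        long value = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return value;
            case 'm': return value * 60;
            case 'h': return value * 3600;
            default: throw new IllegalArgumentException("unknown unit in " + spec);
        }
    }

    /** Wait before the n-th retry, in seconds. */
    static long intervalSeconds(int retry) {
        return toSeconds(DELAY_LEVELS.get(delayLevelForRetry(retry) - 1));
    }

    /** Sum of all 16 intervals: lower bound on the time before the message reaches the DLQ. */
    static long totalSeconds() {
        long total = 0;
        for (int retry = 1; retry <= MAX_RECONSUME_TIMES; retry++) {
            total += intervalSeconds(retry);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= MAX_RECONSUME_TIMES; retry++) {
            System.out.printf("retry %2d -> level %2d, wait %ds%n",
                    retry, delayLevelForRetry(retry), intervalSeconds(retry));
        }
        System.out.println("total: " + totalSeconds() + "s"); // 17140s, about 4h46m
    }
}
```

So with the defaults, a message that never succeeds spends at least about 4 hours 46 minutes in the retry cycle before it becomes a dead-letter message.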


2. Dead letter queue

As mentioned earlier, once a message has still not been consumed successfully after 16 retries, it is sent directly to the dead-letter queue. In other words, the dead-letter queue stores messages that cannot be consumed normally.

RocketMQ calls a message that cannot be consumed normally a dead-letter message, and the queue that stores such messages a dead-letter queue. You can resend messages in the dead-letter queue from the RocketMQ console so that consumers can consume them again.

The dead letter queue has the following characteristics:

  1. RocketMQ creates dead-letter queues automatically, and only for consumer groups that actually need one (that is, groups that have produced dead-letter messages).
  2. A dead-letter queue belongs to a consumer group (its topic is named “%DLQ% + consumerGroup”) and holds the dead-letter messages from all topics that the group consumes.
  3. Messages in the dead-letter queue have the same retention period as normal messages, which is 48 hours by default.
  4. To consume messages in the dead-letter queue, you need to change its permission to 6 (read and write) in the console.
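The permission value 6 is a bit mask: 2 is write and 4 is read, so 6 = 4 | 2 means read and write. The sketch below shows that arithmetic together with the dead-letter topic naming. It assumes the dead-letter topic starts out write-only (perm 2), which is consistent with the instruction above to change it to 6 before consuming; the class and method names are hypothetical.

```java
public class DlqPermissions {
    // Permission bits behind topic "perm" values: 2 = write, 4 = read, 6 = read + write.
    static final int PERM_WRITE = 1 << 1; // 2
    static final int PERM_READ = 1 << 2;  // 4

    static String dlqTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }

    static boolean isReadable(int perm) { return (perm & PERM_READ) == PERM_READ; }
    static boolean isWritable(int perm) { return (perm & PERM_WRITE) == PERM_WRITE; }

    public static void main(String[] args) {
        String topic = dlqTopic("order-service-group"); // hypothetical consumer group
        int before = PERM_WRITE;                        // assumed initial perm: write-only
        int after = PERM_READ | PERM_WRITE;             // 6, as set in the console
        System.out.println(topic + " perm " + before + " readable? " + isReadable(before)); // false
        System.out.println(topic + " perm " + after + " readable? " + isReadable(after));   // true
    }
}
```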

3. The emergence of message idempotence problem

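Definition of idempotence: an operation is idempotent if performing it multiple times has the same result as performing it once. For example, query operations in an HTTP interface are idempotent.

Create (insert) operations: not idempotent, because each execution inserts a new row.

Update operations: idempotent when they set the same data to fixed values (for example, status = 'PAID'); an increment such as count = count + 1 is not idempotent.

Delete operations: deleting by id is idempotent.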
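A quick way to see the difference is to run each kind of operation several times against in-memory stand-ins for database tables (the `users` list and `orderStatus` map below are hypothetical) and compare the final state with running it once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotenceDemo {
    // Hypothetical in-memory stand-ins for two database tables.
    static final List<String> users = new ArrayList<>();
    static final Map<Long, String> orderStatus = new HashMap<>();

    // Create: every call adds a new row, so repeating it changes the result (not idempotent).
    static void insertUser(String name) {
        users.add(name);
    }

    // Update to a fixed value: repeating it leaves the same state (idempotent).
    static void setStatus(long orderId, String status) {
        orderStatus.put(orderId, status);
    }

    // Delete by id: after the first call the row is gone; later calls change nothing (idempotent).
    static void deleteById(long orderId) {
        orderStatus.remove(orderId);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {      // as if the same message were delivered three times
            insertUser("alice");
            setStatus(1L, "PAID");
        }
        System.out.println(users.size());  // 3: three "alice" rows
        System.out.println(orderStatus);   // {1=PAID}: same as running it once
        deleteById(1L);
        deleteById(1L);
        System.out.println(orderStatus.isEmpty()); // true
    }
}
```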

So how can a non-idempotent operation be made idempotent?

In a message queue, the same message may well be sent more than once, or consumed more than once (possibly by different consumers). Non-idempotent operations such as user registration therefore need an idempotence guarantee. The situations that produce duplicates can be summarized as follows:

  1. The producer sends a message more than once: because of network jitter, the producer does not receive the broker's ack and resends the message, so the message queue contains duplicate messages.

  2. The consumer consumes a message more than once: because of network jitter, the consumer's ack does not reach the broker, so the message is delivered and consumed again.

  3. Repeated consumption during rebalance: when queues are reassigned among consumers (for example because of network jitter, or consumers joining or leaving), a message may be consumed again by the consumer that takes over its queue.

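Cases 1 and 2 share the same mechanism: the sender keeps resending until it receives an ack, but the ack itself can be lost. The sketch below is a hypothetical model of that "retry until acknowledged" loop; it applies equally to producer to broker and broker to consumer, and shows that a single lost ack already produces a duplicate.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateDelivery {
    /**
     * Simplified "retry until acknowledged" loop. Every attempt reaches the receiver;
     * only the ack on the way back may be lost (e.g. due to network jitter).
     * ackLostOnAttempt[i] says whether the ack of attempt i is lost.
     */
    static List<String> sendUntilAcked(String msgId, boolean[] ackLostOnAttempt) {
        List<String> received = new ArrayList<>();
        for (boolean ackLost : ackLostOnAttempt) {
            received.add(msgId);   // the receiver stores or processes the message
            if (!ackLost) {
                break;             // the sender got the ack and stops resending
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // First ack lost, second ack delivered: the receiver sees the same message twice.
        System.out.println(sendUntilAcked("order-1001", new boolean[] {true, false}));
    }
}
```

Running it prints `[order-1001, order-1001]`, which is why the consumer side, not the transport, has to make processing idempotent.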

4. How to ensure idempotent consumption?

  1. Insert a row into MySQL using the business id as the primary key. Because the primary key is unique, only one insert for a given id can succeed.
  2. Use a distributed lock based on Redis or ZooKeeper (the mainstream solution).
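The unique-key approach (option 1) can be sketched without a database: a concurrent set plays the role of the table whose primary key is the business id, and `Set.add` returning `false` plays the role of MySQL's duplicate-key error. The class, field and method names are hypothetical; this illustrates the idea, not a production implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueKeyDedup {
    // Stand-in for a table whose primary key is the business id (orderId).
    static final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    static final AtomicInteger ordersCreated = new AtomicInteger();

    /** Returns true if the message was processed, false if it was recognized as a duplicate. */
    static boolean consume(String orderId) {
        if (!processedIds.add(orderId)) {   // atomic: only one caller can "insert" a given id
            return false;                   // duplicate: skip the business logic
        }
        ordersCreated.incrementAndGet();    // business logic runs once per orderId
        return true;
    }

    public static void main(String[] args) {
        System.out.println(consume("1001")); // true
        System.out.println(consume("1001")); // false (redelivered duplicate)
        System.out.println(consume("1002")); // true
        System.out.println("orders created: " + ordersCreated.get()); // 2
    }
}
```

With a real database, the key insert and the business writes should go into the same transaction; otherwise a failure after the insert would make the retry look like a duplicate and the work would be silently skipped. A recognized duplicate should still be reported as consumed successfully so that it is not retried again.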

For example, when creating an order, the producer includes the orderId in the message as the unique business id. When a message is sent or delivered more than once, the consumer can check the order id to decide whether the message has already been handled.

To prevent two consumers from processing the same duplicate message at the same time, a distributed lock can be taken on the orderId, so that only one consumer at a time can process messages for that order.
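The lock-then-check pattern can be shown in a single process. In this sketch a per-orderId `ReentrantLock` stands in for the Redis or ZooKeeper lock, and a concurrent set stands in for the record of handled orders; both are assumptions made so the example runs with the JDK alone. Four threads receive the same duplicate message at once, and only one of them creates the order.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class OrderIdLockDedup {
    // In-process stand-ins: per-orderId locks (a distributed lock in production)
    // and the set of orderIds that have already been handled (e.g. an orders table).
    static final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    static final Set<String> handledOrders = ConcurrentHashMap.newKeySet();
    static final AtomicInteger ordersCreated = new AtomicInteger();

    static void consume(String orderId) {
        ReentrantLock lock = locks.computeIfAbsent(orderId, id -> new ReentrantLock());
        lock.lock();                           // one consumer per orderId at a time
        try {
            if (handledOrders.contains(orderId)) {
                return;                        // duplicate: already processed
            }
            ordersCreated.incrementAndGet();   // business logic: create the order
            handledOrders.add(orderId);        // record it while still holding the lock
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {          // four consumers get the same duplicate message
            pool.submit(() -> {
                start.await();
                consume("order-1001");
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("orders created: " + ordersCreated.get()); // 1
    }
}
```

The lock only serializes concurrent duplicates; the check against handled orders is what rejects a duplicate that arrives later. A real distributed lock also needs an expiry so that a crashed consumer does not hold it forever.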