rabbitmq message confirmation based on springboot

Overview

There are two types of message confirmation in RabbitMQ. One is message sending confirmation. This is used to confirm whether the message is successfully delivered when the producer sends the message to the exchanger and the exchanger passes it to the queue. Sending confirmation is divided into two steps, one is to confirm whether it reaches the switch, and the other is to confirm whether it reaches the queue. The second is consumption receipt confirmation. This is to confirm whether the consumer has successfully consumed the message in the queue.

Details

1. Operational effect

image.png

2. Implementation process

①. Introduce rabbitmq package
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②. Modify application.properties configuration
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  
# The sender turns on the confirm confirmation mechanism
spring.rabbitmq.publisher-confirms=true
# The sender turns on the return confirmation mechanism
spring.rabbitmq.publisher-returns=true
################################################ ##
#Set manual ack on the consumer side
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Whether to support retry
spring.rabbitmq.listener.simple.retry.enabled=true
③. Define exchange and queue, and bind queue to exchange
package com.mm.springbootrabbitmqconfirmdemo.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean(name = "confirmQueue")
    public Queue confirmQueue(){
        return new Queue("confirmQueue",true,false,false);
    }
 
    @Bean(name = "confirmExchange")
    public FanoutExchange confirmExchange(){
        return new FanoutExchange("confirmExchange");
    }
 
    @Bean
    public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange,
                                                 @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange);
    }
 
}
④. Message sending confirmation

Send message confirmation: Used to confirm that the producer producer sends the message to broker, the switch exchange on broker and then In the process of delivering to the queue queue, whether the message is successfully delivered.

Messages from producer to rabbitmq broker have a confirmCallback confirmation mode.

Message delivery failure from exchange to queue has a returnCallback fallback mode.

We can use these two Callback to ensure 100% delivery of the message.

1. ConfirmCallback confirmation mode

As long as the message is received by rabbitmq broker, the confirmCallback callback will be triggered.

package com.mm.springbootrabbitmqconfirmdemo.service;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){
        if (!ack) {
            log.error("Message sending exception!");
        } else {
            log.info("The sender's father has received the confirmation, correlationData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
        }
 
    }
 
 
}

Implement the interface ConfirmCallback and rewrite its confirm() method. There are three parameters in the method: correlationData, ack, cause.

  • correlationData: There is only one id attribute inside the object, which is used to indicate the uniqueness of the current message.

  • ack: The status of message delivery to broker, true indicates success.

  • cause: Indicates the reason for delivery failure.

However, the message being received by broker can only mean that it has arrived at the MQ server, and there is no guarantee that the message will be delivered to the target queue. So you need to use returnCallback next.

2. ReturnCallback return mode

If the message fails to be delivered to the target queue, the callback returnCallback will be triggered. Once the message is not successfully delivered to the queue, the current message will generally be recorded here. Detailed delivery data to facilitate subsequent operations such as resending or compensation.

com.mm.springbootrabbitmqconfirmdemo.service;

lombok.extern.slf4j.;
org.springframework.amqp.core.Message;
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.stereotype.;

ReturnCallbackService RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;

Implement the interface ReturnCallback and override the returnedMessage() method. The method has five parameters: message (message body), replyCode (response code), replyText (response content), exchange (switch), routingKey (queue).

The following is the specific message sending. Set the Confirm and Return callbacks in rabbitTemplate. We use setDeliveryMode() to The message is persisted, and a CorrelationData object is created for subsequent testing, and an id is added as 10000000000.

⑤. Message sending confirmation

Message reception confirmation is simpler than message sending confirmation, because there is only one message receipt (ack) process. The method marked with the @RabbitHandler annotation needs to add two parameters: channel (channel) and message.

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
     
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
  
        try {
            log.info("Xiaofu received the message: {}", msg);
  
            //TODO specific business
             
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  
        } catch (Exception e) {
             
            if (message.getMessageProperties().getRedelivered()) {
                 
                log.error("The message has been processed repeatedly and failed, and it is refused to be received again...");
                 
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // Reject the message
            } else {
                 
                log.error("The message will be returned to the queue for processing again...");
                 
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

There are three receipt methods for consuming messages. Let’s analyze the meaning of each method.

1.basicAck

basicAck: Indicates successful confirmation. After using this receipt method, the message will be deleted by rabbitmq broker.

void basicAck(long deliveryTag, boolean multiple)

deliveryTag: Indicates the message delivery sequence number. Each time a message is consumed or the message is re-delivered, deliveryTag will increase. In manual message confirmation mode, we can perform operations such as ack, nack, and reject on the message with specified deliveryTag.

multiple: Whether to confirm in batches. If the value is true, all messages smaller than the current message deliveryTag will be ack at once.

For example: Suppose I first send three messages deliveryTag, which are 5, 6, and 7, but they are not confirmed. When I send the fourth message, deliveryTag is 8, and multiple is set to true, all messages 5, 6, 7, and 8 will be confirmed.

2.basicNack

basicNack: Indicates failure confirmation. This method is generally used when the message consumption business is abnormal, and the message can be re-delivered into the queue.

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag: Indicates the message delivery sequence number.

multiple: Whether to confirm in batches.

requeue: A value of true causes the message to be requeued.

3.basicReject

basicReject: Reject the message. The difference from basicNack is that batch operations cannot be performed. Other usages are very similar.

void basicReject(long deliveryTag, boolean requeue)

deliveryTag: Indicates the message delivery sequence number.

requeue: A value of true causes the message to be requeued.

3. Project structure chart

image.png

4. Supplement

1. Don’t forget to confirm the message

This is a very untechnical pit, but it is a very easy place to make mistakes.

Enable the message acknowledgment mechanism, and don’t forget to channel.basicAck when consuming messages, otherwise the messages will always exist, resulting in repeated consumption.

2. Unlimited message delivery

When I first came into contact with the message confirmation mechanism, the consumer code was written as follows. The idea was very simple: after processing the business logic, confirm the message. int a = 1 / 0 After an exception occurs, The message is put back into the queue.

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
  
        try {
            log.info("Consumer 2 received: {}", msg);
  
            int a = 1 / 0;
  
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  
        } catch (Exception e) {
  
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

3. Repeat consumption

How to ensure that MQ consumption is idempotent depends on the specific business. You can use MySQL or redis to persist the message and ensure the uniqueness in the message. Attribute verification.

It can be seen that after using RabbitMQ, our business links have obviously become longer. Although the decoupling between systems has been achieved, the scenarios that may cause message loss have also increased. For example:

  • Message producer -> rabbitmq server (message sending failed)

  • Rabbitmq server’s own failure causes message loss

  • Message consumer -> rabbitmq service (failed to consume messages)