RabbitMQ message reliability (1)–Producer message confirmation

Foreword

When a project introduces RabbitMQ as middleware, the business also has to consider data safety and make sure that RabbitMQ messages are delivered reliably. Otherwise, a single lost message can leave the business data inconsistent, which can have a large impact on the system. Message reliability can be considered from three angles: producer message confirmation, consumer message confirmation, and message persistence. This document explains producer message confirmation.

1. Message confirmation flow chart

As can be seen from the figure, message confirmation is divided into producer confirmation and consumer confirmation. Producer message confirmation is the mechanism between the producer and MQ, and consumer message confirmation is the mechanism between MQ and the consumer.

There are three scenarios in which a message can be lost:

  1. A network problem occurs while the message is being sent: the producer believes the send succeeded, but MQ never received it (requires producer message confirmation);
  2. MQ receives the message, but then loses it because the MQ server goes down or restarts; a message that has not been persisted does not survive a restart (requires message persistence);
  3. The consumer receives the message and an error occurs while processing it, so processing never completes, but the ack has already been returned automatically (requires consumer message confirmation in manual acknowledgement mode).

2. Producer message confirmation

RabbitMQ provides a publisher confirm mechanism to prevent messages from being lost on their way to MQ. Under this mechanism, each message should carry a unique ID so that a confirmation can be matched to the message it refers to. After MQ has handled a published message, it returns a result to the producer. There are two kinds of result, and usually both are handled to make delivery reliable: publisher-confirm and publisher-return, which are explained below.

1. publisher-confirm (sender confirmation)

The message is successfully delivered to the exchange: ack is returned.

The message is not delivered to the exchange: nack is returned.

2. publisher-return (sender’s receipt)

The message is delivered to the exchange but cannot be routed to any queue: ack is returned, and the message is sent back to the producer together with the reason for the routing failure.

3. Code Implementation

1. Configuration file

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # Confirm that the message has reached the exchange
    publisher-confirm-type: correlated
    # Return messages that reached the exchange but could not be routed to a queue
    publisher-returns: true

publisher-confirm-type has three values:

  1. none: disables publisher confirms; this is the default value
  2. simple: waits for the confirm result synchronously until it times out
  3. correlated: asynchronous callback; define a ConfirmCallback, and it is invoked when MQ returns the result

publisher-returns: enables the callback for messages that could not be routed to a queue; the callback function is ReturnsCallback
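
The difference between the simple and correlated styles described above can be illustrated without a broker. The following is only a dependency-free sketch: the class `ConfirmModes` and its methods are hypothetical names, and a plain `CompletableFuture<Boolean>` stands in for the confirm result that MQ would eventually deliver. It is not the Spring AMQP API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Hypothetical illustration of the two waiting styles; no broker is involved. */
final class ConfirmModes {

    private ConfirmModes() {
    }

    /** "simple" style: block the caller until the confirm arrives or the timeout expires. */
    static boolean waitForConfirm(CompletableFuture<Boolean> confirm, long timeoutMillis) {
        try {
            return confirm.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // No usable answer in time: treat the message as unconfirmed.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the message as unconfirmed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** "correlated" style: register a callback keyed by the message ID and return at once. */
    static void onConfirm(CompletableFuture<Boolean> confirm, String messageId,
                          BiConsumer<String, Boolean> callback) {
        confirm.thenAccept(ack -> callback.accept(messageId, ack));
    }
}
```

The synchronous style is simpler to reason about but holds the sending thread for every message, while the callback style lets the sender continue and relies on the message ID to tell confirmations apart.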

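2. Configure the ConfirmCallback function and ReturnsCallback function

import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3 and later

import com.alibaba.fastjson.JSON; // assumption: the JSON helper used below is fastjson
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Producer message callback configuration class
 */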
@Configuration
@Slf4j
public class ProviderCallBackConfig {

    @Resource
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        // With mandatory set to true, if the exchange cannot route the message to any queue
        // (based on the exchange type and the message's routingKey), the broker sends the
        // message back to the producer via basic.return.
        // With mandatory set to false, the broker simply discards the message in that situation.
        rabbitTemplate.setMandatory(true);

        /*
         * Confirm callback: reports whether the message reached the exchange on the broker.
         * correlationData: unique identifier supplied when the message was sent
         * ack: the confirmation result (true = accepted, false = not accepted)
         * cause: failure reason when ack is false, otherwise null
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                // The message reached the exchange; update the message status in the database, etc.
                log.info("------The producer successfully sent the message to exchange, message unique identifier: {}, confirmation status: {}, cause: {}-----", correlationData, ack, cause);
            } else {
                // The message did not reach the exchange; log it and resend if the business requires it.
                log.error("------The producer failed to send a message to exchange, message unique identifier: {}, confirmation status: {}, cause: {}-----", correlationData, ack, cause);
            }
        });

        /*
         * Returns callback: invoked when the message reached the exchange but could not be
         * routed to a queue. The returned object carries:
         * message: the message itself
         * replyCode: response code
         * replyText: response text
         * exchange: the exchange
         * routingKey: the routing key
         */
        rabbitTemplate.setReturnsCallback((res) -> {
            // Routing failed; log the details and resend if the business requires it.
            log.error("------exchange failed to send message to queue, res: {}---------------", JSON.toJSONString(res));
        });
        return rabbitTemplate;
    }

}
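
The comments in the callbacks above mention updating the message status on success and optionally resending on failure. One way to support that is to remember each message under its unique ID until it is confirmed. The sketch below is a minimal, in-memory illustration under that assumption; `PendingConfirms` is a hypothetical helper, not part of Spring AMQP, and a real system would more likely keep this state in a database table.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers messages that were sent but not yet confirmed. */
final class PendingConfirms {

    // Keyed by the same ID that is set on CorrelationData when sending.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before sending the message. */
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /**
     * Call from the confirm callback. The entry is removed in every case.
     * Returns the payload to resend when a tracked message was nacked, otherwise empty.
     */
    Optional<String> onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            // The message was sent without an ID, so there is nothing to look up.
            return Optional.empty();
        }
        String payload = pending.remove(correlationId);
        if (ack || payload == null) {
            return Optional.empty();
        }
        return Optional.of(payload);
    }

    /** Number of messages still waiting for a confirmation. */
    int size() {
        return pending.size();
    }
}
```

In the ConfirmCallback, `correlationData.getId()` would supply the key. A caller that resends the returned payload should call `track` again, and would normally cap the number of attempts so that a permanently failing message is not resent forever.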

At this point, the callbacks for producer message confirmation are configured.
You can see that two callback functions are written above, one called ConfirmCallback and the other called ReturnsCallback.
So under what circumstances is each of these two callback functions triggered?

Let’s look at the overall picture first. There are three possible outcomes when a message is pushed:

① The message is pushed to the server, but the exchange cannot be found on the server
② The message is pushed to the server and the exchange is found, but no queue is found
③ The message is pushed successfully

① The message is pushed to the server, but the exchange cannot be found on the server
Write a test interface that pushes a message to an exchange named ‘non-existent-exchange’ (this exchange has not been created or configured):

@GetMapping("/testProviderMessageBack")
@ApiOperation(value = "Test producer message callback")
@ApiOperationSupport(order = 5)
public String testProviderMessageBack() {
    CorrelationData data = new CorrelationData();
    data.setId("111");
    rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "Test producer message callback", data);
    return "ok";
}

Call the interface and check the project’s console output (the cause is that the exchange ‘non-existent-exchange’ was not found):

------The producer failed to send a message to exchange, message unique identifier: CorrelationData [id=111], confirmation status: false, cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)-----

Conclusion: ① This situation triggers the ConfirmCallback callback function, with ack set to false.

② The message is pushed to the server and the exchange is found, but no queue is found
For this case, add an exchange but do not bind any queue to it. Here a direct exchange named ‘lonelyDirectExchange’ is added in DirectRabbitConfig, with no binding configured for it:

@Bean
DirectExchange lonelyDirectExchange() {
    return new DirectExchange("lonelyDirectExchange");
}

Then write a test interface that pushes a message to the exchange named ‘lonelyDirectExchange’ (this exchange has no queue bound to it):

@GetMapping("/testProviderMessageBack2")
@ApiOperation(value = "Test producer message callback 2")
@ApiOperationSupport(order = 6)
public String testProviderMessageBack2() {
    CorrelationData data = new CorrelationData();
    data.setId("222");
    rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "Test producer message callback 2", data);
    return "ok";
}

------The producer successfully sent the message to exchange, message unique identifier: CorrelationData [id=222], confirmation status: true, cause: null-----

------exchange failed to send message to queue, res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}---------------

The message was successfully pushed to the exchange, so ConfirmCallback reports the confirmation as true.
The parameters printed by the ReturnsCallback callback function show that when the exchange tried to route the message, no queue could be found, so the error NO_ROUTE was reported.
Conclusion: ② This situation triggers both callback functions: ConfirmCallback and ReturnsCallback.

③ The message is pushed successfully
To test this case, simply call the normal message push interface from before, /sendDirectMessage, and check the console output:

Conclusion: ③ This situation triggers only the ConfirmCallback callback function, with ack set to true.
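
The three conclusions can be condensed into a small decision function. This is only a summary sketch of the behaviour observed in the tests above, assuming publisher-confirm-type is correlated, publisher-returns is true, and mandatory is true; `CallbackOutcome` is a hypothetical class and is not part of RabbitMQ or Spring AMQP.

```java
/** Hypothetical summary of which producer callbacks fire in each of the three cases. */
final class CallbackOutcome {

    final boolean confirmCallbackFired;
    final boolean ack;
    final boolean returnsCallbackFired;

    private CallbackOutcome(boolean confirmCallbackFired, boolean ack, boolean returnsCallbackFired) {
        this.confirmCallbackFired = confirmCallbackFired;
        this.ack = ack;
        this.returnsCallbackFired = returnsCallbackFired;
    }

    /**
     * exchangeExists: whether the target exchange was found on the server.
     * routedToQueue: whether the exchange could route the message to a queue.
     */
    static CallbackOutcome of(boolean exchangeExists, boolean routedToQueue) {
        if (!exchangeExists) {
            // Case 1: ConfirmCallback only, with ack = false.
            return new CallbackOutcome(true, false, false);
        }
        if (!routedToQueue) {
            // Case 2: ConfirmCallback with ack = true, plus ReturnsCallback (NO_ROUTE).
            return new CallbackOutcome(true, true, true);
        }
        // Case 3: ConfirmCallback only, with ack = true.
        return new CallbackOutcome(true, true, false);
    }
}
```

Read this way, an ack on its own only shows that the exchange accepted the message; a producer that needs to know the message reached a queue also has to check that the ReturnsCallback did not fire for the same message.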