5 RabbitMQ Advanced Publisher Confirms

For reasons that were never identified, RabbitMQ restarted in the production environment. While it was restarting, producers failed to deliver messages, and the lost messages had to be recovered and resent by hand. This raised the question of how to make delivery to RabbitMQ reliable, especially in extreme cases: when the RabbitMQ cluster is unavailable, how should undeliverable messages be handled?

5.1 Publisher confirms with Spring Boot

5.1.1 Confirmation mechanism plan

If the exchange is missing, any message sent to it is lost. If the exchange exists but the queue is missing, the exchange cannot route the message to a queue, and the message is still discarded.

Therefore, when the producer sends a message to the broker, it should also keep a copy in a cache. If the exchange does not receive the message, or the exchange cannot route it to a queue, a scheduled task resends the failed message from the cache.
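The cache-and-resend plan can be sketched in plain Java without a broker. This is a minimal illustration, not the article's implementation: PendingMessageStore and its method names are hypothetical, and a real application would call onConfirm from the confirm callback and idsToResend from a scheduled task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory cache of messages that the broker has not confirmed yet. */
class PendingMessageStore {
    // message id -> payload; an entry stays here until the broker acknowledges it
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before publishing, so a lost message can still be found later. */
    void beforeSend(String id, String payload) {
        pending.put(id, payload);
    }

    /** Call from the confirm callback: an ack removes the entry, a nack keeps it for retry. */
    void onConfirm(String id, boolean ack) {
        if (ack) {
            pending.remove(id);
        }
    }

    /** Call from a scheduled task: returns the ids that still need to be resent, sorted. */
    List<String> idsToResend() {
        List<String> ids = new ArrayList<>(pending.keySet());
        Collections.sort(ids);
        return ids;
    }

    /** Returns the cached payload for an id, or null if it was already confirmed. */
    String payloadOf(String id) {
        return pending.get(id);
    }
}
```

A thread-safe map is used because confirms typically arrive on a different thread from the one that published. An in-memory cache is lost if the producer itself restarts, so a persistent store may be a better fit when that matters.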

5.1.2 Code Architecture Diagram

This architecture can fail at two points: 1. The exchange is unavailable. 2. The queue is unavailable.

5.1.3 Producer

package com.rabbitmq02.controller;

import com.rabbitmq02.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/confirm")
@RestController
@Slf4j
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Send the message three times, once per scenario
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        // The id is passed back to the confirm callback so the message can be identified.
        // The value "1" is an assumption, chosen to match the sample output in 5.1.7.
        CorrelationData correlationData = new CorrelationData("1");

        // 1. Correct exchange, correct routing key
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
        // 2. Correct exchange, wrong routing key
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY + "11", message, correlationData);
        // 3. Wrong exchange, correct routing key
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + "123", ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);

        log.info("Send message content: {}", message);
    }
}

5.1.4 Consumer

package com.rabbitmq02.consumer;

import com.rabbitmq02.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/***
 * @Description
 * Consumer for the advanced publisher confirms example
 * @ClassName ConfirmConsumer
 * @Author LY
 * @Date 2023/11/7 11:37
 **/

@Component
@Slf4j
public class ConfirmConsumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void confirmMsg(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("{} queue, received message content: {}", ConfirmConfig.CONFIRM_QUEUE_NAME, msg);
    }
}

5.1.5 Add configuration

Add the following to application.properties:

spring.rabbitmq.publisher-confirm-type=correlated

There are three possible values:

NONE:

Publisher confirms are disabled (the default).

CORRELATED:

The confirm callback is triggered once the exchange has received the message, or once the publish has failed.

SIMPLE:

Testing shows two effects:

1. As with CORRELATED, the confirm callback is triggered after the message is published to the exchange.

2. After publishing, you can call waitForConfirms or waitForConfirmsOrDie on the RabbitTemplate to block until the broker returns the result, and decide the next step based on that result. Note that waitForConfirmsOrDie does not return a boolean: if a message is negatively acknowledged or the wait times out, it throws an exception and the channel is closed, so no further messages can be sent to the broker on that channel. This is equivalent to confirming each publish individually.
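The blocking wait in effect 2 can be pictured without a broker. The sketch below is plain Java and does not use Spring or RabbitMQ classes; BlockingConfirmSketch, awaitConfirmOrFail, and the CompletableFuture that stands in for the broker reply are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Broker-free model of "publish, then block until the confirm arrives". */
class BlockingConfirmSketch {

    /**
     * Waits for the simulated broker reply.
     * Returns normally on ack; throws on nack or timeout, which mirrors
     * the "or die" style of waiting.
     */
    static void awaitConfirmOrFail(CompletableFuture<Boolean> brokerReply, long timeoutMillis) {
        boolean ack;
        try {
            ack = brokerReply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no confirm within " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting for confirm", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("confirm failed", e.getCause());
        }
        if (!ack) {
            throw new IllegalStateException("broker nacked the message");
        }
    }
}
```

The trade-off is throughput: the publishing thread is idle while it waits, which is why the callback-based CORRELATED mode is usually preferred when many messages are sent.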

5.1.6 Callback interface

After the producer sends a message, it has no way of knowing whether the broker actually received it, so a callback interface is needed that the broker's confirmation can trigger once the message has been delivered (or has failed).

package com.rabbitmq02.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    // This class must be registered on the RabbitTemplate; otherwise the template never invokes it.
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Register this callback on the RabbitTemplate.
     * A @PostConstruct method runs after dependency injection has completed.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Confirm callback for the exchange.
     * 1. The exchange received the message: ack is true and cause is null.
     * 2. The exchange did not receive the message: ack is false and cause holds the reason.
     * @param correlationData holds the message id and related information
     * @param ack whether the exchange received the message
     * @param cause the failure reason, or null on success
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData == null ? "" : correlationData.getId();
        if (ack) {
            log.info("The exchange has received the message, id: {}", id);
        } else {
            log.info("The exchange did not receive the message, id: {}, reason: {}", id, cause);
        }
    }
}

5.1.7 Result Analysis

Correct exchange, correct routing key: the exchange receives the message, the queue receives the message, and the confirm callback reports success.

Exchange: confirm.exchange, sending message content: Hello 2, routingKey: key1
confirm.queue queue, received message content: Hello 2
Correct callback function, the exchange has received the message, id: 1

Wrong exchange, correct routing key: the exchange does not receive the message, the queue does not receive it either, and the confirm callback reports failure.

Exchange: confirm.exchange123, sending message content: Hello 2, routingKey: key1

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)

Wrong callback function, the exchange did not receive the message, id: 1, reason: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)

Correct exchange, wrong routing key: the exchange receives the message, the queue does not receive it, and the confirm callback still reports success.

Exchange: confirm.exchange, sending message content: Hello 2, routingKey: key111

Correct callback function, the exchange has received the message, id: 1

So this mechanism only confirms that the message reached the exchange; it does not guarantee that the message was actually routed to a queue.

5.2 Returned messages

5.2.1 The mandatory parameter

When only publisher confirms are enabled, the exchange sends a confirmation to the producer as soon as it receives the message. If the message then turns out to be unroutable (it cannot be delivered to any queue), it is silently discarded, and the producer never learns that this happened. Setting the mandatory parameter makes the broker return messages that cannot reach their destination to the producer.
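The effect of the mandatory flag can be summarized with a small broker-free model. This is only a sketch of the behavior described above, not broker code: MandatoryFlagSketch, Outcome, and publish are hypothetical names, and the single binding (confirm.exchange with routing key key1) is taken from this section's examples.

```java
import java.util.Map;
import java.util.Set;

/** Broker-free model of how the mandatory flag changes what the producer learns. */
class MandatoryFlagSketch {

    /** What the producer and the queue observe for one publish. */
    static final class Outcome {
        final boolean confirmAck;          // result passed to the confirm callback
        final boolean returnedToProducer;  // whether the return callback fires
        final boolean queued;              // whether a queue received the message

        Outcome(boolean confirmAck, boolean returnedToProducer, boolean queued) {
            this.confirmAck = confirmAck;
            this.returnedToProducer = returnedToProducer;
            this.queued = queued;
        }
    }

    // exchange name -> routing keys that have a queue bound
    private static final Map<String, Set<String>> BINDINGS =
            Map.of("confirm.exchange", Set.of("key1"));

    static Outcome publish(String exchange, String routingKey, boolean mandatory) {
        Set<String> boundKeys = BINDINGS.get(exchange);
        if (boundKeys == null) {
            // Unknown exchange: the confirm callback reports a failure.
            return new Outcome(false, false, false);
        }
        if (boundKeys.contains(routingKey)) {
            return new Outcome(true, false, true);
        }
        // The exchange exists but no binding matches: the exchange still acks,
        // and only the mandatory flag decides whether the producer hears about the drop.
        return new Outcome(true, mandatory, false);
    }
}
```

For example, publish("confirm.exchange", "key111", false) models the silent drop: the confirm is positive, nothing is queued, and nothing is returned. With mandatory set to true the same call reports returnedToProducer as true.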

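5.2.2 Add configuration

spring.rabbitmq.publisher-returns=true

5.2.3 Return callback interface

You can implement either the RabbitTemplate.ReturnCallback interface or the RabbitTemplate.ReturnsCallback interface. ReturnCallback is deprecated as of Spring AMQP 2.3 in favor of ReturnsCallback, so prefer the latter on newer versions.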

package com.rabbitmq02.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage; // only needed by the ReturnsCallback variant below
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    // This class must be registered on the RabbitTemplate; otherwise the template never invokes it.
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Register both callbacks on the RabbitTemplate.
     * A @PostConstruct method runs after dependency injection has completed.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Confirm callback for the exchange.
     * 1. The exchange received the message: ack is true and cause is null.
     * 2. The exchange did not receive the message: ack is false and cause holds the reason.
     * @param correlationData holds the message id and related information
     * @param ack whether the exchange received the message
     * @param cause the failure reason, or null on success
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData == null ? "" : correlationData.getId();
        if (ack) {
            log.info("Correct callback function, the exchange has received the message, id: {}", id);
        } else {
            log.info("Wrong callback function, the exchange did not receive the message, id: {}, reason: {}", id, cause);
        }
    }

    /**
     * Return callback.
     * Invoked when the exchange accepted the message but could not route it to any queue,
     * so the message is handed back to the producer, which can then resend or record it.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Message {}, returned by exchange {}, the reason for return is {}, routing key is {}",
                new String(message.getBody()),
                exchange, replyText, routingKey);
    }

    /* Equivalent method when implementing RabbitTemplate.ReturnsCallback instead:
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("Message {}, returned by exchange {}, the reason for return is {}, routing key is {}",
                new String(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
    }
    */
}

5.2.4 Result Analysis

Correct exchange, correct routing key: the exchange receives the message, the queue receives it, the consumer receives it, and the confirm callback reports success.

Exchange: confirm.exchange, sending message content: Hello 2, routingKey: key1

confirm.queue queue, received message content: Hello 2
Correct callback function, the exchange has received the message, id: 1

Correct exchange, wrong routing key: the exchange receives the message, the queue and the consumer do not, the confirm callback reports success, and the message is returned to the producer.

Exchange: confirm.exchange, sending message content: Hello 2, routingKey: key111
Message Hello 2, returned by exchange confirm.exchange, the reason for return is NO_ROUTE, routing key is key111
Correct callback function, the exchange has received the message, id: 1

5.3 Backup exchange

With the mandatory parameter and returned messages, the producer can detect undeliverable messages and has a chance to handle them. But often we don't know what to do with these messages: at most we log them, raise an alert, and handle them manually. Handling returns also makes the producer more complex, and if the producer runs on several servers, collecting the logs by hand is error-prone.

If we want to handle unroutable messages without adding complexity to the producer, we can declare one exchange as the backup exchange of another (RabbitMQ calls this an alternate exchange). When the main exchange receives a message it cannot route, it forwards the message to the backup exchange, which then routes it. The backup exchange is usually of type fanout, so it delivers every message to all queues bound to it. We bind a backup queue to it, so every message the original exchange cannot route ends up in that queue. We can also bind a separate alarm queue and use a dedicated consumer for monitoring and alerting.

5.3.1 Code Architecture Diagram

5.3.2 Modify the configuration class

Building on the advanced publisher confirms configuration, add a backup exchange, a backup queue, and an alarm queue.

package com.rabbitmq02.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/***
 * @Description
 * Configuration class for the advanced publisher confirms example
 * @ClassName ConfirmConfig
 * @Author LY
 * @Date 2023/11/7 11:25
 **/
@Configuration
public class ConfirmConfig {
    // Exchange
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    // Queue
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    // Routing key
    public static final String CONFIRM_ROUTING_KEY = "key1";
    // Backup exchange
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    // Backup queue
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    // Alarm queue
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // Declare the backup exchange
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // Declare the main exchange and point its alternate-exchange argument at the backup exchange
    @Bean
    public DirectExchange confirmExchange() {
        // Previous declaration without a backup exchange:
        // return new DirectExchange(CONFIRM_EXCHANGE_NAME);
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
                .build();
    }

    // Declare the queue
    @Bean
    public Queue confirmQueue() {
        return new Queue(CONFIRM_QUEUE_NAME);
    }

    // Declare the backup queue
    @Bean
    public Queue backupQueue() {
        return new Queue(BACKUP_QUEUE_NAME);
    }

    // Declare the alarm queue
    @Bean
    public Queue warningQueue() {
        return new Queue(WARNING_QUEUE_NAME);
    }

    // Bind the main exchange to the queue
    @Bean
    public Binding bindConfirmExchangeToConfirmQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                                     @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    // Bind the backup exchange to the backup queue
    @Bean
    public Binding bindBackupExchangeToBackupQueue(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                                   @Qualifier("backupQueue") Queue backupQueue) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    // Bind the backup exchange to the alarm queue
    @Bean
    public Binding bindBackupExchangeToWarningQueue(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                                    @Qualifier("warningQueue") Queue warningQueue) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

5.3.3 Add alarm consumer

package com.rabbitmq02.consumer;

import com.rabbitmq02.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/***
 * @Description
 * Consumer for the alarm queue behind the backup exchange
 * @ClassName WarningConsumer
 * @Author LY
 * @Date 2023/11/7 11:37
 **/

@Component
@Slf4j
public class WarningConsumer {
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("{} alarm queue, received message content: {}", ConfirmConfig.WARNING_QUEUE_NAME, msg);
    }
}

5.3.4 Result Analysis

Correct exchange, correct routing key: the message is still received normally.

Exchange: confirm.exchange, sending message content: Hello 2, routingKey: key1
Correct callback function, the exchange has received the message, id: 1
confirm.queue queue, received message content: Hello 2

Correct exchange, wrong routing key: the message is forwarded through the backup exchange to the alarm queue.

Exchange: confirm.exchange, sending message content: Hello 2, routingKey: key111
warning.queue alarm queue, received message content: Hello 2

When the mandatory parameter and a backup exchange are both configured, the backup exchange takes priority: the unroutable message goes to the backup exchange and is not returned to the producer.
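The priority rule can be captured in a small broker-free model. This is a sketch under stated assumptions rather than broker code: AlternateExchangeSketch, Outcome, and route are hypothetical names, the queue names and the key1 binding come from ConfirmConfig, and the backup exchange is assumed to be a fanout exchange with both queues bound.

```java
import java.util.List;

/** Broker-free model of routing with a backup (alternate) exchange and the mandatory flag. */
class AlternateExchangeSketch {

    /** Where the message ended up and whether the producer got it back. */
    static final class Outcome {
        final List<String> queues;
        final boolean returnedToProducer;

        Outcome(List<String> queues, boolean returnedToProducer) {
            this.queues = queues;
            this.returnedToProducer = returnedToProducer;
        }
    }

    static Outcome route(String routingKey, boolean mandatory, boolean hasBackupExchange) {
        if ("key1".equals(routingKey)) {
            // Normal case: the direct exchange finds the binding.
            return new Outcome(List.of("confirm.queue"), false);
        }
        if (hasBackupExchange) {
            // Unroutable, but the fanout backup exchange delivers to every bound queue.
            // The message counts as routed, so it is not returned even if mandatory is set.
            return new Outcome(List.of("backup.queue", "warning.queue"), false);
        }
        // Unroutable and no backup exchange: returned only when mandatory is set.
        return new Outcome(List.of(), mandatory);
    }
}
```

The model assumes the backup exchange can always route the message. If the backup exchange had no queues bound, the message would still be unroutable, and the mandatory flag would apply again.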