The message sent by RabbitMQ to the delay switch calls the returnedMessage method and the function of the returnedMessage() method

1: The function of the returnedMessage() method

The method of the switch to return the message - the message is not delivered to the queue to trigger the callback
(1) It is often used when the switch cannot route the fallback message.
(2) If the switch is bound to a backup switch, it is routed to the backup switch, and this method does not call back.
(3) Call back this method if it is sent to the delay switch, so if you use the delay switch, you need to filter the message that the delay switch callbacks.

2: Demonstrate the phenomenon that the wrong Routingkey leads to the failure of routing, and trigger the callback

1. Declare switches and queues and bind them

/**
 * Non-routable switch and queue configuration
 *
 * @Author darren
 * @Date 2023/3/23 20:02
 */
@Configuration
@Slf4j
public class NotRoutableConfig {

    @Bean("notRoutableExchange")
    public DirectExchange getNotRoutableExchange(){
        return ExchangeBuilder.directExchange(ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME).build();
    }

    @Bean("notRoutableQueue")
    public Queue getNotRoutableQueue() {
        return QueueBuilder.durable(QueueUtil.NOT_ROUTABLE_QUEUE_NAME).build();
    }

    @Bean
    public Binding getBinding(
            @Qualifier("notRoutableQueue") Queue notRoutableQueue,
            @Qualifier("notRoutableExchange") DirectExchange notRoutableExchange){
        return BindingBuilder.bind(notRoutableQueue).to(notRoutableExchange).with(RoutingKeyUtil.NOT_ROUTABLE_RIGHT_ROUTING_KEY);
    }
}

2. CallBack implementation class

/**
 * Release confirmation-message callback class
 *
 * @Author darren
 * @Date 2023/3/21 22:38
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate. ConfirmCallback, RabbitTemplate. ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //Dependency injection rabbitTemplate and then set its callback object
    @PostConstruct
    public void init(){
        // confirmation callback
        rabbitTemplate.setConfirmCallback(this);
        // return callback
        rabbitTemplate. setReturnCallback(this);
    }

    /**
     * A callback method whether the switch receives a message or not
     *
     * @param correlationData: message correlation data
     * @param ack: Whether the switch received the message
     * @param cause
     */
    @Override
    public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("The switch confirmation callback method has received the message with id: {}", id);
        }else{
            log.info("The switch confirmation callback method has not received the message with id: {}, due to the reason: {}", id, cause);
        }
    }

    /**
     * The method for the switch to return the message - the message is not delivered to the queue to trigger the callback
     * Commonly used for switches unable to route fallback messages
     * If the switch is bound to a backup switch, it is routed to the backup switch, and this method does not call back.
     *
     * @param message the returned message.
     * @param replyCode the reply code.
     * @param replyText the reply text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(final Message message, final int replyCode, final String replyText,
            final String exchange,
            final String routingKey) {
        // Exclude the delay switch, because the message is delayed in the delay switch, and this function callback is triggered if it is not delivered to the queue
        if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
            log.info("The message received by the exchange method: {} The content of the exchange reply: {}, the exchange is: {}, the routing key: {}",
                    new String(message. getBody()), replyText, exchange, routingKey);
        }
    }
}

3. Producer

/**
 * non-routable switch-producer
 * @Author darren
 * @Date 2023/3/23 20:16
 */
@RestController
@Slf4j
@RequestMapping("/notRoutableExchange")
public class notRoutableExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMessage/{message}")
    public String sendNotRoutableExchangeMsg(@PathVariable("message") String message) {
        rabbitTemplate. convertAndSend(
                ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME,
                RoutingKeyUtil.NOT_ROUTABLE_ERROR_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());
        return "The message sent to the non-routable exchange was successful";
    }
}

4. Results

http://localhost:8888/notRoutableExchange/sendMessage/heheh
UUID is: 500ba6ed-845c-4ac3-80b2-343f6906a69b
The message received by the exchange method: heheh The content of the exchange reply: NO_ROUTE, the exchange is: not.routable.exchange, the routing key: not.routable.error.routing.key
The switch confirms that the callback method has received the message with the id: 500ba6ed-845c-4ac3-80b2-343f6906a69b

3: If the switch is bound to a backup switch, it is routed to the backup switch, and this method does not call back.

1. Declare release to confirm the switch and queue and bind the backup switch

/**
 * Publish confirmation - configure switches and queues
 * @Author darren
 * @Date 2023/3/21 22:02
 */
@Configuration
public class ConfirmConfig {

    /**
     * Publish the confirmation switch and bind the standby switch
     * @return
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(ExchangeUtil.CONFIRM_EXCHANGE_NAME)
                .durable(true).withArgument("alternate-exchange", ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QueueUtil.CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding confirmQueueBindingConfirmExchange(
            @Qualifier("confirmExchange") DirectExchange confirmExchange,
            @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(RoutingKeyUtil.CONFIRM_ROUTING_KEY);
    }
}

2. Declare and bind the backup switch and queue

/**
 * Backup switches and queues
 * @Author darren
 * @Date 2023/3/22 20:10
 */
@Configuration
@Slf4j
public class backupConfig {

    @Bean("backupExchange")
    public FanoutExchange getBackupExchange(){
        return ExchangeBuilder.fanoutExchange(ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("backupQueue")
    public Queue getBackupQueue(){
        return QueueBuilder.durable(QueueUtil.BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue getWarningQueue(){
        return QueueBuilder.durable(QueueUtil.WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding backupQueueBindingBackupExchange(
            @Qualifier("backupQueue") Queue backupQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding warningQueueBindingBackupExchange(
            @Qualifier("warningQueue") Queue warningQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

3. Consumer

/**
 * Publish confirmation mode queue consumer
 *
 * @Author darren
 * @Date 2023/3/21 22:49
 */
@Component
@Slf4j
public class ConfirmQueueConsumer {

    @RabbitListener(queues = QueueUtil. CONFIRM_QUEUE_NAME)
    public void receiveConfirmQueue(Message message) {
        String msg = new String(message. getBody());
        log.info("Consumer received message from queue:{}:{}", QueueUtil.CONFIRM_QUEUE_NAME, msg);
    }

    @RabbitListener(queues = QueueUtil. BACKUP_QUEUE_NAME)
    public void receiveWarningQueueMessage(Message message) {
        String msg = new String(message. getBody());
        log.info("Consumer received message from queue:{}:{}", QueueUtil.BACKUP_QUEUE_NAME, msg);
    }
}

4. Producer

/**
 * Publish Confirmation Mode - Producer
 * @Author darren
 * @Date 2023/3/21 22:12
 */
@RestController
@Slf4j
@RequestMapping("/confirmExchange")
public class ConfirmExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * make an announcement
     *
     * The correct route sent to the switch can be consumed successfully
     * The wrong route sent to the switch cannot be consumed, but the switch can receive the message, but it cannot reach the queue because it cannot be routed
     * @param message
     * @return
     */
    @GetMapping("/sendMessage/{message}")
    public String sendMessage(@PathVariable String message) {

        log.info("Send message: {} to exchange: {} ", message, ExchangeUtil.CONFIRM_EXCHANGE_NAME);
        rabbitTemplate. convertAndSend(
                ExchangeUtil.CONFIRM_EXCHANGE_NAME,
                RoutingKeyUtil.CONFIRM_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());

        rabbitTemplate. convertAndSend(
                ExchangeUtil.CONFIRM_EXCHANGE_NAME,
                RoutingKeyUtil.CONFIRM_ERROR_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());
        return "Release confirmation mode sends message successfully";
    }

}

5. It turns out that returnedMessage() has no callback

Send message: heheh to exchange: confirm.exchange
The UUID is: dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce
The UUID is: 83611012-9de2-4d52-8e6d-03baf0bc1e44
The switch confirms that the callback method has received the message with the id: dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce
The switch confirms that the callback method has received the message with the id: 83611012-9de2-4d52-8e6d-03baf0bc1e44
The consumer receives the message from the queue: warning.queue: heheh
The consumer receives the message from the queue: confirm.queue: heheh
The consumer receives the message from the queue: backup.queue: heheh

4: Call back this method if it is sent to a delay switch

1. Declare delay switches and queues and bind them

/**
 * 3- Delayed message plug-in
 *
 * Principle:
 * Exchange type x-delayed-message , this type of message supports delayed delivery mechanism.
 * After receiving the message, the message will not be delivered to the target queue immediately, but stored in the mnesia table (a distributed database),
 * Then detect the message delay time, if it reaches the deliverable time (expiration time), pass it through x-delayed-type
 * Type marked switches are posted to the target queue.
 * @Author darren
 * @Date 2023/3/21 20:56
 */
@Configuration
public class DelayedExchangeConfig {

    /**
     * Custom switch - delay switch
     * @return
     */
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(
                ExchangeUtil.DELAYED_EXCHANGE_NAME,
                "x-delayed-message", true, false, args);
    }

    /**
     * Ordinary queue
     *
     * @return
     */
    @Bean("delayedQueue")
    public Queue delayedQueue(){
        return QueueBuilder.durable(QueueUtil.DELAYED_QUEUE_NAME).build();
    }

    /**
     * Queue binding delay switch
     * @param delayedQueue
     * @param delayedExchange
     * @return
     */
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange ){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange)
                .with(RoutingKeyUtil.DELAYED_ROUTING_KEY).noargs();
    }
}

2. Consumers

/**
 * Delay switch queue consumer
 *
 * @Author darren
 * @Date 2023/3/23 16:25
 */
@Component
@Slf4j
public class DelayedQueueConsumer {
    @RabbitListener(queues = QueueUtil. DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message. getBody());
        log.info("Consumer received message from queue:{}:{}", QueueUtil.DELAYED_QUEUE_NAME, msg);
    }
}

3. Producer

/**
 * Delay switch-producer
 *
 * @Author darren
 * @Date 2023/3/21 17:20
 */
@RestController
@Slf4j
@RequestMapping("/delayedExchange")
public class DelayedExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * sent to delay switch
     * Messages have a time-sorted function in the exchange
     * @return
     */
    @GetMapping("/sendMessage/{message}/{delayTime}")
    public String sendMessage(@PathVariable String message,
            @PathVariable Integer delayTime) {
        rabbitTemplate. convertAndSend(
                ExchangeUtil.DELAYED_EXCHANGE_NAME,
                RoutingKeyUtil. DELAYED_ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(delayTime);
                    return messagePostProcessor;
                },
                CorrelationDataUtil.getCorrelationData());
        log.info("Send a message with a delay of: {} milliseconds: {} to the exchange: {}",
                delayTime, message, ExchangeUtil.DELAYED_EXCHANGE_NAME);
        return "Send delay switch message successfully";
    }
}

4. The returnedMessage() method

 /**
     * The method for the switch to return the message - the message is not delivered to the queue to trigger the callback
     * Commonly used for switches unable to route fallback messages
     * If the switch is bound to a backup switch, it is routed to the backup switch, and this method does not call back.
     *
     * @param message the returned message.
     * @param replyCode the reply code.
     * @param replyText the reply text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(final Message message, final int replyCode, final String replyText,
            final String exchange,
            final String routingKey) {
        // Exclude the delay switch, because the message is delayed in the delay switch, and this function callback is triggered if it is not delivered to the queue
        //if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
            log.info("The message received by the exchange method: {} The content of the exchange reply: {}, the exchange is: {}, the routing key: {}",
                    new String(message. getBody()), replyText, exchange, routingKey);
        //}
    }

5. As a result, it is found that the delayed switch will call back the returnedMessage() method. So if there is a delay queue, it should be excluded.

UUID is: e8b70810-852b-400c-9fe0-bffdf1a5247d
Send a delayed: 10000 ms message: haha to the exchange: delayed.exchange
The way the switch returns the message The message received: haha The content of the switch reply: NO_ROUTE, the switch is: delayed.exchange, the routing key: delayed.routing.key
The switch confirms that the callback method has received the message with the id: e8b70810-852b-400c-9fe0-bffdf1a5247d
The consumer receives the message from the queue: delayed.queue: haha