Spring Boot with RabbitMQ: returned (fallback) messages

Fallback message

When only the publisher confirmation mechanism is enabled, the exchange sends a confirmation to the message producer as soon as it has received the message. If the message then turns out to be unroutable, it is discarded directly, and the producer does not know that the message was discarded. So how can the producer get back the messages that cannot be routed and find a way to deal with them? At the very least it should be notified, so that it can handle them itself. By setting the mandatory parameter, the message is returned to the producer when its destination is unreachable during delivery.
Simply put: if the routing key is wrong but the exchange itself is fine, the message is handed straight back to the producer.

1. YAML configuration

spring:
  rabbitmq:
    host: 192.168.64.137
    port: 5672
    username: guest
    password: 123
    virtual-host: /
    # Alternative way to enable publisher confirms, left commented out:
    # publisher-confirm-type: correlated
    # NOTE(review): newer Spring Boot releases configure confirms through
    # publisher-confirm-type instead of publisher-confirms - check which one
    # applies to the Spring Boot version in use.
    # Possible values of publisher-confirm-type:
    #   NONE       - publisher confirms are disabled; this is the default.
    #   CORRELATED - the confirm callback is triggered after the message has
    #                been published to the exchange.
    #   SIMPLE     - confirms one message at a time (not recommended). In
    #                testing it had two effects: first, it triggers the
    #                callback just like CORRELATED; second, after publishing,
    #                the rabbitTemplate can call waitForConfirms or
    #                waitForConfirmsOrDie to wait for the broker's result and
    #                decide the next step from it. Note that when
    #                waitForConfirmsOrDie reports a failure the channel is
    #                closed, and no further messages can be sent to the broker
    #                on it.
    # Enable publisher confirms.
    publisher-confirms: true
    # Messages that cannot be routed are returned to the producer.
    publisher-returns: true

2. Configuration class

package com.cherry.rabbitmqttl.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration class release confirmation (advanced)
 */
@Configuration
public class ConfirmConfig {<!-- -->
    // switch
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    // queue
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    // Declare the switch
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){<!-- -->
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

    // queue
    @Bean("confirmQueue")
    public Queue confirmQueue(){<!-- -->
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }


    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,
                                        @Qualifier("confirmExchange")DirectExchange confirmExchange){<!-- -->
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

3. Producer

package com.cherry.rabbitmqttl.controller;

import com.cherry.rabbitmqttl.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test confirmation to start sending messages
 */
@Slf4j
@RequestMapping("/confirm")
@RestController
public class ConfirmProductController {<!-- -->

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * When the switch is wrong, the switch will display the error message that the switch has not received the message,
     * When the routingkey is wrong, it will display that the switch has received a message
     * Send message content, hello key1
     * Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/', class-id=60 , method-id=40)
     * The exchange has not received the id: ID=1 message, due to the reason: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/', class-id=60, method-id=40)
     * Send message content, hello key12
     * The switch has received the message with id: ID=2
     * @param message
     */
    // send message
    // ID unique identification
    // There is no way to get the failure callback when the routingkey is wrong
    @GetMapping("/sendMsg/{message}")
    public void sendMessage(@PathVariable String message){<!-- -->
        // send the message normally: both the exchange and the queue have received it
        CorrelationData correlationData3 = new CorrelationData("ID=0");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                message,
                correlationData3);
        log.info("Send message content, {}", message + "key0" );
        log.info("____________________________________________________________________");

        // The exchange sent the message incorrectly: the exchange did not receive it, the queue did not receive it
        CorrelationData correlationData1 = new CorrelationData("ID=1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + 1,
                ConfirmConfig.CONFIRM_ROUTING_KEY + "key1",
                message + "key1",
                correlationData1);
        log.info("Send message content, {}", message + "key1");
        log.info("____________________________________________________________________");

        // Routingkey error sending message: the switch received it, but the queue did not receive it
        CorrelationData correlationData2 = new CorrelationData("ID=2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY + "2",
                message + "key12",
                correlationData2);
        log.info("Send message content, {}", message + "key12");


    }



}

4. Consumer

package com.cherry.rabbitmqttl.consumer;

import com.cherry.rabbitmqttl.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmConsumer {<!-- -->
    @RabbitListener(queues = ConfirmConfig. CONFIRM_QUEUE_NAME)
    public void receiveConfirm(Message message){<!-- -->
        String mesg = new String(message. getBody());
        log.info("Received queue confirm.queue message: {}", mesg);
    }
}

5. Return (fallback) callback

package com.cherry.rabbitmqttl.confirmcallback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Perceive the message callback that cannot be consumed normally
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {<!-- -->
    // inject
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // @PostConstruct injection function is injected after Autowired injection
    @PostConstruct
    public void init(){<!-- -->
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate. setReturnCallback(this);
    }

    /**
     * A callback method whether the switch receives a message or not
     * CorrelationData
     * Message related data
     * ack
     * Whether the switch received the message
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, java.lang.String cause) {<!-- -->
        String id=correlationData!=null?correlationData.getId():"";
        if(ack){<!-- -->
            log.info("The switch has received the message with id: {}", id);
        }else{<!-- -->
            log.info("The exchange has not received the message with id: {}, due to the reason: {}", id, cause);
        }
    }


    // You can return the message to the producer when the message is not routable during message delivery (routingKey error)
    // Roll back only when the goal is unreachable
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {<!-- -->
        log.error("message {}, returned by switch {}, reason for return: {}, routing Key{}",
                new String(message. getBody()),
                exchange,
                replyText,
                routingKey);
    }


}

6. pom.xml dependencies

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the rabbitmqttl demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Spring Boot parent POM; dependencies below that omit a version rely on it. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <groupId>com.cherry</groupId>
    <artifactId>rabbitmqttl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmqttl</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--RabbitMQ dependency-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Web starter; the producer is exposed as a REST controller. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): fastjson is not referenced by the classes shown in this
             article, and 1.2.47 is an old release; check its security advisories
             and whether the dependency is needed at all. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <!-- Lombok; the classes in this article use its @Slf4j annotation. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <!-- NOTE(review): the springfox artifacts are not referenced by the classes
             shown in this article; confirm they are required. -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ test dependencies -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>