RabbitMQ’s Confirm mechanism

1. Reliability of messages

RabbitMQ provides a confirmation mechanism for Confirm.

The Confirm mechanism is used to confirm whether the message has been sent to the switch.

2.Java implementation

1. Import dependencies


2. Producer of Confirm mechanism

package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    //Declare the queue name
    public static final String QUEUE_NAME="queueA";

    public static void main(String[] args) throws Exception {

        //1. Get the connection object
        Connection conn = MQUtils.getConnection();

        //2. Create a channel object. Most operations of MQ are defined on the channel object.
        Channel channel = conn.createChannel();

        //3 Turn on confirm
        //3. Declare a queue
         *queue – the name of the queue
         *durable - true means that the queue created is durable (the queue will still exist after mq is restarted)
         * exclusive – whether the queue is exclusive (whether the queue can only be used by the connection currently creating the queue)
         * autoDelete – whether the queue can be automatically deleted by the mq server
         * arguments - other parameters of the queue, can be null
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello doubleasdasda!";

        //How the producer sends messages, use the following method
         * exchange – the name of the switch. If it is an empty string, it means the message is sent to the default switch.
         * routingKey – the routing key. When sending a message to the default switch, the routingkey represents the name of the queue.
         * other properties - other properties of the message, can be null
         * body – the content of the message, note that if there is a byte array
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //Check whether the message was sent successfully
        try {
             * Determine whether it is sent to the switch. If it is sent, return true.
             * If the switch cannot be sent because the switch name is wrong, an exception will be thrown and the channel will be automatically closed.
            if (channel.waitForConfirms()) {
                //If true is returned, it means the switch successfully received the message
                System.out.println("The message has been successfully sent to the switch");
                //Close the resource
            }else {
                System.out.println("The message failed to be sent to the switch");
                //Close the resource
        } catch (InterruptedException e) {
            System.out.println("The message failed to be sent to the switch");
            System.out.println("The failure message is:" + message);


3.Consumer of confirm mechanism

package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv {
   private final static String QUEUE_NAME="hello-queue";

    public static void main(String[] args) throws Exception {
        //1. Get the connection object
        Connection conn = MQUtils.getConnection();

        //2. Create a channel object. Most operations of MQ are defined on the channel object.
        Channel channel = conn.createChannel();

         * The first parameter queue name
         * The second parameter, durability
         * The third parameter is exclusive
         * Whether the fourth parameter is automatically deleted
         * The fifth parameter can define what type of queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3. The processing logic after the consumer receives the message is written in the DeliverCallback object.
        DeliverCallback deliverCallback =new DeliverCallback() {
            public void handle(String consumerTag, Delivery message) throws IOException {
            //The byte array of the message sent by the producer can be obtained from the Delivery object
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                //Write the consumer's business logic here, for example, send an email


        //4. Let the current consumer start consuming messages in the (QUEUE_NAME) queue
         *queue – the name of the queue
         * autoAck – true indicates whether the current consumer is in automatic acknowledgment mode. true represents automatic confirmation.
         * deliverCallback – When a message is sent to the consumer, the logic of how the consumer handles the message
         * cancelCallback – when the consumer is canceled, if you want to execute code, write it here
      channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});



3. Integrate springboot implementation

1. Import dependencies


2.yml configuration file

    port: 6786
    password: test
    virtual-host: /test
    publisher-confirm-type: correlated #Enable the producer's confirm mechanism under the springboot project

3.RabbitMQ configuration file

package com.qf.bootmq2302.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

public class RabbitConfig {

    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        //Set the connection factory object

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("correlationData:" + correlationData.getId());
                System.out.println("correlationData:" + new String(correlationData.getReturnedMessage().getBody()));

                //You can get the value message from redis through the ID

                //Represents whether the message is successfully sent to the switch, false if the message fails, true if the message is successfully sent
                System.out.println("ack:" + ack);
                //Represents the reason for the error
                System.out.println("cause:" + cause);

            return rabbitTemplate;


4. The producer writes a Controller

    RabbitTemplate rabbitTemplate;
    public String test1(String msg,String routkey){
        String exchangeName = "";//Default switch
        String routingkey = routkey;//Queue name

        //Create a CorrelationData object
        CorrelationData correlationData = new CorrelationData();

        Message message = new Message(msg.getBytes(), null);

        //To store the message content and message number in redis, key=message number, value=message content
        //key = bootmq:failmessage:001

        //Producer sends message
        //The fourth parameter can carry customized correlationData
        return "ok";

5. The consumer writes a receiving queue message

 @RabbitListener(queues = "queueA")
    public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {


        //Manual ack//If manual ack is enabled and manual ack is not given, just follow prefetch: 1 #The amount equivalent to basicQos(1), that's it, no more will be given to you, because you have not confirmed it. Confirm one and I will give you one


6.Consumer configuration file

    port: 6786
    password: test
    virtual-host: /test
        acknowledge-mode: manual # Manual ack
        prefetch: 1 #Equivalent to basicQos(1)