RabbitMQ’s Confirm mechanism

1. Reliability of messages

RabbitMQ provides a publisher acknowledgement mechanism called Confirm.

The Confirm mechanism lets the producer verify whether a published message has reached the exchange.

2. Java implementation

1. Import dependencies

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

2. Producer of Confirm mechanism

package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Producer demo for the RabbitMQ Confirm mechanism: publishes a single message to the
 * default exchange with publisher confirms enabled and reports the broker's answer.
 */
public class Send {
    /** Name of the target queue; on the default exchange it doubles as the routing key. */
    public static final String QUEUE_NAME="queueA";

    /**
     * Publishes one message and blocks until the broker confirms or rejects it.
     *
     * @param args unused
     * @throws Exception if obtaining the connection, publishing, or closing a resource fails
     */
    public static void main(String[] args) throws Exception {
        String message = "Hello doubleasdasda!";

        // try-with-resources closes the channel and then the connection on every exit path,
        // including when publishing or waiting for the confirm throws. Resources are closed
        // before the catch clause below runs.
        //1. Get the connection object
        //2. Create a channel object. Most operations of MQ are defined on the channel object.
        try (Connection conn = MQUtils.getConnection();
             Channel channel = conn.createChannel()) {

            //3. Turn on confirm mode for this channel
            channel.confirmSelect();

            //4. Declare a queue (left disabled here, as in the original example)
            /**
             * queue – the name of the queue
             * durable – true means that the queue created is durable (the queue will still exist after mq is restarted)
             * exclusive – whether the queue is exclusive (whether the queue can only be used by the connection currently creating the queue)
             * autoDelete – whether the queue can be automatically deleted by the mq server
             * arguments – other parameters of the queue, can be null
             */
            // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            //5. Publish the message
            /**
             * exchange – the name of the exchange. An empty string means the default exchange.
             * routingKey – the routing key. On the default exchange the routing key is the name of the queue.
             * props – other properties of the message, can be null
             * body – the content of the message as a byte array
             */
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");

            //6. Check whether the message was sent successfully
            /**
             * waitForConfirms() returns true when the broker acknowledged the message and
             * false when it rejected (nacked) it.
             * If the message cannot be delivered because the exchange name is wrong, an
             * exception is thrown and the channel is closed automatically.
             */
            if (channel.waitForConfirms()) {
                System.out.println("The message has been successfully sent to the switch");
            } else {
                System.out.println("The message failed to be sent to the switch");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            System.out.println("The message failed to be sent to the switch");
            System.out.println("The failure message is:" + message);
        }
    }
}

3. Consumer of the Confirm mechanism

package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

/**
 * Consumer demo: subscribes to the queue the producer example publishes to and prints
 * every message it receives.
 */
public class Recv {
    // Must match the queue the producer example publishes to ("queueA"); the previous
    // value "hello-queue" meant this consumer never saw the producer's messages.
    private final static String QUEUE_NAME="queueA";

    /**
     * Declares the queue and starts consuming from it in automatic acknowledgment mode.
     * The connection is intentionally left open so the consumer keeps receiving messages.
     *
     * @param args unused
     * @throws Exception if obtaining the connection, declaring the queue, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        //1. Get the connection object
        Connection conn = MQUtils.getConnection();

        //2. Create a channel object. Most operations of MQ are defined on the channel object.
        Channel channel = conn.createChannel();

        /**
         * 1st parameter: queue name
         * 2nd parameter: durable
         * 3rd parameter: exclusive
         * 4th parameter: autoDelete
         * 5th parameter: additional queue arguments, can be null
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3. The processing logic that runs after the consumer receives a message.
        DeliverCallback deliverCallback = (String consumerTag, Delivery delivery) -> {
            System.out.println(consumerTag);

            // The byte array sent by the producer is available from the Delivery object.
            String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);

            // Write the consumer's business logic here, for example, send an email.
            System.out.println(msg);
        };

        //4. Let the current consumer start consuming messages in the (QUEUE_NAME) queue
        /**
         * queue – the name of the queue
         * autoAck – true means the consumer is in automatic acknowledgment mode
         * deliverCallback – how the consumer handles a message delivered to it
         * cancelCallback – code to run when the consumer is cancelled
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

3. Spring Boot integration

1. Import dependencies

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.yml configuration file

# Producer-side RabbitMQ connection settings
spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    publisher-confirm-type: correlated # Enable the producer's confirm mechanism under the Spring Boot project

3. RabbitMQ configuration class

package com.qf.bootmq2302.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the {@link RabbitTemplate} used by the producer and
 * registers a confirm callback on it.
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates a RabbitTemplate whose confirm callback prints the outcome of every publish.
     *
     * @param cachingConnectionFactory the connection factory the template sends through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        //Set the connection factory object
        rabbitTemplate.setConnectionFactory(cachingConnectionFactory);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // correlationData is null for messages sent without it, and the attached
                // message is null unless the sender set one; guard both so the callback
                // does not throw a NullPointerException.
                if (correlationData != null) {
                    System.out.println("correlationData:" + correlationData.getId());

                    Message returnedMessage = correlationData.getReturnedMessage();
                    if (returnedMessage != null) {
                        System.out.println("correlationData:" + new String(returnedMessage.getBody()));
                    }
                }

                //You can get the message content from redis through the ID

                //Whether the message reached the exchange: true on success, false on failure
                System.out.println("ack:" + ack);
                //The reason for the failure
                System.out.println("cause:" + cause);
            }
        });

        return rabbitTemplate;
    }
}

4. The producer writes a Controller

 @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code msg} to the default exchange, routed to the queue named by {@code routkey},
     * and attaches correlation data so the confirm callback can identify the message.
     *
     * @param msg     the message content to send
     * @param routkey the routing key; on the default exchange this is the queue name
     * @return the literal string "ok" once the send call has returned
     */
    @GetMapping("/test1")
    public String test1(String msg,String routkey){
        System.out.println(msg);
        String exchangeName = "";//Default exchange
        String routingkey = routkey;//Queue name

        //Create a CorrelationData object
        // Use a fresh ID per message: a fixed ID such as "001" makes every confirm look
        // the same, so a failed message could not be told apart from any other.
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(java.util.UUID.randomUUID().toString());

        Message message = new Message(msg.getBytes(), null);
        correlationData.setReturnedMessage(message);

        //To store the message content and message number in redis, key=message number, value=message content
        //key = bootmq:failmessage:<correlation id>

        //Producer sends message
        //The fourth parameter can carry customized correlationData
        rabbitTemplate.convertAndSend(exchangeName,routingkey,msg,correlationData);
        return "ok";
    }

5. The consumer writes a listener that receives messages from the queue

 @RabbitListener(queues = "queueA")
    /**
     * Prints each message received from "queueA" and then acknowledges it manually.
     *
     * @param data    the converted message payload
     * @param channel the channel the message arrived on, used to send the acknowledgment
     * @param message the raw message, used here to read the delivery tag
     * @throws IOException if sending the acknowledgment fails
     */
    public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {
        // NOTE(review): the controller example sends a plain String while this listener
        // expects a Map - confirm which payload type is actually published to this queue.
        System.out.println(data);

        // Manual ack. With manual acknowledgment enabled and prefetch: 1 (equivalent to
        // basicQos(1)), no further message is delivered until this one has been confirmed.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

6.Consumer configuration file

# Consumer-side RabbitMQ connection and listener settings
spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    # Manual ACK
    listener:
      simple:
        acknowledge-mode: manual # Manual ack
        prefetch: 1 # Equivalent to basicQos(1)