[RabbitMQ] 4. Release Confirmation

Principle of Release Confirmation

The producer sets the channel to confirm mode. Once the channel enters the confirm mode, all messages published on the channel will be assigned a unique ID (starting from 1). Once the message is delivered to all matching queues, the broker A confirmation will be sent to the producer (including the unique ID of the message), which makes the producer know that the message has arrived at the destination queue correctly. If the message and the queue are durable, the confirmation message will be written to the disk After sending it out, the delivery-tag field in the confirmation message returned by the broker to the producer contains the serial number of the confirmation message. In addition, the broker can also set the multiple field of basic.ack, indicating that all messages up to this serial number have been received. deal with.
The biggest advantage of the confirm mode is that it is asynchronous. Once a message is published, the producer application can continue to send the next message while waiting for the channel to return confirmation. When the message is finally confirmed, the producer application can pass the callback method To process the confirmation message, if RabbitMQ loses the message due to its own internal error, it will send a nack message, and the producer application can also process the nack message in the callback method.

Policy for Publish Confirmation

How to enable release confirmation

Release confirmation is not enabled by default. If you want to enable it, you need to call the method confirmSelect. Whenever you want to use release confirmation, you need to call this method on the channel

Single confirmation release

This is a simple confirmation method. It is a synchronous confirmation method, that is, after publishing a message, only after it is confirmed and published, subsequent messages can continue to be published. waitForConfirmsOrDie(long) is only available when the message is confirmed. Time to return, if the message is not acknowledged within the specified time frame then it will throw an exception.
One of the biggest disadvantages of this confirmation method is that the publishing speed is particularly slow, because if the published message is not confirmed, the publication of all subsequent messages will be blocked. This method provides a maximum throughput of no more than hundreds of published messages per second. . Of course for some applications this may be sufficient.

package com.xxxx.rabbitmq.four;

import com.rabbitmq.client.Channel;
import com.xxxx.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

/**
 * Publish confirmation mode
 *
 * @author : li.linnan
 * @create : 2023/3/23
 */
public class ConfirmMessage {<!-- -->

    public static void main(String[] args) throws Exception {<!-- -->
        publishMessageIndividually();
    }

    //Number of messages sent in batches
    public static final int MESSAGE_COUNT = 1000;
    //Single confirmation
    public static void publishMessageIndividually() throws Exception{<!-- -->

        Channel channel = RabbitMqUtils. getChannel();
        String queueName = UUID. randomUUID(). toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //Enable release confirmation
        channel. confirmSelect();
        //Starting time
        long begin = System. currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i ++ ) {<!-- -->
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // If the server returns false or does not return within the timeout period, the producer can resend the message
            boolean flag = channel.waitForConfirms();
            if(flag){<!-- -->
                System.out.println("Message sent successfully");
            }
        }
        long end = System. currentTimeMillis();
        System.out.println("Release" + MESSAGE_COUNT + "separate confirmation messages, time-consuming" + (end - begin) + "ms");


    }



}

Batch confirmation release

The above method is very slow. Compared with waiting for a single confirmation message, publishing a batch of messages first and then confirming them together can greatly improve throughput. Of course, the disadvantage of this method is: when a failure causes a problem in the release, it is unknown Which message has a problem, we must keep the entire batch in memory to record important information and then republish the message. Of course, this scheme is still synchronous, and also blocks the publication of messages.

 //Batch release confirmation
    public static void publishMessageBatch() throws Exception{<!-- -->

        Channel channel = RabbitMqUtils. getChannel();
        String queueName = UUID. randomUUID(). toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //Enable release confirmation
        channel. confirmSelect();
        //Starting time
        long begin = System. currentTimeMillis();
        // batch confirmation message size
        int batchSize = 100;
        for (int i = 0; i < MESSAGE_COUNT; i ++ ) {<!-- -->
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // When judging that there are 100 messages, confirm in batches once
            if(i?tchSize ==0){<!-- -->
                //Publish confirmation
                channel.waitForConfirms();
            }

        }
        long end = System. currentTimeMillis();
        System.out.println("Release" + MESSAGE_COUNT + "batch confirmation messages, time-consuming" + (end - begin) + "ms");


    }

Asynchronous confirmation release

Although the programming logic of asynchronous confirmation is more complicated than the previous two, it is the most cost-effective, regardless of reliability or efficiency. It uses the callback function to achieve reliable delivery of messages. This middleware also uses function callbacks to ensure whether The delivery is successful, let us explain in detail how the asynchronous confirmation is realized.

 //Asynchronous release confirmation
    public static void publishMessageAsync() throws Exception{<!-- -->

        Channel channel = RabbitMqUtils. getChannel();
        String queueName = UUID. randomUUID(). toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //Enable release confirmation
        channel. confirmSelect();
        //Message confirmation success callback function
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {<!-- -->
            System.out.println("Confirmed message:" + deliveryTag);
        };
        //Message confirmation failed callback function
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {<!-- -->
            System.out.println("Unconfirmed message:" + deliveryTag);
        };

        //Prepare the listener for the message, listen to which messages succeeded and which messages failed (asynchronous notification)
        channel.addConfirmListener(ackCallback,nackCallback);
        //Starting time
        long begin = System. currentTimeMillis();


        for (int i = 0; i < MESSAGE_COUNT; i ++ ) {<!-- -->
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
        }

        long end = System. currentTimeMillis();
        System.out.println("Release" + MESSAGE_COUNT + "batch confirmation messages, time-consuming" + (end - begin) + "ms");


    }

How to handle asynchronous unacknowledged messages

The best solution is to put unconfirmed messages in a memory-based queue that can be accessed by the publishing thread, for example, use ConcurrentLinkedQueue to transfer messages between confirm callbacks and the publishing thread.

 //Asynchronous release confirmation
    public static void publishMessageAsync() throws Exception{<!-- -->

        Channel channel = RabbitMqUtils. getChannel();
        String queueName = UUID. randomUUID(). toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //Enable release confirmation
        channel. confirmSelect();
        /**
         * A thread-safe and ordered hash table, suitable for high concurrency
         * 1. Easily associate serial numbers with messages
         * 2. Easily delete entries in batches as long as the serial number is given
         * 3. Support concurrent access
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
        //Message confirmation success callback function
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {<!-- -->
            //2. Delete the confirmed messages, and the rest are unconfirmed messages
            //if batch
            if (multiple) {<!-- -->
                // Return unacknowledged messages less than or equal to the current sequence number is a map
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms. headMap(deliveryTag);
                //Clear this part of unacknowledged messages
                confirmed. clear();
            }else{<!-- -->
                // Only clear the message of the current serial number
                outstandingConfirms. remove(deliveryTag);
            }

            System.out.println("Confirmed message:" + deliveryTag);
        };
        //Message confirmation failed callback function
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {<!-- -->
            //3. Print unconfirmed message
            System.out.println("Unconfirmed message:" + deliveryTag);
        };

        //Prepare the listener for the message, listen to which messages succeeded and which messages failed (asynchronous notification)
        channel.addConfirmListener(ackCallback,nackCallback);
        //Starting time
        long begin = System. currentTimeMillis();


        for (int i = 0; i < MESSAGE_COUNT; i ++ ) {<!-- -->
            String message = i + "";
            //1. Record all messages to be sent here
            /**
             * channel.getNextPublishSeqNo() Get the sequence number of the next message
             * Make an association with the message body through the serial number
             * All are unacknowledged message bodies
             */
            outstandingConfirms. put(channel. getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName, null, message.getBytes());
        }

        long end = System. currentTimeMillis();
        System.out.println("Release" + MESSAGE_COUNT + "batch confirmation messages, time-consuming" + (end - begin) + "ms");


    }

Comparison of the above three release confirmation speeds

  • Post a message individually
    Waiting for acknowledgments synchronously is simple, but has very limited throughput.
  • Publish messages in batches
    Batch synchronization waiting for confirmation, simple, reasonable throughput, once there is a problem, it is difficult to deduce which line
    There was a problem with the message.
  • Asynchronous processing:
    Optimal performance and resource usage, with good control in case of errors, but slightly harder to implement