Spring Boot and RabbitMQ: confirm mode, return mode, and Consumer Ack mode

Description: Reliable delivery of RabbitMQ messages

  • As a message sender using RabbitMQ, you want to detect and handle any message that is lost or fails to be delivered.
  • RabbitMQ provides two mechanisms for the producer side: confirm mode, which reports whether the message reached the exchange, and return mode, which hands the message back to the producer when it cannot be routed from the exchange to a queue.
  • Consumer Ack: the way the consumer acknowledges a message after receiving it.

1. RabbitMQ confirm mode and return mode

  • Enable the confirm and return modes in the application.yml configuration file
spring:
  rabbitmq:
    host: 192.168.192.137
    username: qmh
    password: qmh
    virtual-host: /qmh
    port: 5672
    # Confirm mode: the broker reports whether the message reached the exchange (default: false).
    # Note: Spring Boot 2.2+ deprecates this property in favor of publisher-confirm-type: correlated.
    publisher-confirms: true
    # Return mode: unroutable messages are returned to the producer (default: false)
    publisher-returns: true
  • Write the RabbitMQConfig configuration class
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String DIRECT_EXCHANGE = "springboot_direct_exchange";
    // Queue used to test reliable delivery of RabbitMQ messages
    public static final String DIRECT_QUEUE_CONFIRM = "springboot_direct_queue_confirm";

    // 1. Define the direct exchange
    @Bean("DirectExchange")
    public Exchange directExchange(){
        return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
    }
    // 2. Define the queue
    @Bean("DirectQueueConfirm")
    public Queue directQueueConfirm(){
        return QueueBuilder.durable(DIRECT_QUEUE_CONFIRM).build();
    }
    // 3. Bind the queue to the exchange with the routing key "confirm"
    @Bean
    public Binding directBindQueueConfirmAndExchange(@Qualifier("DirectQueueConfirm") Queue directQueueConfirm,
                                                     @Qualifier("DirectExchange") Exchange directExchange){
        return BindingBuilder.bind(directQueueConfirm).to(directExchange).with("confirm").noargs();
    }
}

  • Write a test class, send a message
import com.itheima.rabbitmqConfig.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Tests reliable delivery: confirm mode.
     * The confirm callback is invoked once the broker reports whether the message reached the exchange.
     */
    @Test
    public void directConfirmTest(){
        // Define the callback
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData correlation data supplied with the message, if any
             * @param ack whether the exchange successfully received the message
             * @param cause reason for the failure when ack is false
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm running ...");
                if (ack){
                    System.out.println("Successfully received");
                } else {
                    System.out.println("Failed to receive:" + cause);
                }
            }
        });
        // Send a message
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"confirm","Test RabbitMQ confirm success!");
    }

    /**
     * Tests reliable delivery: return mode.
     * When a message cannot be routed from the exchange to any queue, it is returned to the producer
     * and the returnedMessage callback is executed.
     * Steps:
     * -- Enable return mode with publisher-returns: true
     * -- Set the ReturnCallback
     * -- Choose how the exchange handles unroutable messages (the mandatory flag):
     *    1. mandatory = false: the unroutable message is discarded (default)
     *    2. mandatory = true: the unroutable message is returned to the sender through the ReturnCallback
     */
    @Test
    public void directReturnTest(){
        // Make the exchange return unroutable messages instead of discarding them.
        // The auto-configured RabbitTemplate already enables mandatory when publisher-returns is true
        // and spring.rabbitmq.template.mandatory is not set, so this call is optional here.
        // rabbitTemplate.setMandatory(true);
        // Set the ReturnCallback
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * @param message the returned message
             * @param replyCode reply code from the broker
             * @param replyText reply text describing why the message was returned
             * @param exchange exchange name
             * @param routingKey routing key
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){
                System.out.println("return running...");
            }
        });
        // Send a message with a routing key ("confirm11") that matches no binding, so it is returned
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"confirm11","Test RabbitMQ return success!");
    }
}
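
The difference between the two callbacks can be sketched with a toy in-memory broker. This is only a simplified model of the behavior described above, not the RabbitMQ or Spring AMQP API: the class ToyBroker, its methods, and the event strings are hypothetical names made up for illustration, and a publish to a missing exchange is modeled simply as a negative confirm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory broker (hypothetical); illustrates confirm vs. return only. */
class ToyBroker {
    // exchange name -> (binding key -> queue contents)
    private final Map<String, Map<String, List<String>>> exchanges = new HashMap<>();
    // what the producer would observe, in order
    final List<String> events = new ArrayList<>();

    void declareExchange(String exchange) {
        exchanges.putIfAbsent(exchange, new HashMap<>());
    }

    void bind(String exchange, String bindingKey) {
        exchanges.computeIfAbsent(exchange, e -> new HashMap<>())
                 .putIfAbsent(bindingKey, new ArrayList<>());
    }

    void publish(String exchange, String routingKey, String body, boolean mandatory) {
        Map<String, List<String>> bindings = exchanges.get(exchange);
        if (bindings == null) {
            // The message never reached an exchange: negative confirm.
            events.add("confirm:nack");
            return;
        }
        List<String> queue = bindings.get(routingKey); // direct exchange: exact key match
        if (queue != null) {
            queue.add(body);
        } else if (mandatory) {
            // Unroutable and mandatory: hand the message back to the producer.
            events.add("return:" + routingKey);
        }
        // The exchange received the message, so the confirm is positive
        // even when the message could not be routed.
        events.add("confirm:ack");
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareExchange("springboot_direct_exchange");
        broker.bind("springboot_direct_exchange", "confirm");
        broker.publish("springboot_direct_exchange", "confirm", "hello", true);
        broker.publish("springboot_direct_exchange", "confirm11", "hello", true);
        System.out.println(broker.events);
    }
}
```

Running main prints [confirm:ack, return:confirm11, confirm:ack]. The point of the model is that an unroutable message still produces a positive confirm, which is why the return callback is needed in addition to the confirm callback.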

2. RabbitMQ Consumer Ack mode (the following example uses manual acknowledgment)

Note: There are three acknowledgment modes for Consumer Ack

  • Automatic acknowledgment: acknowledge="none"
  • Manual acknowledgment: acknowledge="manual" (recommended)
  • Acknowledgment based on exceptions: acknowledge="auto" (the Spring AMQP default)
  • Enable manual acknowledgment mode in the application.yml configuration file
spring:
  rabbitmq:
    host: 192.168.192.137
    username: qmh
    password: qmh
    virtual-host: /qmh
    port: 5672
    # Enable manual acknowledgment mode
    listener:
      simple:
        acknowledge-mode: manual
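
The three acknowledgment modes differ mainly in who settles a delivery. The sketch below is a simplified model of that decision, not Spring AMQP code: AckModeModel, Mode, and containerAction are hypothetical names, and the real listener container has further options (for example, whether a rejected message is requeued).

```java
/** Simplified, hypothetical model of what a listener container sends for one delivery. */
class AckModeModel {
    enum Mode { NONE, MANUAL, AUTO }

    /** Runs the listener and returns the action the container itself takes afterwards. */
    static String containerAction(Mode mode, Runnable listener) {
        boolean failed = false;
        try {
            listener.run();
        } catch (RuntimeException e) {
            failed = true;
        }
        switch (mode) {
            case NONE:
                // The broker treated the message as acknowledged on delivery.
                return "nothing";
            case MANUAL:
                // The listener is expected to call basicAck/basicNack itself.
                return "nothing";
            default:
                // AUTO: the outcome of the listener decides.
                return failed ? "reject" : "ack";
        }
    }

    public static void main(String[] args) {
        System.out.println(containerAction(Mode.AUTO, () -> { }));
        System.out.println(containerAction(Mode.AUTO, () -> { throw new IllegalStateException("boom"); }));
        System.out.println(containerAction(Mode.MANUAL, () -> { }));
    }
}
```

In this model only AUTO reacts to the listener's outcome. With MANUAL, a listener that forgets to acknowledge leaves the message unacknowledged on the broker.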
  • Write a consumer listener that implements the ChannelAwareMessageListener interface
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Consumer Ack mechanism. "Ack" is short for acknowledge: it describes how the consumer confirms a message after receiving it.
 * There are three acknowledgment modes:
 * -- Automatic acknowledgment: acknowledge="none". As soon as the message is delivered to the consumer, it is treated as acknowledged and the broker removes it from the queue.
 * -- Manual acknowledgment: acknowledge="manual". Call channel.basicAck() after the business logic succeeds to acknowledge the message manually. If an exception occurs, call channel.basicNack() so that the broker can redeliver the message.
 * -- Acknowledgment based on exceptions: acknowledge="auto". The listener container acknowledges the message if the listener returns normally and rejects it if the listener throws an exception.
 */
@Component
public class ListenerConsumerAck implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "springboot_direct_queue_confirm")
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("Business logic completed...");
            // int i = 3/0; // Uncomment to simulate a failure
            /**
             * Acknowledge manually: basicAck(long deliveryTag, boolean multiple)
             * deliveryTag: tag identifying this delivery on the channel
             * multiple: true acknowledges every unacknowledged message up to and including deliveryTag;
             *           false acknowledges only this one
             */
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            /**
             * Reject the message: basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * deliveryTag: tag identifying this delivery on the channel
             * multiple: true rejects every unacknowledged message up to and including deliveryTag
             * requeue: true puts the message back on the queue so that the broker redelivers it;
             *          a message that always fails will then be redelivered repeatedly
             */
            channel.basicNack(deliveryTag,true,true);
        }
        }
    }
}
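
The effect of the multiple flag passed to basicAck and basicNack in the listener above can be illustrated with a toy model of a channel's outstanding delivery tags. This is a sketch under stated assumptions, not the RabbitMQ client: ToyChannel, deliver, ack, and outstanding are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model (hypothetical) of the unacknowledged delivery tags on one channel. */
class ToyChannel {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    /** Simulates the broker delivering a message; returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /**
     * Settles one tag, or every outstanding tag up to and including it
     * when multiple is true. Returns the tags that were settled.
     */
    List<Long> ack(long deliveryTag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            settled.addAll(unacked.headSet(deliveryTag, true));
        } else if (unacked.contains(deliveryTag)) {
            settled.add(deliveryTag);
        }
        unacked.removeAll(settled);
        return settled;
    }

    /** Returns a copy of the tags that are still unacknowledged. */
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.deliver();
        long second = channel.deliver();
        channel.deliver();
        System.out.println(channel.ack(second, true)); // settles tags 1 and 2
        System.out.println(channel.outstanding());     // tag 3 is still outstanding
    }
}
```

Running main prints [1, 2] and then [3]. If each message should be settled independently of the others on the channel, passing false for multiple is the narrower choice.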