springboot integrates RabbitMQ confirm confirmation mode and return fallback mode and Consumer Ack mode
Description: Reliable delivery of RabbitMQ messages
- When using RabbitMQ, as a message sender, you want to prevent any message loss or delivery failure scenarios.
- RabbitMQ provides two ways to control the reliability mode of message delivery: confirm confirmation mode, return fallback mode.
- Consumer Ack: Indicates the confirmation method of the consumer after receiving the message.
1. RabbitMQ confirm confirmation mode and return fallback mode
- application.yml The configuration file opens the confirm and return modes
spring: rabbitmq: host: 192.168.192.137 username: qmh password: qmh virtual-host: /qmh port: 5672 # publisher-confirms reliable delivery of messages, confirm confirmation mode defaults to false publisher-confirms: true # publisher-returns reliable delivery of messages, return fallback mode defaults to false publisher-returns: true
- Write RabbitMQConfig configuration class
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { public static final String DIRECT_EXCHANGE = "springboot_direct_exchange"; public static final String DIRECT_QUEUE_CONFIRM = "springboot_direct_queue_confirm"; // Test the reliable delivery of RabbitMQ messages // 1. Define the switch @Bean("DirectExchange") public Exchange directExchange(){ return ExchangeBuilder.topicExchange(DIRECT_EXCHANGE).durable(true).build(); } // 2. Define the queue @Bean("DirectQueueConfirm") public Queue directQueueConfirm(){ return QueueBuilder.durable(DIRECT_QUEUE_CONFIRM).build(); } // 3. Bind the switch and the queue @Bean public Binding directBindQueueConfirmAndExchange(@Qualifier("DirectQueueConfirm") Queue directQueueConfirm,@Qualifier("DirectExchange") Exchange directExchange){ return BindingBuilder.bind(directQueueConfirm).to(directExchange).with("confirm").noargs(); } }
- Write a test class, send a message
import com.itheima.rabbitmqConfig.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner. class) public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /** * Reliable delivery of test messages * confirm confirmation mode * Call back the confirm method after the message is sent to the exchange */ @Test public void directConfirmTest(){ // define callback rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData related configuration information * @param ack exchange Whether the switch successfully received the information * @param cause failure reason */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm running ..."); if (ack){ System.out.println("Successfully received"); } else { System.out.println("Failed to receive:" + cause); } } }); // Send a message rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"confirm","Test RabbitMQ confirm success!"); } /** * Reliable delivery of test messages return fallback mode * When the message fails to be routed from the exchange to the queue, the message will be returned to the producer. And execute the callback function returnedMessage * Steps: * -- Enable fallback mode publisher-returns: true * -- Set the ReturnCallBack callback method * -- Set the exchange switch processing mode: * 1. If the message is not routed to the Queue, the message is discarded (default) * 2. If the message is not routed to the Queue, return the message to the sender ReturnCallBack. */ @Test public void directReturnTest(){ // Set the switch's mode of handling failure messages // rabbitTemplate. setMandatory(true); // Set the ReturnCallBack callback method rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * @param message message object * @param replyCode error code * @param replyText error object * @param exchange exchange name * @param routingKey routing key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){ System.out.println("return running..."); } }); // Send a message rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,"confirm11","Test RabbitMQ return success!"); } }
2. RabbitMQ Consumer Ack mode (the following case is the manual confirmation method)
Note: There are three confirmation methods for Consumer Ack
- Automatic confirmation: acknowledge=”none”, (used by default)
- Manual confirmation: acknowledge=”manual”, (recommended)
- Confirm according to the abnormal situation: acknowledge=”auto”
- The application.yml configuration file opens the manual confirmation mode
spring: rabbitmq: host: 192.168.192.137 username: qmh password: qmh virtual-host: /qmh port: 5672 # Enable manual confirmation mode listener: simple: acknowledgment-mode: manual
- Write a consumer listener, mainly implementing the ChannelAwareMessageListener class
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /** * Consumer Ack mechanism ack refers to Acknowledge, confirmation. Indicates the confirmation method of the consumer after receiving the message. * There are three confirmation methods: * -- Automatic confirmation: acknowledge="none", once the message is received by the Consumer, it will automatically confirm the receipt and remove the corresponding message from the RabbitMQ message cache * -- Manual confirmation: acknowledge="manual", you need to call channel.basicAck() after the business processing is successful, and sign for it manually. If there is an exception, call the channel.basicNack() method to let it automatically resend the message . * -- Acknowledgment by exception: acknowledge="auto", */ @Component public class ListenerConsumerAck implements ChannelAwareMessageListener { @Override @RabbitListener(queues = "springboot_direct_queue_confirm") public void onMessage(Message message, Channel channel) throws Exception { Thread. sleep(1000); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String(message.getBody())); System.out.println("Logical business processing completed..."); //int i = 3/0; //Error /** * Manually confirm the receipt basicAck(long deliveryTag, boolean multiple) * deliveryTag label * multiple Receive multiple messages */ channel.basicAck(deliveryTag,true); }catch (Exception e){ /** * Failed to sign for receipt basicNack(long deliveryTag, boolean multiple, boolean requeue) * deliveryTag label * multiple Receive multiple messages * requeue true, the message returns to the Queue, and the broker resends the message to the consumer */ channel.basicNack(deliveryTag,true,true); } } }