RabbitMQ Publisher Confirms: Advanced Topics
- 1. Publisher confirms (Spring Boot version)
  - 1.1 Confirmation mechanism plan
  - 1.2 Code architecture diagram
  - 1.3 Configuration file
  - 1.4 Add configuration class
  - 1.5 Message producer
  - 1.6 Callback interface
  - 1.7 Message consumer
  - 1.8 Result analysis
- 2. Returned messages
  - 2.1 Mandatory parameter
  - 2.2 Message producer code
  - 2.3 Callback interface
  - 2.4 Result analysis
- 3. Backup exchange
  - 3.1 Code architecture diagram
  - 3.2 Modify configuration class
  - 3.3 Alarm consumers
  - 3.4 Testing notes
  - 3.5 Result analysis
For reasons that were never identified, RabbitMQ restarted in our production environment. While it was restarting, message delivery from the producers failed and messages were lost, which then had to be recovered by hand. That made us ask: how can message delivery to RabbitMQ be made reliable? In particular, in extreme cases such as the RabbitMQ cluster being unavailable, how should undeliverable messages be handled? The alert we received looked like this:
Application [xxx] raised [Error Log Exception] at [08-15 16:36:04], alertId=[xxx]. Triggered by
[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620].
Possible causes for application xxx:
Service name:
Exception: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
Cause: 1. org.springframework.amqp.rabbit.listener.QueuesNotAvailableException:
Cannot prepare queue for listener. Either the queue doesn’t exist or the broker will not
allow us to use it.||Consumer received fatal=false exception on startup:
1. Publisher confirms (Spring Boot version)
1.1 Confirmation mechanism plan
1.2 Code Architecture Diagram
1.3 Configuration File
Add the following to the configuration file:
spring.rabbitmq.publisher-confirm-type=correlated
The property accepts three values:
- NONE: publisher confirms are disabled. This is the default value.
- CORRELATED: the callback method is triggered after the message has been published to the exchange.
- SIMPLE: in testing this has two effects. First, it triggers the callback method just as CORRELATED does. Second, after publishing you can call waitForConfirms or waitForConfirmsOrDie on rabbitTemplate to wait for the broker node to return the result of the send, and decide what to do next based on that result. Note that when waitForConfirmsOrDie detects a failed confirmation it closes the channel, after which no further messages can be sent to the broker on that channel.
The complete configuration file:
    spring.rabbitmq.host=192.168.10.128
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    spring.rabbitmq.publisher-confirm-type=correlated
1.4 Add configuration class
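    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ConfirmConfig {
        public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
        public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

        // Declare the business exchange
        @Bean("confirmExchange")
        public DirectExchange confirmExchange() {
            return new DirectExchange(CONFIRM_EXCHANGE_NAME);
        }

        // Declare the confirm queue
        @Bean("confirmQueue")
        public Queue confirmQueue() {
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }

        // Bind the confirm queue to the exchange with binding key "key1"
        @Bean
        public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                    @Qualifier("confirmExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("key1");
        }
    }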
1.5 Message Producer
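    import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequestMapping("/confirm")
    @Slf4j
    public class Producer {
        public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        private MyCallBack myCallBack;

        // Once rabbitTemplate has been injected, register the confirm callback on it
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(myCallBack);
        }

        @GetMapping("sendMessage/{message}")
        public void sendMessage(@PathVariable String message) {
            // First message: id 1, routing key matches the binding key
            CorrelationData correlationData1 = new CorrelationData("1");
            String routingKey = "key1";
            rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);

            // Second message: id 2, routing key that no queue is bound with
            CorrelationData correlationData2 = new CorrelationData("2");
            routingKey = "key2";
            rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);

            log.info("Send message content:{}", message);
        }
    }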
1.6 Callback interface
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class MyCallBack implements RabbitTemplate.ConfirmCallback {
        /**
         * Called whether or not the exchange received the message.
         *
         * @param correlationData data associated with the message (carries its id)
         * @param ack             true if the exchange received the message
         * @param cause           the reason for the failure when ack is false
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("The exchange has received the message with id:{}", id);
            } else {
                log.info("The exchange has not received the message with id:{}, due to reason:{}", id, cause);
            }
        }
    }
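One common way to act on the confirm callback, rather than only logging, is to keep each published message in memory until the broker acks it and to queue it for a retry when the broker nacks it. The sketch below models that bookkeeping with plain JDK collections. `PendingConfirms` and its method names are hypothetical and are not part of Spring AMQP; the assumption is that the confirm callback would call `onConfirm` with `correlationData.getId()` and the `ack` flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks messages that were published but not yet confirmed.
class PendingConfirms {
    private final Map<String, String> outstanding = new ConcurrentHashMap<>();
    private final List<String> retryQueue = new ArrayList<>();

    // Call just before publishing, with the same id that goes into CorrelationData.
    void onPublish(String id, String body) {
        outstanding.put(id, body);
    }

    // Call from the confirm callback with the correlation id and the ack flag.
    synchronized void onConfirm(String id, boolean ack) {
        String body = outstanding.remove(id);
        if (body != null && !ack) {
            retryQueue.add(body); // nacked: keep the body so it can be re-sent
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    // Hands back the nacked bodies and empties the retry queue.
    synchronized List<String> drainRetries() {
        List<String> copy = new ArrayList<>(retryQueue);
        retryQueue.clear();
        return copy;
    }

    public static void main(String[] args) {
        PendingConfirms pending = new PendingConfirms();
        pending.onPublish("1", "hello key1");
        pending.onPublish("2", "hello key2");
        pending.onConfirm("1", true);   // broker acked message 1
        pending.onConfirm("2", false);  // broker nacked message 2
        System.out.println(pending.outstandingCount());
        System.out.println(pending.drainRetries());
    }
}
```

Anything still counted by `outstandingCount()` after a reasonable timeout has been neither acked nor nacked, which is the case to watch for when the broker restarts.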
1.7 Message Consumer
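    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class ConfirmConsumer {
        public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

        // Consume messages that were routed to confirm.queue
        @RabbitListener(queues = CONFIRM_QUEUE_NAME)
        public void receiveMsg(Message message) {
            String msg = new String(message.getBody());
            log.info("Received queue confirm.queue message:{}", msg);
        }
    }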
1.8 Result Analysis
Two messages were sent. The routing key of the first message is "key1" and the routing key of the second is "key2". Both messages were accepted by the exchange, and a confirm callback from the exchange was received for each. The consumer, however, received only one message: the routing key of the second message does not match the queue's binding key, and no other queue can accept it, so the second message was discarded.
In other words, the exchange issued a confirm callback, but no queue actually received the message.
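This follows from how a direct exchange routes: a message reaches a queue only when its routing key equals one of the binding keys. A minimal in-memory model makes the rule concrete. It is plain Java with no RabbitMQ classes, and `DirectExchangeModel` is a hypothetical name used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; not a RabbitMQ API.
class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that receive the message; an empty list means "unroutable".
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, List.of());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("confirm.queue", "key1");
        System.out.println(exchange.route("key1")); // prints [confirm.queue]
        System.out.println(exchange.route("key2")); // prints []
    }
}
```

The exchange accepts both messages, which is all a publisher confirm reports; whether `route` found a queue is a separate question that confirms alone do not answer.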
2. Returned messages
2.1 Mandatory parameter
With only publisher confirms enabled, the exchange sends a confirmation to the producer as soon as it receives a message. If the message then turns out to be unroutable, it is discarded, and the producer never learns that this happened. So how can unroutable messages be handled? At the very least, the producer should be told so that it can deal with them itself. Setting the mandatory parameter does exactly that: when a message cannot be routed to its destination, it is returned to the producer.
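In a Spring Boot application these switches can also be set in the configuration file instead of in code. The fragment below is a sketch that assumes the standard `spring.rabbitmq.*` properties of Spring Boot; check the reference documentation for the version you use before relying on it.

```properties
# Return unroutable messages to the producer instead of dropping them
spring.rabbitmq.template.mandatory=true
# Enable publisher returns on the connection factory
spring.rabbitmq.publisher-returns=true
# Publisher confirms, as configured in section 1.3
spring.rabbitmq.publisher-confirm-type=correlated
```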
2.2 Message producer code
    import java.util.UUID;
    import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    // @RestController (not @Component) so that the @GetMapping below is registered.
    // Note: ReturnCallback is deprecated in newer Spring AMQP versions in favor of ReturnsCallback.
    @Slf4j
    @RestController
    public class MessageProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Register the callbacks once rabbitTemplate has been injected
        @PostConstruct
        private void init() {
            rabbitTemplate.setConfirmCallback(this);
            // true:  an unroutable message is returned to the producer
            // false: an unroutable message is discarded
            rabbitTemplate.setMandatory(true);
            // Set who handles the returned messages
            rabbitTemplate.setReturnCallback(this);
        }

        @GetMapping("sendMessage")
        public void sendMessage(String message) {
            // Bind each message to an id
            CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData1);
            log.info("The message id sent is: {} and the content is {}", correlationData1.getId(), message + "key1");

            CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("confirm.exchange", "key2", message + "key2", correlationData2);
            log.info("The message id sent is: {} and the content is {}", correlationData2.getId(), message + "key2");
        }

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("The exchange received the message and confirmed it successfully, id:{}", id);
            } else {
                log.error("Message id:{} was not successfully delivered to the exchange because: {}", id, cause);
            }
        }

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            log.info("Message: {} was returned by the server, reason for return: {}, exchange: {}, routing key: {}",
                    new String(message.getBody()), replyText, exchange, routingKey);
        }
    }
2.3 Callback interface
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        /**
         * Called whether or not the exchange received the message.
         *
         * @param correlationData data associated with the message (carries its id)
         * @param ack             true if the exchange received the message
         * @param cause           the reason for the failure when ack is false
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("The exchange has received the message with id:{}", id);
            } else {
                log.info("The exchange has not received the message with id:{}, due to reason:{}", id, cause);
            }
        }

        // Called when the message cannot be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            log.error("Message {}, returned by exchange {}, return reason:{}, routing key:{}",
                    new String(message.getBody()), exchange, replyText, routingKey);
        }
    }
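With both callbacks registered, an ack on its own no longer means that the message reached a queue: for an unroutable mandatory message the broker sends the return first and the ack afterwards. One way to combine the two signals is to remember which ids were returned and to consult that set when the confirm arrives. The sketch below is plain Java; `DeliveryOutcome` and its methods are hypothetical, and how the message id is obtained inside the return callback is left as an assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that merges the return and confirm signals for one message id.
class DeliveryOutcome {
    enum Status { ROUTED, UNROUTABLE, NACKED }

    private final Set<String> returnedIds = ConcurrentHashMap.newKeySet();

    // Call from the return callback.
    void onReturned(String id) {
        returnedIds.add(id);
    }

    // Call from the confirm callback; assumes any return for this id has already been recorded.
    Status onConfirm(String id, boolean ack) {
        boolean wasReturned = returnedIds.remove(id);
        if (!ack) {
            return Status.NACKED;      // the broker did not accept the message
        }
        return wasReturned ? Status.UNROUTABLE : Status.ROUTED;
    }

    public static void main(String[] args) {
        DeliveryOutcome outcome = new DeliveryOutcome();
        System.out.println(outcome.onConfirm("id-1", true)); // acked, never returned
        outcome.onReturned("id-2");
        System.out.println(outcome.onConfirm("id-2", true)); // acked, but returned first
    }
}
```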
2.4 Result Analysis
3. Backup exchange
With the mandatory parameter and returned messages, the producer can detect undeliverable messages and gets a chance to handle them. But sometimes we do not know how to handle these unroutable messages. At most we can log them, raise an alarm, and then deal with them by hand. Handling unroutable messages through logs is inelegant, especially when the producer service runs on several machines: copying logs by hand is tedious and error-prone. Setting the mandatory parameter also makes the producer more complex, because it needs extra logic to handle the returned messages. What can you do if you want neither to lose messages nor to complicate the producer?
The earlier article on dead letter queues explained that a dead letter exchange can be configured for a queue to store messages that failed to be processed. Unroutable messages, however, never enter a queue in the first place, so a dead letter queue cannot be used to save them.
RabbitMQ has a mechanism for exactly this problem: the backup exchange (called an alternate exchange in RabbitMQ itself). A backup exchange can be thought of as the "spare tire" of an exchange. When we declare a backup exchange for an exchange, any message that the exchange cannot route is forwarded to the backup exchange, which then routes and handles it. The backup exchange is usually of type Fanout, so that every message is delivered to all of the queues bound to it. We then bind a queue to the backup exchange, and every message that the original exchange could not route ends up in that queue. We can also bind an alarm queue and use a separate consumer to monitor it and raise alarms.
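The forwarding rule can be modelled in a few lines of plain Java. This is an in-memory sketch and not RabbitMQ code: the class and method names are hypothetical, and the queue names are only examples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a direct exchange with a fanout backup exchange behind it.
class AlternateExchangeModel {
    private final Map<String, List<String>> directBindings = new HashMap<>();
    private final List<String> fanoutQueues = new ArrayList<>();

    void bindDirect(String queue, String bindingKey) {
        directBindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // A fanout exchange ignores the routing key, so a binding is just a queue name.
    void bindAlternate(String queue) {
        fanoutQueues.add(queue);
    }

    // Returns every queue that ends up holding a copy of the message.
    List<String> publish(String routingKey) {
        List<String> matched = directBindings.get(routingKey);
        if (matched != null && !matched.isEmpty()) {
            return List.copyOf(matched);   // routed normally
        }
        return List.copyOf(fanoutQueues);  // unroutable: forwarded to the backup exchange
    }

    public static void main(String[] args) {
        AlternateExchangeModel model = new AlternateExchangeModel();
        model.bindDirect("confirm.queue", "key1");
        model.bindAlternate("backup.queue");
        model.bindAlternate("warning.queue");
        System.out.println(model.publish("key1")); // prints [confirm.queue]
        System.out.println(model.publish("key2")); // prints [backup.queue, warning.queue]
    }
}
```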
3.1 Code Architecture Diagram
3.2 Modify configuration class
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ConfirmConfig {
        public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
        public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
        public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
        public static final String BACKUP_QUEUE_NAME = "backup.queue";
        public static final String WARNING_QUEUE_NAME = "warning.queue";

        // Declare the confirm queue
        @Bean("confirmQueue")
        public Queue confirmQueue() {
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        }

        // Bind the confirm queue to the confirm exchange
        @Bean
        public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                    @Qualifier("confirmExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("key1");
        }

        // Declare the backup exchange
        @Bean("backupExchange")
        public FanoutExchange backupExchange() {
            return new FanoutExchange(BACKUP_EXCHANGE_NAME);
        }

        // Declare the confirm exchange and point it at its backup exchange
        @Bean("confirmExchange")
        public DirectExchange confirmExchange() {
            ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true)
                    // Set the backup exchange of this exchange
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
            return (DirectExchange) exchangeBuilder.build();
        }

        // Declare the warning queue
        @Bean("warningQueue")
        public Queue warningQueue() {
            return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
        }

        // Bind the warning queue to the backup exchange
        @Bean
        public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                      @Qualifier("backupExchange") FanoutExchange backupExchange) {
            return BindingBuilder.bind(queue).to(backupExchange);
        }

        // Declare the backup queue
        @Bean("backQueue")
        public Queue backQueue() {
            return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
        }

        // Bind the backup queue to the backup exchange
        @Bean
        public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                     @Qualifier("backupExchange") FanoutExchange backupExchange) {
            return BindingBuilder.bind(queue).to(backupExchange);
        }
    }
3.3 Alarm consumers
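    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class WarningConsumer {
        public static final String WARNING_QUEUE_NAME = "warning.queue";

        // Every message that reaches warning.queue was unroutable on the confirm exchange
        @RabbitListener(queues = WARNING_QUEUE_NAME)
        public void receiveWarningMsg(Message message) {
            String msg = new String(message.getBody());
            log.error("Alarm found unroutable message: {}", msg);
        }
    }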
3.4 Testing Notes
Before restarting the project, delete the existing confirm.exchange from the broker, because we have changed its arguments (it now declares a backup exchange). Otherwise an error is reported at startup.
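The reason is that RabbitMQ rejects a declaration whose arguments differ from those of the exchange that already exists, and reports it as a PRECONDITION_FAILED channel error. The sketch below models that equivalence check in plain Java. `ExchangeRegistry` is a hypothetical name, and the Java exception only stands in for the broker's channel error.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the broker's "same name, same arguments" rule for exchanges.
class ExchangeRegistry {
    private final Map<String, Map<String, String>> declared = new HashMap<>();

    // Declaring is idempotent only when the arguments match the existing exchange.
    void declare(String name, Map<String, String> arguments) {
        Map<String, String> existing = declared.putIfAbsent(name, Map.copyOf(arguments));
        if (existing != null && !existing.equals(arguments)) {
            throw new IllegalStateException("inequivalent arguments for exchange " + name);
        }
    }

    void delete(String name) {
        declared.remove(name);
    }

    public static void main(String[] args) {
        ExchangeRegistry broker = new ExchangeRegistry();
        broker.declare("confirm.exchange", Map.of()); // the old declaration from section 1.4
        broker.delete("confirm.exchange");            // delete it first ...
        // ... so that the new declaration with the extra argument is accepted
        broker.declare("confirm.exchange", Map.of("alternate-exchange", "backup.exchange"));
        System.out.println("redeclared");
    }
}
```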
3.5 Result Analysis
The mandatory parameter and the backup exchange can be used together. If both are enabled, where does an unroutable message go, and which of the two takes priority? The results above show that the backup exchange has the higher priority.
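That rule can be written down as a small decision function: a message is returned to the producer only if it is still unroutable after the backup exchange has had its turn. A sketch in plain Java, with hypothetical names:

```java
// Hypothetical decision function for what happens to a published message.
class UnroutableHandling {
    enum Result { DELIVERED_TO_QUEUE, SENT_TO_BACKUP, RETURNED_TO_PRODUCER, DROPPED }

    // backupRoutes: a backup exchange is configured and has at least one queue bound to it.
    static Result handle(boolean routable, boolean backupRoutes, boolean mandatory) {
        if (routable) {
            return Result.DELIVERED_TO_QUEUE;
        }
        if (backupRoutes) {
            return Result.SENT_TO_BACKUP;        // checked before mandatory, so it wins
        }
        if (mandatory) {
            return Result.RETURNED_TO_PRODUCER;
        }
        return Result.DROPPED;
    }

    public static void main(String[] args) {
        // Unroutable message with both mechanisms enabled
        System.out.println(handle(false, true, true)); // prints SENT_TO_BACKUP
    }
}
```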