RabbitMQ Publisher Confirms: Advanced Topics

  • 1. Publisher confirms: Spring Boot version
    • 1.1 Confirmation mechanism plan
    • 1.2 Code architecture diagram
    • 1.3 Configuration file
    • 1.4 Add configuration class
    • 1.5 Message producer
    • 1.6 Callback interface
    • 1.7 Message consumer
    • 1.8 Result analysis
  • 2. Returned messages
    • 2.1 The mandatory parameter
    • 2.2 Message producer code
    • 2.3 Callback interface
    • 2.4 Result analysis
  • 3. Backup exchange
    • 3.1 Code architecture diagram
    • 3.2 Modify configuration class
    • 3.3 Alarm consumers
    • 3.4 Testing notes
    • 3.5 Result analysis

For reasons that were never identified, RabbitMQ restarted in our production environment. While it was restarting, message delivery from the producers failed and messages were lost, which required manual processing and recovery. This raised a question: how can we achieve reliable message delivery with RabbitMQ? In particular, how should undeliverable messages be handled in extreme situations such as the RabbitMQ cluster being unavailable? The alert looked like this:

Application [xxx] raised [error log exception] at [08-15 16:36:04], alertId=[xxx].
Triggered by [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620].
Possible causes for application xxx are as follows:
The service name is:
The exception is: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620,
The causes are as follows: 1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException:
Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.||Consumer received fatal=false exception on startup:

1. Publisher confirms: Spring Boot version

1.1 Confirmation mechanism plan

1.2 Code Architecture Diagram

1.3 Configuration File

Add the following property to the configuration file:

spring.rabbitmq.publisher-confirm-type=correlated

It accepts three values:
  • NONE
    Disables publisher confirms. This is the default value.
  • CORRELATED
    The callback method is triggered once the message has been published to the exchange.
  • SIMPLE
    Testing shows two effects. First, the callback method is triggered just as with CORRELATED. Second, after sending a message you can call rabbitTemplate's waitForConfirms or waitForConfirmsOrDie method to wait for the broker to return the result, and decide what to do next based on it. Note that when waitForConfirmsOrDie detects a failed confirmation, the channel is closed and no further messages can be sent to the broker on it.

The complete configuration:

spring.rabbitmq.host=192.168.10.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

1.4 Add configuration class

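import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    // Declare the business exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // Declare the confirm queue
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Bind the confirm queue to the exchange with routing key "key1"
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}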

1.5 Message Producer

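import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;

    // Runs after rabbitTemplate has been injected: register the confirm callback on it
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // First message: id 1, routing key matches the queue binding
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
        // Second message: id 2, routing key matches no binding
        CorrelationData correlationData2 = new CorrelationData("2");
        routingKey = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
        log.info("Sent message content: {}", message);
    }
}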

1.6 Callback interface

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * Called whether or not the exchange received the message.
     *
     * @param correlationData data correlated with the message (carries its id)
     * @param ack             whether the exchange received the message
     * @param cause           the reason for the failure, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange has received the message with id: {}", id);
        } else {
            log.info("The exchange has not received the message with id: {}, reason: {}", id, cause);
        }
    }
}
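
The callback above only logs the outcome. If messages that were not acknowledged should be resent, the producer has to remember what it sent until the confirmation arrives. One possible way to do that is sketched below, assuming a simple in-memory cache keyed by the CorrelationData id. The class PendingConfirms and its methods are hypothetical and not part of Spring AMQP; the sketch uses only the JDK so that it runs on its own.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring AMQP class): keeps each message until its confirm arrives.
final class PendingConfirms {
    // correlation id -> message body still awaiting a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call right before sending a message.
    void track(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    // Call from the confirm callback. On ack the entry is forgotten and null is returned;
    // on nack the body is returned (and kept) so that the caller can resend it.
    String onConfirm(String correlationId, boolean ack) {
        if (ack) {
            pending.remove(correlationId);
            return null;
        }
        return pending.get(correlationId);
    }

    // Snapshot of the ids that have not been acknowledged yet.
    Set<String> unconfirmedIds() {
        return new HashSet<>(pending.keySet());
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("1", "hello-key1");
        confirms.track("2", "hello-key2");
        confirms.onConfirm("1", true);                    // acknowledged: forgotten
        String toResend = confirms.onConfirm("2", false); // not acknowledged: still cached
        System.out.println("resend=" + toResend + ", unconfirmed=" + confirms.unconfirmedIds());
    }
}
```

In a real producer, track would be called before convertAndSend and onConfirm from MyCallBack.confirm. A scheduled task could then resend whatever stays in the cache for too long.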

1.7 Message Consumer

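import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received message from queue confirm.queue: {}", msg);
    }
}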

1.8 Result Analysis

Two messages were sent. The routing key of the first message is “key1” and the routing key of the second is “key2”. Both messages were accepted by the exchange, and a confirmation callback was received for each. The consumer, however, received only one message: the routing key of the second message does not match the queue’s binding key, and no other queue can accept it, so the second message was discarded.

In other words, the exchange issued a confirmation callback for a message that no queue ever received.

2. Returned messages

2.1 The mandatory parameter

When only publisher confirms are enabled, the exchange sends a confirmation to the producer as soon as it receives a message. If the message then turns out to be unroutable, it is discarded, and the producer never learns that this happened. So how can unroutable messages be handled? At the very least, the producer should be told about them so that it can deal with them itself. Setting the mandatory parameter achieves this: when a message cannot reach its destination during delivery, it is returned to the producer.
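
The difference between the two settings can be illustrated with a toy in-memory model of a direct exchange. This is only a sketch using JDK classes: MandatoryDemo, its publish method, and the return listener are made-up names for illustration and are not the RabbitMQ or Spring AMQP API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

// Toy model of a direct exchange that honors a mandatory flag (illustrative only).
final class MandatoryDemo {
    private final Set<String> bindingKeys = new HashSet<>();
    final List<String> delivered = new ArrayList<>();         // bodies that reached a queue
    private final BiConsumer<String, String> returnListener;  // receives (routingKey, body)

    MandatoryDemo(BiConsumer<String, String> returnListener) {
        this.returnListener = returnListener;
    }

    void bind(String bindingKey) {
        bindingKeys.add(bindingKey);
    }

    // Returns the confirm, which is true in every case: the exchange accepted the message.
    boolean publish(String routingKey, String body, boolean mandatory) {
        if (bindingKeys.contains(routingKey)) {
            delivered.add(body);
        } else if (mandatory) {
            returnListener.accept(routingKey, body); // handed back to the producer
        }
        // Unroutable and not mandatory: the message is dropped without notice.
        return true;
    }

    public static void main(String[] args) {
        List<String> returned = new ArrayList<>();
        MandatoryDemo exchange = new MandatoryDemo((key, body) -> returned.add(key + ":" + body));
        exchange.bind("key1");
        exchange.publish("key1", "a", true);
        exchange.publish("key2", "b", false); // lost silently
        exchange.publish("key2", "c", true);  // returned to the producer
        System.out.println("delivered=" + exchange.delivered + ", returned=" + returned);
    }
}
```

Running main prints delivered=[a], returned=[key2:c]. All three publishes are confirmed, but only the mandatory one tells the producer that routing failed.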

2.2 Message producer code

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Note: ReturnCallback is deprecated since Spring AMQP 2.3 in favor of ReturnsCallback.
@Slf4j
@RestController // needed for @GetMapping to be registered; a plain @Component is not enough
public class MessageProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Runs after rabbitTemplate has been injected
    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
        /*
         * true:  when the exchange cannot route a message, it returns the message to the producer
         * false: when the exchange cannot route a message, it discards the message
         */
        rabbitTemplate.setMandatory(true);
        // Set the handler for returned messages
        rabbitTemplate.setReturnCallback(this);
    }

    @GetMapping("sendMessage")
    public void sendMessage(String message) {
        // Bind each message to an id
        CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData1);
        log.info("Sent message with id: {} and content: {}", correlationData1.getId(), message + "key1");
        CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("confirm.exchange", "key2", message + "key2", correlationData2);
        log.info("Sent message with id: {} and content: {}", correlationData2.getId(), message + "key2");
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange received the message and confirmed it, id: {}", id);
        } else {
            log.error("Message id: {} was not delivered to the exchange, reason: {}", id, cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Message: {} was returned by the server, reason: {}, exchange: {}, routing key: {}",
                new String(message.getBody(), StandardCharsets.UTF_8), replyText, exchange, routingKey);
    }
}

2.3 Callback interface

import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    /**
     * Called whether or not the exchange received the message.
     *
     * @param correlationData data correlated with the message (carries its id)
     * @param ack             whether the exchange received the message
     * @param cause           the reason for the failure, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange has received the message with id: {}", id);
        } else {
            log.info("The exchange has not received the message with id: {}, reason: {}", id, cause);
        }
    }

    // Called when a message cannot be routed
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("Message {} was returned by exchange {}, reason: {}, routing key: {}",
                new String(message.getBody(), StandardCharsets.UTF_8), exchange, replyText, routingKey);
    }
}

2.4 Result Analysis

3. Backup exchange

With the mandatory parameter and returned messages, the producer can detect undeliverable messages and has a chance to handle them. But sometimes we do not know how to handle these unroutable messages. At most we can log them, trigger an alarm, and then handle them manually. Handling unroutable messages through logs is inelegant, especially when the producer’s service runs on several machines: copying logs by hand is tedious and error-prone. Moreover, setting the mandatory parameter makes the producer more complex, because it needs extra logic to handle the returned messages. What should you do if you want neither to lose messages nor to complicate the producer?

The earlier article on dead letter queues mentioned that a dead letter exchange can be set up for a queue to store messages that failed to be processed. Unroutable messages, however, never get the chance to enter a queue, so a dead letter queue cannot be used to save them.

RabbitMQ has a mechanism that handles this case well: the backup exchange (called an alternate exchange in the RabbitMQ documentation). What is a backup exchange? It can be understood as the “spare tire” of an exchange. When we declare a backup exchange for an exchange and that exchange receives an unroutable message, the message is forwarded to the backup exchange, which then routes and handles it. The backup exchange is usually of type fanout, so that every message is delivered to all of its bound queues. We then bind a queue to the backup exchange, and all messages that the original exchange cannot route end up in that queue. We can also bind an alarm queue and use a separate consumer to monitor it and raise alarms.
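
The forwarding behavior described above can be sketched with a toy in-memory model. The code below is a simplified illustration using only the JDK: AlternateExchangeDemo and its nested Direct and Fanout classes are hypothetical names, not the RabbitMQ or Spring AMQP API. The queue names are borrowed from this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a direct exchange that forwards unroutable messages to a fanout backup exchange.
final class AlternateExchangeDemo {
    // Fanout exchange: every bound queue receives a copy of each message.
    static final class Fanout {
        final Map<String, List<String>> queues = new HashMap<>();

        void bind(String queueName) {
            queues.putIfAbsent(queueName, new ArrayList<>());
        }

        void publish(String body) {
            queues.values().forEach(queue -> queue.add(body));
        }
    }

    // Direct exchange: delivers on an exact key match, otherwise falls back to the backup.
    static final class Direct {
        final Map<String, List<String>> queuesByKey = new HashMap<>();
        private final Fanout backup; // may be null when no backup exchange is declared

        Direct(Fanout backup) {
            this.backup = backup;
        }

        void bind(String bindingKey) {
            queuesByKey.putIfAbsent(bindingKey, new ArrayList<>());
        }

        void publish(String routingKey, String body) {
            List<String> queue = queuesByKey.get(routingKey);
            if (queue != null) {
                queue.add(body);
            } else if (backup != null) {
                backup.publish(body); // unroutable here: hand over to the backup exchange
            }
        }
    }

    public static void main(String[] args) {
        Fanout backupExchange = new Fanout();
        backupExchange.bind("backup.queue");
        backupExchange.bind("warning.queue");
        Direct confirmExchange = new Direct(backupExchange);
        confirmExchange.bind("key1");
        confirmExchange.publish("key1", "a");
        confirmExchange.publish("key2", "b"); // no binding for key2
        System.out.println("confirm.queue=" + confirmExchange.queuesByKey.get("key1"));
        System.out.println("backup.queue=" + backupExchange.queues.get("backup.queue"));
        System.out.println("warning.queue=" + backupExchange.queues.get("warning.queue"));
    }
}
```

In this model the message sent with key2 ends up in both backup.queue and warning.queue, while confirm.queue only holds the message sent with key1. The producer itself needs no extra logic.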

3.1 Code Architecture Diagram

3.2 Modify configuration class

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // Declare the confirm queue
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Bind the confirm queue to the confirm exchange with routing key "key1"
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

    // Declare the backup exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // Declare the confirm exchange and set its backup exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                // Set the backup exchange of this exchange
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return (DirectExchange) exchangeBuilder.build();
    }

    // Declare the warning queue
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // Bind the warning queue to the backup exchange
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }

    // Declare the backup queue
    @Bean("backQueue")
    public Queue backQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // Bind the backup queue to the backup exchange
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(queue).to(backupExchange);
    }
}

3.3 Alarm consumers

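import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.error("Alarm: found unroutable message: {}", msg);
    }
}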

3.4 Testing Notes

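Before restarting the project, delete the existing confirm.exchange, because its declaration arguments have changed (the alternate-exchange argument was added). An existing exchange cannot be redeclared with different arguments, so otherwise an error will be reported: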

3.5 Result Analysis


The mandatory parameter and the backup exchange can be used together. If both are enabled at the same time, where does an unroutable message go, and which one takes priority? The results above show that the backup exchange has the higher priority.
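
The priority rule can be written down as a small decision function. The sketch below is a toy model under the assumption that a message routed through the backup exchange counts as routed, so the mandatory return does not fire. The class UnroutableOutcome, its enum, and the decide method are illustrative names, not part of any RabbitMQ API.

```java
// Toy decision table for what happens to a published message (illustrative only).
final class UnroutableOutcome {
    enum Outcome { ROUTED, SENT_TO_BACKUP, RETURNED_TO_PRODUCER, DROPPED }

    static Outcome decide(boolean queueMatched, boolean backupCanRoute, boolean mandatory) {
        if (queueMatched) {
            return Outcome.ROUTED;
        }
        if (backupCanRoute) {
            return Outcome.SENT_TO_BACKUP; // takes priority over the mandatory flag
        }
        return mandatory ? Outcome.RETURNED_TO_PRODUCER : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, true));   // backup exchange wins
        System.out.println(decide(false, false, true));  // no backup route: returned
        System.out.println(decide(false, false, false)); // neither: dropped
    }
}
```

The second case is worth keeping in mind: if the backup exchange itself cannot route the message, for example because no queue is bound to it, the mandatory flag still decides whether the producer is notified.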