Overview
RabbitMQ has two kinds of message confirmation. The first is message sending confirmation, which confirms whether a message is delivered successfully as the producer sends it to the exchange and the exchange routes it to a queue. Sending confirmation has two steps: confirming that the message reached the exchange, and confirming that it reached the queue. The second is consumption receipt confirmation, which confirms whether the consumer has successfully consumed a message from the queue.
Details
1. Operational effect
2. Implementation process
①. Introduce rabbitmq package
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②. Modify application.properties configuration
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Enable the confirm mechanism on the sender side
# (Spring Boot 2.2+ deprecates this property in favor of spring.rabbitmq.publisher-confirm-type=correlated)
spring.rabbitmq.publisher-confirms=true
# Enable the return mechanism on the sender side
spring.rabbitmq.publisher-returns=true
################################################
# Use manual ack on the consumer side
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Whether to support retry
spring.rabbitmq.listener.simple.retry.enabled=true
③. Define exchange and queue, and bind queue to exchange
package com.mm.springbootrabbitmqconfirmdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // Durable, non-exclusive, non-auto-delete queue
    @Bean(name = "confirmQueue")
    public Queue confirmQueue() {
        return new Queue("confirmQueue", true, false, false);
    }

    @Bean(name = "confirmExchange")
    public FanoutExchange confirmExchange() {
        return new FanoutExchange("confirmExchange");
    }

    // Bind the queue to the fanout exchange
    @Bean
    public Binding confirmFanoutExchangeAndQueue(
            @Qualifier("confirmExchange") FanoutExchange confirmExchange,
            @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange);
    }
}
④. Message sending confirmation
Message sending confirmation is used to confirm whether a message is delivered successfully as the producer sends it to the broker and the exchange on the broker routes it to a queue.
Messages going from the producer to the RabbitMQ broker are covered by the confirmCallback confirmation mode.
Messages that fail to be delivered from the exchange to a queue are covered by the returnCallback return mode.
Together, these two callbacks let us detect failed deliveries, which is the basis for reliable message delivery.
1. ConfirmCallback confirmation mode
As soon as the RabbitMQ broker receives the message, the confirmCallback callback is triggered.
package com.mm.springbootrabbitmqconfirmdemo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without one
        String id = correlationData != null ? correlationData.getId() : null;
        if (!ack) {
            log.error("Message sending exception! correlationData={}, cause={}", id, cause);
        } else {
            log.info("The sender has received the confirmation, correlationData={}, ack={}, cause={}", id, ack, cause);
        }
    }
}
Implement the ConfirmCallback interface and override its confirm() method. The method has three parameters: correlationData, ack, and cause.
- correlationData: the object's id attribute uniquely identifies the current message.
- ack: the status of message delivery to the broker; true indicates success.
- cause: the reason for the delivery failure.
However, a message being received by the broker only means that it has arrived at the MQ server; it does not guarantee that the message will be delivered to the target queue. That is what returnCallback is for.
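The difference between the two stages can be illustrated with a toy in-memory model. This is only a sketch: ToyBroker and its methods are hypothetical names, not part of RabbitMQ or Spring AMQP, and it routes by exact routing key (unlike the fanout exchange in this demo) so that an unroutable message can be shown. It assumes the publisher asked for returns, and it emits the return event before the positive confirm, which is the order RabbitMQ uses for unroutable messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two publish stages; not the RabbitMQ or Spring AMQP API. */
class ToyBroker {
    // exchange name -> (routing key -> queue name)
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    // Callback events in the order the producer would observe them
    final List<String> events = new ArrayList<>();

    void bind(String exchange, String routingKey, String queue) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>()).put(routingKey, queue);
    }

    void publish(String exchange, String routingKey, String id) {
        Map<String, String> routes = bindings.get(exchange);
        if (routes == null) {
            // Stage 1 failed: the exchange does not exist, so the confirm is negative
            events.add("confirm id=" + id + " ack=false");
            return;
        }
        if (!routes.containsKey(routingKey)) {
            // Stage 2 failed: the broker accepted the message but no queue matched
            events.add("return id=" + id);
        }
        // Stage 1 succeeded: the broker accepted the message
        events.add("confirm id=" + id + " ack=true");
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("orders", "created", "orderQueue");
        broker.publish("orders", "created", "1");   // routed to orderQueue
        broker.publish("orders", "typo", "2");      // accepted but unroutable
        broker.publish("missing", "created", "3");  // unknown exchange
        broker.events.forEach(System.out::println);
    }
}
```

Message 2 is the interesting case: its confirm carries ack=true even though it never reached a queue, so only the return event reveals the failure.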
2. ReturnCallback return mode
If the message fails to be delivered to the target queue, the returnCallback callback is triggered. When a message is not successfully delivered to a queue, its detailed delivery data is generally recorded here to facilitate subsequent operations such as resending or compensation.
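package com.mm.springbootrabbitmqconfirmdemo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Record the delivery details of the returned message (the log text is illustrative)
        log.info("returnedMessage: replyCode={}, replyText={}, exchange={}, routingKey={}",
                replyCode, replyText, exchange, routingKey);
    }
}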
Implement the ReturnCallback interface and override the returnedMessage() method. The method has five parameters: message (the message body), replyCode (the response code), replyText (the response text), exchange (the exchange), and routingKey (the routing key).
Finally comes the actual sending of the message. Set the Confirm and Return callbacks on rabbitTemplate. We use setDeliveryMode() to make the message persistent, and for subsequent testing we create a CorrelationData object whose id is 10000000000.
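The id carried by CorrelationData is what lets the sender match each confirm to the message it belongs to. As a minimal sketch of that bookkeeping, the plain-Java helper below remembers each message until its confirm arrives and keeps the nacked ones for resending. PendingConfirms and its methods are hypothetical names; in a real application onSend would be called just before the send and onConfirm from the confirm callback.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers sent messages until the broker confirms them. */
class PendingConfirms {
    // correlation id -> message body still waiting for a confirm
    private final Map<String, String> pending = new LinkedHashMap<>();
    // correlation id -> message body that was nacked and should be resent
    private final Map<String, String> failed = new LinkedHashMap<>();

    /** Call just before sending, with the same id that is put into CorrelationData. */
    synchronized void onSend(String id, String body) {
        pending.put(id, body);
    }

    /** Call from the confirm callback with the id and ack flag it received. */
    synchronized void onConfirm(String id, boolean ack) {
        String body = pending.remove(id);
        if (!ack && body != null) {
            failed.put(id, body);
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    synchronized Map<String, String> failedMessages() {
        return new LinkedHashMap<>(failed);
    }

    public static void main(String[] args) {
        PendingConfirms tracker = new PendingConfirms();
        tracker.onSend("10000000000", "hello");
        tracker.onSend("10000000001", "world");
        tracker.onConfirm("10000000000", true);   // broker accepted the first message
        tracker.onConfirm("10000000001", false);  // broker rejected the second message
        System.out.println("pending=" + tracker.pendingCount()
                + ", failed=" + tracker.failedMessages());
    }
}
```

The methods are synchronized because confirms typically arrive on a different thread from the one that sends.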
⑤. Message reception confirmation
Message reception confirmation is simpler than message sending confirmation, because there is only one step: the message receipt (ack). The method annotated with @RabbitHandler needs two additional parameters: channel (the channel) and message.
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
// The queue name must match a queue that is declared on the broker
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("Xiaofu received the message: {}", msg);
            // TODO specific business logic
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("The message has already been redelivered and failed again; rejecting it...");
                // Reject the message without requeueing it
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.error("The message will be returned to the queue for processing again...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}
There are three receipt methods for consuming messages. Let’s analyze the meaning of each method.
1. basicAck
basicAck: indicates successful confirmation. After this receipt is sent, the RabbitMQ broker deletes the message.
void basicAck(long deliveryTag, boolean multiple)
deliveryTag: the delivery sequence number of the message. Each time a message is delivered or redelivered on the channel, deliveryTag increases. In manual confirmation mode, we can ack, nack, or reject the message with a given deliveryTag.
multiple: whether to confirm in batches. If the value is true, all unconfirmed messages whose deliveryTag is less than or equal to the current one are acked at once.
For example, suppose I receive three messages with deliveryTag 5, 6, and 7 and do not confirm any of them. When the fourth message arrives with deliveryTag 8 and I confirm it with multiple set to true, messages 5, 6, 7, and 8 are all confirmed.
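The batch behavior of multiple can be modeled in a few lines of plain Java. This is a sketch of the semantics only: UnackedDeliveries is a hypothetical class standing in for one channel's set of unconfirmed deliveries, not a RabbitMQ client type.

```java
import java.util.TreeSet;

/** Toy model of one channel's unconfirmed deliveries. */
class UnackedDeliveries {
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors the semantics of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet(tag, true) is a live view of every tag <= deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    TreeSet<Long> remaining() {
        return unacked;
    }

    public static void main(String[] args) {
        UnackedDeliveries channel = new UnackedDeliveries();
        for (long tag = 5; tag <= 9; tag++) {
            channel.deliver(tag);
        }
        channel.ack(8, true);  // confirms 5, 6, 7 and 8 in one call
        System.out.println(channel.remaining());
    }
}
```

After the batch ack only tag 9 is still outstanding, which matches the 5, 6, 7, 8 example.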
2. basicNack
basicNack: indicates a negative confirmation. It is generally used when the business logic that consumes the message throws an exception; the message can then be put back into the queue for redelivery.
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag: the delivery sequence number of the message.
multiple: whether to operate in batches.
requeue: if the value is true, the message is put back into the queue.
3. basicReject
basicReject: rejects the message. Unlike basicNack, it cannot operate in batches; otherwise its usage is very similar.
void basicReject(long deliveryTag, boolean requeue)
deliveryTag: the delivery sequence number of the message.
requeue: if the value is true, the message is put back into the queue.
3. Project structure chart
4. Supplement
1. Don’t forget to confirm the message
This pitfall involves no deep technique, but it is a very easy mistake to make.
Once the message confirmation mechanism is enabled, don’t forget to call channel.basicAck when consuming messages. Otherwise the messages stay unconfirmed on the broker and are delivered again, resulting in repeated consumption.
2. Unlimited message delivery
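When I first came into contact with the message confirmation mechanism, I wrote the consumer code as follows. The idea was very simple: confirm the message after the business logic finishes, and put the message back into the queue if an exception occurs (simulated here with int a = 1 / 0). Because the handler fails on every attempt, the requeued message is redelivered and fails again, so it circulates indefinitely.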
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("Consumer 2 received: {}", msg);
        // Simulate a business exception
        int a = 1 / 0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // Put the message back into the queue
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}
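The handler above requeues the message on every failure, so a message that always fails is redelivered without end. One possible way out is to cap the number of requeues. The sketch below models that decision in plain Java: RequeueLimiter and its Action values are hypothetical names, and in a real listener REQUEUE and REJECT would map to basicNack with requeue set to true or false. It assumes each message carries a stable id and that a single consumer instance handles the queue, since the counter lives in memory.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that limits how often a failing message is requeued. */
class RequeueLimiter {
    enum Action { REQUEUE, REJECT }

    private final int maxAttempts;
    // message id -> number of failed attempts so far
    private final Map<String, Integer> attempts = new HashMap<>();

    RequeueLimiter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Call from the catch block; returns what to do with the failed message. */
    synchronized Action onFailure(String messageId) {
        int count = attempts.merge(messageId, 1, Integer::sum);
        if (count >= maxAttempts) {
            attempts.remove(messageId);  // forget the message once it is given up
            return Action.REJECT;        // e.g. basicNack(tag, false, false)
        }
        return Action.REQUEUE;           // e.g. basicNack(tag, false, true)
    }

    public static void main(String[] args) {
        RequeueLimiter limiter = new RequeueLimiter(3);
        for (int i = 1; i <= 3; i++) {
            System.out.println("attempt " + i + ": " + limiter.onFailure("msg-1"));
        }
    }
}
```

With a limit of 3, the first two failures requeue the message and the third rejects it, which ends the loop; the rejected message can then be logged or routed elsewhere for compensation.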
3. Repeat consumption
How to make MQ consumption idempotent depends on the specific business. You can persist the messages in MySQL or Redis and check a unique attribute of each message before processing it.
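The uniqueness check can be sketched as follows. This is only an in-memory illustration: IdempotentConsumer is a hypothetical class, and the Set stands in for the MySQL table or Redis key space that a real system would use so that the record survives restarts and is shared between consumers.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical consumer that skips messages whose id it has already seen. */
class IdempotentConsumer {
    // In-memory stand-in for a persistent store of processed message ids
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handledCount = new AtomicInteger();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String messageId, String body) {
        // add() returns false when the id is already present
        if (!processedIds.add(messageId)) {
            return false;
        }
        handledCount.incrementAndGet();  // the business logic would run here
        return true;
    }

    int handledCount() {
        return handledCount.get();
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        System.out.println(consumer.handle("order-1", "create order"));
        System.out.println(consumer.handle("order-1", "create order"));
        System.out.println(consumer.handledCount());
    }
}
```

A production version would also need to record the id and apply the business effect atomically, for example in one database transaction, so that a crash between the two steps does not lose the message.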
It can be seen that after introducing RabbitMQ, our business links have become noticeably longer. Although the systems are now decoupled, there are also more scenarios in which a message can be lost. For example:
- Message producer -> RabbitMQ server (message sending fails)
- The RabbitMQ server itself fails and loses messages
- Message consumer -> RabbitMQ server (message consumption fails)