Business card:
Blogger: Alcoholic
Personal profile: Indulge in wine, and use the energy of wine to fight for a future.
Motto of this article: When three people walk together, one of them is bound to be my teacher.
This project is based on the "SpringCloud Microservice Technology Stack" course by Dark Horse Programmer Java on Bilibili: SpringCloud + RabbitMQ + Docker + Redis + Search + Distributed.
Directory
- 2. Message reliability
  - 1. Producer message confirmation
2. Message reliability
Between being sent by the producer and being received by the consumer, a message passes through several stages: producer, exchange, queue, and consumer.
Each of these steps may result in message loss. Common reasons for loss include:
- Lost while sending:
  - The message sent by the producer is not delivered to the exchange
  - The message reaches the exchange but does not reach a queue
- MQ goes down and the queue loses its messages
- The consumer receives the message but crashes before consuming it
In response to these problems, RabbitMQ provides solutions:
- Producer confirmation mechanism
- MQ persistence
- Consumer confirmation mechanism
- Failure retry mechanism
First, import the demo project (mq-advanced-demo) provided with the pre-course materials.
1. Producer message confirmation
RabbitMQ provides a publisher confirm mechanism to prevent messages from being lost on the way to MQ. The mechanism requires each message to carry a unique ID. After a message is sent to MQ, a result is returned to the sender indicating whether the message was processed successfully.
There are two ways to return results:
- publisher-confirm, sender confirmation
  - The message is successfully delivered to the exchange: ack is returned.
  - The message is not delivered to the exchange: nack is returned.
- publisher-return, sender receipt
  - The message is delivered to the exchange but is not routed to a queue: ack is returned together with the reason for the routing failure.
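The three outcomes described above can be sketched with a small in-memory model. This is only an illustration of the decision logic as this article describes it, not the RabbitMQ client or Spring AMQP API: the class `ConfirmModel`, its methods, and the names `no.such.key` and `no.such.exchange` are hypothetical, while `amq.direct` and the routing key `amq` are the ones used later in this article.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of publisher-confirm and publisher-return (hypothetical, not a real broker API). */
final class ConfirmModel {

    /** Outcomes reported back to the sender, following the description in the text. */
    enum Outcome { ACK, NACK, ACK_WITH_RETURN }

    // exchange name -> routing keys that have at least one queue bound to them
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Registers an exchange without any bindings. */
    void declareExchange(String exchange) {
        bindings.putIfAbsent(exchange, new HashSet<>());
    }

    /** Binds some queue to the exchange under the given routing key. */
    void bind(String exchange, String routingKey) {
        declareExchange(exchange);
        bindings.get(exchange).add(routingKey);
    }

    /** Decides which result the sender would see for a published message. */
    Outcome publish(String exchange, String routingKey) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            return Outcome.NACK;            // the message never reached an exchange
        }
        if (!keys.contains(routingKey)) {
            return Outcome.ACK_WITH_RETURN; // reached the exchange, but no queue matched
        }
        return Outcome.ACK;                 // reached the exchange and was routed to a queue
    }

    public static void main(String[] args) {
        ConfirmModel broker = new ConfirmModel();
        broker.bind("amq.direct", "amq");
        System.out.println(broker.publish("amq.direct", "amq"));         // ACK
        System.out.println(broker.publish("amq.direct", "no.such.key")); // ACK_WITH_RETURN
        System.out.println(broker.publish("no.such.exchange", "amq"));   // NACK
    }
}
```

In this model the return case still counts as an ack, which is why a sender that only checks the confirm result would not notice a routing failure.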
- Modify the configuration
First, modify the RabbitMQ IP address, user name, password and other settings in the application.yml file of the publisher service, and add the following content:
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
Explanation:
- publisher-confirm-type: enables publisher-confirm. Two types are supported:
  - simple: wait synchronously for the confirm result until it times out
  - correlated: asynchronous callback; define a ConfirmCallback, which MQ invokes when it returns the result
- publisher-returns: enables the publisher-return function, which is also based on the callback mechanism but uses a ReturnCallback
- template.mandatory: defines the policy when message routing fails. true: call the ReturnCallback; false: discard the message directly
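The difference between the simple and correlated styles can be sketched with plain `java.util.concurrent` types. This is a minimal illustration that assumes the broker's confirm is represented by a `CompletableFuture<Boolean>` (true for ack, false for nack). The class `ConfirmStyles` and its method names are hypothetical and are not part of Spring AMQP.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Sketch of the two confirm styles; hypothetical helper, not the Spring AMQP API. */
final class ConfirmStyles {

    /** "simple" style: block the sending thread until the confirm arrives or the timeout expires. */
    static boolean waitForConfirm(CompletableFuture<Boolean> confirm, long timeoutMillis) {
        try {
            return confirm.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return false; // this sketch treats a missing or failed confirm as "not confirmed"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return false;
        }
    }

    /** "correlated" style: register a callback keyed by the message ID and return immediately. */
    static void onConfirm(CompletableFuture<Boolean> confirm, String messageId,
                          BiConsumer<String, Boolean> callback) {
        confirm.thenAccept(ack -> callback.accept(messageId, ack));
    }

    public static void main(String[] args) {
        // Simple style: the caller is blocked until the result is known.
        CompletableFuture<Boolean> first = CompletableFuture.completedFuture(true);
        System.out.println("simple confirm: " + waitForConfirm(first, 1000));

        // Correlated style: the caller continues and the callback fires later.
        CompletableFuture<Boolean> second = new CompletableFuture<>();
        onConfirm(second, "msg-1", (id, ack) -> System.out.println(id + " acked: " + ack));
        second.complete(false); // simulate the broker answering with a nack
    }
}
```

The correlated style needs the message ID in the callback because the sender has already moved on by the time the result arrives, which is one reason a unique ID per message matters.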
- Define the Return callback
Each RabbitTemplate can be configured with only one ReturnCallback, so it needs to be configured when the project starts up.
Modify the publisher service and add a configuration class:
package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Get the RabbitTemplate bean
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // Set the ReturnCallback, invoked when a message reaches the exchange but cannot be routed
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("Message sending failed, response code: {}, reason: {}, exchange: {}, routing key: {}, message: {}",
                    replyCode, replyText, exchange, routingKey, message);
        });
    }
}
- Define ConfirmCallback
A ConfirmCallback can be specified each time a message is sent, because the logic for handling confirm success or failure may differ from one business scenario to another.
In the cn.itcast.mq.spring.SpringAmqpTest class of the publisher service, define a unit test method:
package cn.itcast.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // Message body
        String message = "hello, spring amqp!";
        // Unique message ID, wrapped in CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Add the confirm callback
        correlationData.getFuture().addCallback(
                result -> {
                    if (result.isAck()) {
                        // ack: the message reached the exchange
                        log.debug("Message sent to exchange successfully, ID: {}", correlationData.getId());
                    } else {
                        // nack: the message did not reach the exchange
                        log.error("Failed to send message to exchange, ID: {}", correlationData.getId());
                    }
                },
                // No confirm result was received at all (the future failed with an exception)
                ex -> log.error("Message sending failed with an exception, ID: {}, reason: {}",
                        correlationData.getId(), ex.getMessage())
        );
        // Send the message
        rabbitTemplate.convertAndSend("amq.direct", "amq", message, correlationData);
        // Give the asynchronous confirm callback time to run before the test exits
        Thread.sleep(2000);
    }
}
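Because the confirm result arrives asynchronously, a test has to stay alive until the callback has run, otherwise nothing is logged. One possible way to wait for exactly as long as needed is a `CountDownLatch`. The sketch below uses only the JDK and stands in for the real confirm with a `CompletableFuture<Boolean>`; the class `AwaitConfirm` and its method are hypothetical and are not part of the course project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: keep the calling thread alive until an asynchronous confirm callback has run. */
final class AwaitConfirm {

    /** Returns true if the callback ran within the timeout, false otherwise. */
    static boolean awaitCallback(CompletableFuture<Boolean> confirm, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        confirm.whenComplete((ack, ex) -> {
            // A real callback would log the ack, nack, or exception here.
            done.countDown();
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulate a broker that confirms on another thread.
        CompletableFuture<Boolean> confirm = CompletableFuture.supplyAsync(() -> true);
        System.out.println("callback ran: " + awaitCallback(confirm, 2000));
    }
}
```

Compared with a fixed pause, the latch returns as soon as the callback fires and still fails visibly when no result arrives within the timeout.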
Test:
In the RabbitMQ management page in the browser, add the exchange and queue used by the code, and bind them with the matching routingKey.
[Screenshots: exchange, queue, routingKey, result]