MQ Advanced-Message Confirmation

Please add an image description
Business card:

Blogger: Alcoholic?.
Personal profile: Indulge in wine, and use the energy of wine to fight for a future.
This article is inspirational: When three people are together, there must be one who is my teacher.

Please add an image description
This project is based on Bilibili’sDark Horse Programmer Java’s “SpringCloud Microservice Technology Stack”, SpringCloud + RabbitMQ + Docker + Redis + Search + Distribution

[SpringCloud + RabbitMQ + Docker + Redis + Search + Distributed, System Detailed Springcloud Microservice Technology Stack Course | Dark Horse Programmer Java Microservices] Click to watch

Directory

  • 2. Message Reliability
    • 1. Producer message confirmation

2. Message reliability

From sending a message to being received by the consumer, multiple processes are handled:

Each of these steps may result in message loss. Common reasons for loss include:

  • Lost while sending:
    • The message sent by the producer is not delivered to the exchange
    • The message does not reach the queue after reaching the exchange
  • MQ is down and the queue loses messages
  • After receiving the message, the consumer crashes without consuming it.

In response to these problems, RabbitMQ provides solutions:

  • Producer confirmation mechanism
  • mq persistence
  • Consumer confirmation mechanism
  • Failure retry mechanism

First, import the project project (mq-advanced-demo) provided by the pre-course materials

1. Producer message confirmation

RabbitMQ provides a publisher confirm mechanism to avoid message loss during sending to MQ. This mechanism must assign a unique ID to each message. After the message is sent to MQ, a result will be returned to the sender, indicating whether the message was successfully processed.

There are two ways to return results:

  • publisher-confirm, sender confirmation
    • The message is successfully delivered to the switch and ack is returned.
    • The message is not delivered to the switch and nack is returned.
  • publisher-return, sender receipt
    • The message is delivered to the switch, but is not routed to the queue. Returns ACK and the reason for routing failure.

Notice:

  1. Change setting

First, modify the IP address, user name, password and other configurations of rabbitmq in the application.yml file in the publisher service, and add the following content:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

illustrate:

  • publish-confirm-type: Enable publisher-confirm. Two types are supported:
    • simple: Wait for the confirm result synchronously until timeout
    • correlated: Asynchronous callback, define ConfirmCallback, MQ will call back this ConfirmCallback when returning the result.
  • publish-returns: Turn on the publish-return function, which is also based on the callback mechanism, but defines ReturnCallback
  • template.mandatory: Defines the strategy when message routing fails. true, call ReturnCallback; false: discard the message directly
  1. Define Return callback

Each RabbitTemplate can only be configured with one ReturnCallback, so it needs to be configured when the project is loaded:

Modify the publisher service and add one:

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {<!-- -->

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {<!-- -->
        //Get rabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //Set ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {<!-- -->
            log.error("Message sending failed, response code: {}, reason: {}, switch: {}, routing key: {}, message: {}",
                    replyCode, replyText, exchange, routingKey, message);
        });
    }
}
  1. DefineConfirmCallback

ConfirmCallback can be specified when sending a message, because the logic of each business processing confirm success or failure is not necessarily the same.

In the cn.itcast.mq.spring.SpringAmqpTest class of the publisher service, define a unit test method:

package cn.itcast.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {<!-- -->
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {<!-- -->
        //Message body
        String message = "hello, spring amqp!";
        //Set unique ID. Encapsulated into CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //Add callback
        correlationData.getFuture().addCallback(
                result -> {<!-- -->
                    if (result.isAck()){<!-- -->
                        //ack, sent successfully
                        log.debug("Message sent to switch successfully, ID: {}", correlationData.getId());
                    }else {<!-- -->
                        //nock, sending failed
                        log.error("Failed to send message to switch, ID:{}", correlationData.getId());
                    }
                },
                ex -> log.error("Message sending queue failed, ID: {}, reason: {}", correlationData.getId(), ex.getMessage())
        );
        //Send a message
        rabbitTemplate.convertAndSend("amq.direct", "amq", message, correlationData);
    }
}

test:
Add the exchange and queue corresponding to the code on the mq browser side, and set the contact routingKey.
exchange

queue

routingKey

result: