Using Spring with RabbitMQ

1. Persistence of messages

To make sure messages are stored safely in RabbitMQ, persistence must be enabled at three levels: the exchange, the queue, and the message. Exchanges, queues, and messages declared through Spring AMQP are persistent by default (their Durability attribute is Durable), so nothing needs to be specified explicitly.

1. RabbitMQ client persistence (all three steps are required)

1. Persistence of the exchange
// Parameter 1 exchange: exchange name
// Parameter 2 type: exchange type
// Parameter 3 durable: whether to persist the exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
2. Persistence of the queue
// Parameter 1 queue: queue name
// Parameter 2 durable: Whether to persist
// Parameter 3 exclusive: a private queue that can only be used by the creator, and will be automatically deleted after disconnection
// Parameter 4 autoDelete: Whether to automatically delete the queue when all consumer clients are disconnected
// Parameter 5 arguments: optional queue arguments (for example TTL or dead letter settings), or null
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
3. Persistence of messages
// Parameter 1 exchange: exchange name ("" is the default exchange)
// Parameter 2 routingKey: routing key (the queue name when using the default exchange)
// Parameter 3 props: message properties; MessageProperties.PERSISTENT_TEXT_PLAIN marks the message as persistent
// Parameter 4 body: message body
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

2. Spring AMQP persistence (both steps are required)

1. Exchange persistence
// Parameter 1 name: exchange name
// Parameter 2 durable: whether to persist the exchange
// Parameter 3 autoDelete: whether to delete the exchange automatically once it is no longer in use
new TopicExchange(name, durable, autoDelete);
2. Queue persistence
// Parameter 1 name: queue name
// Parameter 2 durable: Whether to persist
// Parameter 3 exclusive: a private queue that can only be used by the creator, and will be automatically deleted after disconnection
// Parameter 4 autoDelete: Whether to automatically delete the queue when all consumer clients are disconnected
new Queue(name, durable, exclusive, autoDelete);
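
The durability rules above can be summarized in a small model. This is only a plain-JDK sketch, not RabbitMQ or Spring AMQP code; the class and method names are hypothetical and the model assumes the behavior of classic queues after a broker restart.

```java
/** Plain-JDK model of what survives a broker restart; names are illustrative only. */
final class DurabilityModel {

    /** An exchange definition survives a restart only if it was declared durable. */
    static boolean exchangeSurvives(boolean exchangeDurable) {
        return exchangeDurable;
    }

    /** A binding survives only if both the exchange and the queue are durable. */
    static boolean bindingSurvives(boolean exchangeDurable, boolean queueDurable) {
        return exchangeDurable && queueDurable;
    }

    /** A stored message survives only in a durable queue and only if it was published as persistent. */
    static boolean messageSurvives(boolean queueDurable, boolean messagePersistent) {
        return queueDurable && messagePersistent;
    }

    public static void main(String[] args) {
        System.out.println("durable queue, persistent message: " + messageSurvives(true, true));
        System.out.println("durable queue, transient message:  " + messageSurvives(true, false));
        System.out.println("transient queue, persistent message: " + messageSurvives(false, true));
    }
}
```

The point of the sketch is that a single missing flag is enough to lose data, which is why every step in the lists above is required.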

2. RabbitMQ's message reliability mechanisms

1. Producer confirmation mechanism
2. Failure retry mechanism
3. Consumer confirmation mechanism (automatic confirmation by default)
spring:
  rabbitmq:
    # ...previous configuration omitted..
    # Publisher confirms and returns
    publisher-confirm-type: correlated # confirm callback: reports whether the message reached the exchange
    publisher-returns: true # returns callback: invoked when the message could not be routed to any queue
    listener:
      simple:
        acknowledge-mode: manual # manual confirmation
      direct:
        acknowledge-mode: manual # manual confirmation
    # Cluster configuration: list every node in rabbitmq.addresses; rabbitmq.host and rabbitmq.port are then not needed
    addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5674

1. Producer confirmation mechanism

After sending, the producer learns the outcome of the send through callback functions.

ConfirmCallback: invoked when the broker confirms whether the message reached the exchange (ack is true on success, false on failure).
ReturnsCallback: invoked only when the message reached the exchange but could not be routed to any queue, in which case the message is returned to the producer.
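
The interplay of the two callbacks can be sketched with a small plain-JDK model. It does not use RabbitMQ or Spring AMQP; `PublishOutcomeModel`, `Outcome`, and the exchange map are hypothetical names, and routing is simplified to an exact match on the routing key.

```java
import java.util.Map;
import java.util.Set;

/** Plain-JDK model of which producer callbacks fire for a publish; illustrative only. */
final class PublishOutcomeModel {

    /** ack: value seen by the confirm callback; returned: whether the returns callback fires. */
    static final class Outcome {
        final boolean ack;
        final boolean returned;

        Outcome(boolean ack, boolean returned) {
            this.ack = ack;
            this.returned = returned;
        }
    }

    /**
     * @param exchanges exchange name mapped to the routing keys that have a queue bound
     */
    static Outcome publish(Map<String, Set<String>> exchanges, String exchange,
                           String routingKey, boolean mandatory) {
        Set<String> boundKeys = exchanges.get(exchange);
        if (boundKeys == null) {
            // Unknown exchange: the message never arrives, so the confirm reports ack=false.
            return new Outcome(false, false);
        }
        boolean routable = boundKeys.contains(routingKey);
        // The exchange accepted the message, so ack=true even when no queue matches.
        // The message is returned only if it is unroutable and mandatory was set.
        return new Outcome(true, !routable && mandatory);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> exchanges = Map.of("orders", Set.of("order.created"));
        Outcome o = publish(exchanges, "orders", "order.unknown", true);
        System.out.println("ack=" + o.ack + ", returned=" + o.returned);
    }
}
```

Note the middle case: an unroutable message still produces ack=true, so the confirm callback alone cannot tell you that a message reached a queue.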

1. Global mode

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    // With mandatory=true the broker returns unroutable messages to the producer
    // (triggering the returns callback) instead of silently dropping them
    rabbitTemplate.setMandatory(true);

    // Callback reporting whether the message reached the exchange
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("\nConfirm result for delivery to the exchange:");
        System.out.println("correlation data: " + correlationData);
        System.out.println("Successful: " + ack);
        System.out.println("Error reason: " + cause);
    });

    // Callback invoked when the message could not be routed to a queue
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
        System.out.println("\nMessage returned, it could not be routed to a queue:");
        System.out.println("Returned message: " + returnedMessage.getMessage());
        System.out.println("Response code: " + returnedMessage.getReplyCode());
        System.out.println("Response message: " + returnedMessage.getReplyText());
        System.out.println("Exchange: " + returnedMessage.getExchange());
        System.out.println("Routing key: " + returnedMessage.getRoutingKey());
    });
    return rabbitTemplate;
}

2. Local mode (per service class)

@Service
public class SendMessageService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    private static final Logger logger = LoggerFactory.getLogger(SendMessageService.class);

    @Autowired
    public RabbitTemplate rabbitTemplate;

    public void sendMessage(String str){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);
        // The id passed to CorrelationData is arbitrary, but it must be non-null and unique
        rabbitTemplate.convertAndSend("exchange", "routingKey", str, new CorrelationData(UUID.randomUUID().toString()));
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("Message returned, it could not be routed to a queue: " + returnedMessage);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            logger.error("Message was not confirmed by the broker, cause={}", cause);
            // handle the failure here
        } else {
            logger.info("The sender has received the confirmation, correlationData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

2. Failure retry mechanism

# Enable consumer-side retry (default: false)
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of delivery attempts (default: 3)
spring.rabbitmq.listener.simple.retry.max-attempts=5
# Maximum interval between attempts, in milliseconds
spring.rabbitmq.listener.simple.retry.max-interval=10000
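
To see how max-attempts and max-interval interact, the following plain-JDK sketch computes the waits between attempts for an exponential backoff. It is an approximation, not Spring's implementation: the initial interval and multiplier parameters are assumptions here (they correspond to the retry.initial-interval and retry.multiplier properties, which the configuration above does not set), and `RetryScheduleSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Computes the waits between retry attempts; a sketch of capped exponential backoff. */
final class RetryScheduleSketch {

    /**
     * @param maxAttempts       total number of attempts, including the first one
     * @param initialIntervalMs wait before the second attempt (assumed parameter)
     * @param multiplier        growth factor applied after each wait (assumed parameter)
     * @param maxIntervalMs     upper bound for any single wait
     * @return the waits in milliseconds; there is one wait fewer than there are attempts
     */
    static List<Long> waits(int maxAttempts, long initialIntervalMs,
                            double multiplier, long maxIntervalMs) {
        List<Long> waits = new ArrayList<>();
        long interval = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min(interval, maxIntervalMs));
            interval = (long) Math.min((double) maxIntervalMs, interval * multiplier);
        }
        return waits;
    }

    public static void main(String[] args) {
        // max-attempts=5 and max-interval=10000 as configured above
        System.out.println(waits(5, 1000, 2.0, 10000)); // prints [1000, 2000, 4000, 8000]
        System.out.println(waits(5, 1000, 3.0, 10000)); // prints [1000, 3000, 9000, 10000]
    }
}
```

With a multiplier of 1.0 every wait equals the initial interval, so max-interval only matters once the multiplier is greater than 1.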

3. Consumer confirmation mechanism (automatic confirmation by default)

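<strong>1. Message confirmation modes include:</strong>
AcknowledgeMode.NONE: no acknowledgment is sent; the broker treats a message as acknowledged as soon as it is delivered
AcknowledgeMode.AUTO: the listener container acknowledges the message if the listener returns normally and rejects it if the listener throws an exception
AcknowledgeMode.MANUAL: the listener acknowledges the message itself through the Channel
<strong>2. Manual call by consumers</strong>
Basic.Ack: acknowledges the current message
Basic.Nack: negatively acknowledges the current message (can reject a batch of messages)
Basic.Reject: rejects the current message (one message at a time)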
<strong>3. Manual confirmation and rejection</strong>
// Acknowledge one or more messages. multiple=false acknowledges only the current message; multiple=true acknowledges all messages with a delivery tag less than or equal to deliveryTag.
channel.basicAck(deliveryTag, true);
// Reject one or more messages. multiple=false rejects only the current message; multiple=true rejects all messages with a delivery tag less than or equal to deliveryTag.
// requeue=true puts the failed message back into the queue; requeue=false does not.
channel.basicNack(deliveryTag, multiple, requeue);
// Reject a single message. true: put it back into the queue; false: do not requeue, and send it to the dead letter queue if one is configured.
channel.basicReject(deliveryTag, true);
// Acknowledge only the current message (multiple=false). Acknowledging removes the message from the queue; it does not cause a resend.
channel.basicAck(deliveryTag, false);
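
The effect of the multiple flag is easiest to see on a set of outstanding delivery tags. The sketch below is a plain-JDK model rather than RabbitMQ client code; `DeliveryTagModel` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.TreeSet;

/** Tracks unacknowledged delivery tags on one channel; illustrative model only. */
final class DeliveryTagModel {

    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records a message delivered to the consumer and awaiting acknowledgment. */
    void deliver(long tag) {
        unacked.add(tag);
    }

    /** Mirrors the semantics of basicAck(deliveryTag, multiple). */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag up to and including this one.
            unacked.headSet(tag, true).clear();
        } else {
            // Acknowledge only this tag.
            unacked.remove(tag);
        }
    }

    /** Returns the outstanding tags in ascending order. */
    List<Long> unacked() {
        return List.copyOf(unacked);
    }

    public static void main(String[] args) {
        DeliveryTagModel channel = new DeliveryTagModel();
        for (long tag = 1; tag <= 5; tag++) {
            channel.deliver(tag);
        }
        channel.ack(3, true);
        System.out.println(channel.unacked()); // prints [4, 5]
        channel.ack(5, false);
        System.out.println(channel.unacked()); // prints [4]
    }
}
```

basicNack with multiple=true selects tags the same way; the difference is only in what happens to the selected messages afterwards.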

4. Code implementation

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    @RabbitListener(queues = "order.queue")
    public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            System.out.println("Message: " + orderMsg);
            System.out.println(1 / 0); // deliberately throws ArithmeticException to simulate a failure
            // Manual acknowledgment (reached only when processing succeeds)
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // On failure, reject the message with basicNack:
            // Parameter 1: delivery tag of the message
            // Parameter 2 multiple: whether to also reject all earlier unacknowledged messages
            // Parameter 3 requeue:
            //     false: do not requeue; the message goes to the dead letter queue if one is configured, otherwise it is discarded
            //     true: requeue; a message that always fails is then redelivered in an endless loop
            channel.basicNack(tag, false, false);
        }
    }
}

3. Delay queue and dead letter queue

1. Delay of queue

Time To Live (TTL):
TTL is the survival time of a message. The x-message-ttl queue argument sets the survival time of messages on a given queue; its value is a non-negative integer in milliseconds.
RabbitMQ lets you set message expiration at two levels: on the queue and on the individual message.
A queue-level TTL gives every message in the queue the same expiration time.
A message-level TTL applies to a single message, so each message can have a different TTL.
Note 1: If both a queue-level and a message-level TTL are set, the smaller of the two applies.
Note 2: Once the queue-level TTL elapses, messages in the queue become dead letters. (Expiry of the queue itself, set with x-expires, deletes the queue without dead-lettering its messages.)
Note 3: With a message-level TTL, expiry is only checked when the message reaches the head of the queue; an expired message behind an unexpired one stays in the queue until then, and only then becomes a dead letter.
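
Notes 1 and 3 can be illustrated with a small plain-JDK model. It is a sketch of the described behavior, not RabbitMQ code; `TtlModel` is a hypothetical name, and each queue entry is represented only by the time at which it expires.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Models TTL selection and head-of-queue expiry; illustrative only. */
final class TtlModel {

    /** When both a queue-level and a message-level TTL are set, the smaller one applies. */
    static long effectiveTtlMs(long queueTtlMs, long messageTtlMs) {
        return Math.min(queueTtlMs, messageTtlMs);
    }

    /**
     * Removes expired messages starting at the head and stops at the first
     * unexpired one, so an expired message further back is not touched yet.
     *
     * @param expiresAt expiry times in milliseconds, in queue order
     * @return the expiry times of the messages that became dead letters
     */
    static List<Long> expireFromHead(Deque<Long> expiresAt, long nowMs) {
        List<Long> deadLettered = new ArrayList<>();
        while (!expiresAt.isEmpty() && expiresAt.peekFirst() <= nowMs) {
            deadLettered.add(expiresAt.pollFirst());
        }
        return deadLettered;
    }

    public static void main(String[] args) {
        // The head expires at t=10000, the message behind it already at t=1000.
        Deque<Long> queue = new ArrayDeque<>(List.of(10000L, 1000L));
        System.out.println(expireFromHead(queue, 2000L));  // prints []
        System.out.println(expireFromHead(queue, 10000L)); // prints [10000, 1000]
    }
}
```

This is why a delay queue built on message-level TTL can deliver late: a short-lived message queued behind a long-lived one waits for the long-lived one to expire first.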

2. Dead letter queue

Messages in a queue that cannot be consumed and have no follow-up processing become dead letters.
<strong>The source of dead letters</strong>
1. The message has been in the queue longer than its TTL (Time To Live).
2. The queue has reached its maximum length (the queue is full and cannot hold more messages).
3. The message is rejected (basic.reject or basic.nack) with requeue=false (no requeue).
<strong>Configure dead letter queue</strong>
1. Configure the business queue and bind it to the business exchange.
2. Configure the dead letter exchange and routing key on the business queue.
3. Bind the dead letter queue to the dead letter exchange.
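
The flow from business queue to dead letter queue can be sketched as a plain-JDK model covering sources 2 and 3 above. It is not RabbitMQ code: `DeadLetterModel` is a hypothetical name, exchanges and routing keys are left out, and the overflow case assumes the queue drops its oldest message when full.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of a business queue with a dead letter queue attached; illustrative only. */
final class DeadLetterModel {

    final Deque<String> businessQueue = new ArrayDeque<>();
    final Deque<String> deadLetterQueue = new ArrayDeque<>();
    private final int maxLength;

    DeadLetterModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Source 2: when the queue is full, the oldest message is dead-lettered to make room. */
    void publish(String message) {
        if (businessQueue.size() == maxLength) {
            deadLetterQueue.addLast(businessQueue.pollFirst());
        }
        businessQueue.addLast(message);
    }

    /** Source 3: a failed message is dead-lettered only when it is rejected with requeue=false. */
    void consume(boolean success, boolean requeue) {
        String message = businessQueue.pollFirst();
        if (message == null || success) {
            return; // nothing to consume, or the message was acknowledged
        }
        if (requeue) {
            businessQueue.addFirst(message);
        } else {
            deadLetterQueue.addLast(message);
        }
    }

    public static void main(String[] args) {
        DeadLetterModel broker = new DeadLetterModel(2);
        broker.publish("a");
        broker.publish("b");
        broker.publish("c");           // queue is full, "a" is dead-lettered
        broker.consume(false, false);  // "b" is rejected without requeue
        System.out.println(broker.deadLetterQueue); // prints [a, b]
        System.out.println(broker.businessQueue);   // prints [c]
    }
}
```

A consumer attached to the dead letter queue then handles these messages like any other, for example by logging them or retrying later.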

3. Use cases

1. Configuration parameters

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual

2. Configuration class

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    // business exchange
    public static final String BUSINESS_EXCHANGE_NAME = "simple.business.exchange";
    // dead letter exchange
    public static final String DEAD_LETTER_EXCHANGE = "simple.dead.exchange";
    // business queue
    public static final String BUSINESS_QUEUEA_NAME = "simple.business.queuea";
    // dead letter queue
    public static final String DEAD_LETTER_QUEUEA_NAME = "simple.dead.queuea";
    // dead letter queue routing key
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "simple.dead.queuea.routingkey";

    /**
     * Declare the business exchange
     */
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    /**
     * Declare the dead letter exchange
     */
    @Bean("deadExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * Declare business queue A
     */
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(3);
        // x-message-ttl: how long a message may stay in the queue before it expires, in milliseconds (here 30 minutes)
        args.put("x-message-ttl", 30*60*1000);
        // x-dead-letter-exchange: the dead letter exchange bound to the current queue
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key: the routing key used when dead-lettering messages from the current queue
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }

    /**
     * Declare dead letter queue A
     */
    @Bean("deadQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    /**
     * Declare the business queue A binding relationship
     */
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Declare the binding relationship of dead letter queue A
     */
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadQueueA") Queue queue, @Qualifier("deadExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

}

3. Sender

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BusinessSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg){
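        // Plain send; the message lives until the queue-level TTL expires
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);
        // Send with a message-level TTL of 100 milliseconds
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg, ms -> {
            ms.getMessageProperties().setExpiration("100");
            return ms;
        });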
    }
}

4. Normal receiver

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class BusinessReceiver {

    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("Received business message A: {}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            if (msg.contains("deadletter")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }
        if (!ack){
            log.error("An exception occurred in message consumption, error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

}

5. Dead letter receiver

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DeadReceiver {

    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("Received dead letter message A: " + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

4. Cluster mode

1. Normal mode (the default cluster mode).

Queue contents are stored only on the node where the queue was declared; every node synchronizes only the queue's metadata, which records where the queue data actually lives.
If a consumer connects to a different node, that node pulls the data from the node that hosts the queue.

2. Mirror mode

Queue data is stored on multiple nodes at the same time, which provides high availability.
