1. Reliability of messages
RabbitMQ provides a publisher confirm mechanism (Confirm).
The Confirm mechanism lets the producer verify that a message has reached the exchange. On its own it does not tell the producer whether the message was routed to a queue or consumed.
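The bookkeeping behind confirms can be sketched without a broker. The class below is a minimal illustration, not the client library's implementation: `ConfirmTracker` and its method names are hypothetical. It assumes each published message is recorded under its publish sequence number (with the Java client this would come from `channel.getNextPublishSeqNo()`) and that the ack and nack callbacks registered through `channel.addConfirmListener(...)` forward the sequence number and the `multiple` flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: tracks messages published but not yet confirmed. */
class ConfirmTracker {
    // publish sequence number -> message body awaiting confirmation
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a message under the sequence number it was published with. */
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Broker acked seqNo; with multiple=true every lower number is acked too. */
    void acked(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Broker nacked seqNo; returns the affected bodies so the caller can resend them. */
    List<String> nacked(long seqNo, boolean multiple) {
        List<String> failed = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            failed.addAll(head.values());
            head.clear();
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                failed.add(body);
            }
        }
        return failed;
    }

    int pendingCount() {
        return outstanding.size();
    }
}

class ConfirmTrackerDemo {
    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        tracker.published(1, "a");
        tracker.published(2, "b");
        tracker.published(3, "c");
        tracker.acked(2, true);                          // confirms messages 1 and 2
        System.out.println(tracker.pendingCount());      // prints 1
        System.out.println(tracker.nacked(3, false));    // prints [c]
    }
}
```

A sorted map is used so that a `multiple` confirm can clear every sequence number up to and including the confirmed one in a single call.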
2. Java implementation
1. Import dependencies
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
2. Producer of Confirm mechanism
package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

public class Send {

    // Queue name
    public static final String QUEUE_NAME = "queueA";

    public static void main(String[] args) throws Exception {
        // 1. Get the connection object
        Connection conn = MQUtils.getConnection();

        // 2. Create a channel. Most MQ operations are defined on the channel object.
        Channel channel = conn.createChannel();

        // 3. Enable publisher confirms on this channel
        channel.confirmSelect();

        // 4. Declare a queue (left commented out here, so the queue must already exist)
        /*
         * queue      - the name of the queue
         * durable    - true means the queue is durable (it still exists after the broker restarts)
         * exclusive  - whether the queue is exclusive (usable only by the connection that declared it)
         * autoDelete - whether the broker may delete the queue automatically
         * arguments  - other parameters of the queue, can be null
         */
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello doubleasdasda!";

        // 5. Publish the message
        /*
         * exchange   - the name of the exchange. An empty string means the default exchange.
         * routingKey - the routing key. With the default exchange, the routing key is the queue name.
         * props      - other properties of the message, can be null
         * body       - the content of the message; note that it is a byte array
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println(" [x] Sent '" + message + "'");

        // 6. Check whether the message was sent successfully
        try {
            /*
             * waitForConfirms() returns true if the broker confirmed that the message reached the exchange.
             * If the message cannot be delivered because the exchange name is wrong,
             * an exception is thrown and the channel is closed automatically.
             */
            if (channel.waitForConfirms()) {
                // true means the exchange received the message
                System.out.println("The message has been successfully sent to the exchange");
                // Close the resource
                channel.close();
            } else {
                System.out.println("The message failed to be sent to the exchange");
                // Close the resource
                channel.close();
            }
        } catch (InterruptedException e) {
            System.out.println("The message failed to be sent to the exchange");
            System.out.println("The failed message is: " + message);
        }
        conn.close();
    }
}
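The producer above only prints a line when the confirm fails. One possible follow-up, sketched here under assumptions, is a bounded retry. `ConfirmRetry` and `publishWithRetry` are hypothetical names, and the `Callable<Boolean>` stands in for "publish, then return the result of `channel.waitForConfirms()`"; the demo uses a simulated attempt so that it runs without a broker.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a publish-and-confirm attempt a limited number of times. */
class ConfirmRetry {

    /**
     * Runs the attempt up to maxAttempts times.
     * Returns the 1-based number of the attempt that was confirmed, or -1 if none was.
     */
    static int publishWithRetry(Callable<Boolean> attempt, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                if (Boolean.TRUE.equals(attempt.call())) {
                    return i;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying
                Thread.currentThread().interrupt();
                return -1;
            } catch (Exception e) {
                // Any other failure counts as an unconfirmed attempt; try again
            }
        }
        return -1;
    }
}

class ConfirmRetryDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated attempt: not confirmed on the first two calls, confirmed on the third
        int confirmedOn = ConfirmRetry.publishWithRetry(() -> ++calls[0] >= 3, 5);
        System.out.println(confirmedOn); // prints 3
    }
}
```

Retrying after an uncertain confirm can produce duplicate messages, so a consumer on the other side would need to tolerate duplicates.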
3. Consumer of the Confirm mechanism
package com.qf.mq2302.hello;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Recv {

    // Must match the queue the producer publishes to
    private final static String QUEUE_NAME = "queueA";

    public static void main(String[] args) throws Exception {
        // 1. Get the connection object
        Connection conn = MQUtils.getConnection();

        // 2. Create a channel. Most MQ operations are defined on the channel object.
        Channel channel = conn.createChannel();

        /*
         * 1st parameter - queue name
         * 2nd parameter - whether the queue is durable
         * 3rd parameter - whether the queue is exclusive
         * 4th parameter - whether the queue is deleted automatically
         * 5th parameter - extra arguments, e.g. to define the type of queue
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 3. The logic that runs when the consumer receives a message goes in the DeliverCallback.
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println(consumerTag);
                // The byte array sent by the producer is available from the Delivery object
                byte[] body = message.getBody();
                String msg = new String(body, StandardCharsets.UTF_8);
                // Write the consumer's business logic here, for example, send an email
                System.out.println(msg);
            }
        };

        // 4. Start consuming messages from the QUEUE_NAME queue
        /*
         * queue           - the name of the queue
         * autoAck         - true means the consumer acknowledges messages automatically
         * deliverCallback - how the consumer handles a message when one is delivered
         * cancelCallback  - code to run when the consumer is cancelled
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}
3. Spring Boot integration
1. Import dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. YAML configuration file
spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    publisher-confirm-type: correlated # Enable the producer's confirm mechanism in the Spring Boot project
3. RabbitMQ configuration class
package com.qf.bootmq2302.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        // Set the connection factory object
        rabbitTemplate.setConnectionFactory(cachingConnectionFactory);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // correlationData is null if the message was sent without one
                if (correlationData != null) {
                    System.out.println("correlationData id:" + correlationData.getId());
                    Message returned = correlationData.getReturnedMessage();
                    if (returned != null) {
                        System.out.println("correlationData message:" + new String(returned.getBody()));
                    }
                    // The message content can also be looked up in redis by this ID
                }
                // ack: true if the message reached the exchange, false if it did not
                System.out.println("ack:" + ack);
                // cause: the reason for the failure, if any
                System.out.println("cause:" + cause);
            }
        });
        return rabbitTemplate;
    }
}
4. The producer writes a Controller
@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/test1")
public String test1(String msg, String routkey) {
    System.out.println(msg);
    String exchangeName = ""; // Default exchange
    String routingkey = routkey; // Queue name

    // Create a CorrelationData object that identifies this message in the confirm callback
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId("001");
    // Empty MessageProperties instead of null, so the Message is valid on its own
    Message message = new Message(msg.getBytes(), new MessageProperties());
    correlationData.setReturnedMessage(message);

    // Store the message content and message number in redis: key = message number, value = message content
    // key = bootmq:failmessage:001

    // The producer sends the message.
    // The fourth parameter carries the custom correlationData.
    rabbitTemplate.convertAndSend(exchangeName, routingkey, msg, correlationData);
    return "ok";
}
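The comments in the controller describe storing each message in redis under `bootmq:failmessage:<id>` so that it can be recovered when the confirm fails. A minimal sketch of that idea follows, assuming an in-memory map in place of redis. `PendingMessageStore` and its methods are hypothetical names; in the real project, `onConfirm` would be called from the `ConfirmCallback` with `correlationData.getId()` and `ack`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the redis store described in the controller comments. */
class PendingMessageStore {
    // Key prefix taken from the controller comment
    static final String KEY_PREFIX = "bootmq:failmessage:";

    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Save the message body before it is sent. */
    void save(String id, String body) {
        store.put(KEY_PREFIX + id, body);
    }

    /**
     * Handle a confirm for the given message ID.
     * On ack the entry is removed; on nack the stored body is returned so it can be resent.
     */
    Optional<String> onConfirm(String id, boolean ack) {
        String key = KEY_PREFIX + id;
        if (ack) {
            store.remove(key);
            return Optional.empty();
        }
        return Optional.ofNullable(store.get(key));
    }

    int size() {
        return store.size();
    }
}

class PendingMessageStoreDemo {
    public static void main(String[] args) {
        PendingMessageStore pending = new PendingMessageStore();
        pending.save("001", "hello");
        System.out.println(pending.onConfirm("001", false)); // prints Optional[hello]
        System.out.println(pending.onConfirm("001", true));  // prints Optional.empty
        System.out.println(pending.size());                  // prints 0
    }
}
```

For this to work, every message needs its own ID; a fixed ID such as "001" would make later messages overwrite earlier ones.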
5. The consumer writes a listener to receive queue messages
// The payload is declared as String because the controller above sends a plain String
@RabbitListener(queues = "queueA")
public void getMsg1(String data, Channel channel, Message message) throws IOException {
    System.out.println(data);
    // Manual ack.
    // With manual ack enabled and prefetch: 1 (equivalent to basicQos(1)), the broker
    // delivers no further message to this consumer until the current one is acknowledged.
    // Acknowledge one message and the next one is delivered.
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
6. Consumer configuration file
spring:
  rabbitmq:
    host: 8.140.244.227
    port: 6786
    username: test
    password: test
    virtual-host: /test
    # Manual ACK
    listener:
      simple:
        acknowledge-mode: manual # Manual ack
        prefetch: 1 # Equivalent to basicQos(1)
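How `prefetch: 1` and manual ack interact can be illustrated with a small simulation. This is only a sketch of the counting rule, not broker code: `PrefetchWindow` is a hypothetical class that allows a delivery only while the number of unacknowledged messages is below the prefetch limit.

```java
/** Hypothetical simulation of the prefetch limit applied to one consumer. */
class PrefetchWindow {
    private final int prefetch;
    private int unacked;

    PrefetchWindow(int prefetch) {
        this.prefetch = prefetch;
    }

    /** Returns true if another message may be delivered, and counts it as unacked. */
    synchronized boolean tryDeliver() {
        if (unacked >= prefetch) {
            return false;
        }
        unacked++;
        return true;
    }

    /** The consumer acknowledged one message, freeing one slot. */
    synchronized void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }
}

class PrefetchWindowDemo {
    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(1);
        System.out.println(window.tryDeliver()); // prints true: first message delivered
        System.out.println(window.tryDeliver()); // prints false: still waiting for an ack
        window.ack();
        System.out.println(window.tryDeliver()); // prints true: slot freed by the ack
    }
}
```

If the listener never calls `basicAck`, the second call keeps returning false in this model, which matches the behaviour described for the listener: no further messages arrive until the current one is confirmed.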