Fallback message
When only the publisher confirmation mechanism is enabled, the exchange sends a confirmation straight back to the producer as soon as it has received the message.
If the message then turns out to be unroutable, it is silently discarded, and the producer never learns that the message was dropped. So how can the producer get hold of messages that could not be routed, or at least be notified so that it can handle them itself? By setting the mandatory parameter, a message whose destination is unreachable during delivery is returned to the producer instead.
Simply put, this covers the case where the exchange is fine but the routing key is wrong: without mandatory the message is dropped silently; with mandatory it is returned to the producer.
1. YAML configuration
# RabbitMQ connection settings plus publisher confirms / returns.
spring:
  rabbitmq:
    host: 192.168.64.137
    port: 5672
    username: guest
    password: 123
    virtual-host: /
    # Alternative way of selecting the publisher-confirm mode (left disabled here):
    # publisher-confirm-type: correlated
    #   NONE       - publisher confirms disabled (the default)
    #   CORRELATED - the confirm callback is triggered once the message has been
    #                published to the exchange
    #   SIMPLE     - (confirms one message at a time; not used here) in testing it
    #                behaved in two ways: it triggers the confirm callback just
    #                like CORRELATED, and it also lets the caller use
    #                rabbitTemplate.waitForConfirms / waitForConfirmsOrDie after
    #                publishing to wait for the broker's result and decide what
    #                to do next. Note: when waitForConfirmsOrDie fails, the
    #                channel is closed and can no longer be used to publish.
    # Enable publisher confirms: the broker acks/nacks every published message.
    publisher-confirms: true
    # Enable publisher returns: messages that cannot be routed are returned to the producer.
    publisher-returns: true
2. Configuration class
package com.cherry.rabbitmqttl.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Configuration class release confirmation (advanced) */ @Configuration public class ConfirmConfig {<!-- --> // switch public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; // queue public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; //routingKey public static final String CONFIRM_ROUTING_KEY = "key1"; // Declare the switch @Bean("confirmExchange") public DirectExchange confirmExchange(){<!-- --> return new DirectExchange(CONFIRM_EXCHANGE_NAME); } // queue @Bean("confirmQueue") public Queue confirmQueue(){<!-- --> return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue, @Qualifier("confirmExchange")DirectExchange confirmExchange){<!-- --> return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); }
3. Producer
package com.cherry.rabbitmqttl.controller; import com.cherry.rabbitmqttl.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * Test confirmation to start sending messages */ @Slf4j @RequestMapping("/confirm") @RestController public class ConfirmProductController {<!-- --> @Autowired private RabbitTemplate rabbitTemplate; /** * When the switch is wrong, the switch will display the error message that the switch has not received the message, * When the routingkey is wrong, it will display that the switch has received a message * Send message content, hello key1 * Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/', class-id=60 , method-id=40) * The exchange has not received the id: ID=1 message, due to the reason: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/', class-id=60, method-id=40) * Send message content, hello key12 * The switch has received the message with id: ID=2 * @param message */ // send message // ID unique identification // There is no way to get the failure callback when the routingkey is wrong @GetMapping("/sendMsg/{message}") public void sendMessage(@PathVariable String message){<!-- --> // send the message normally: both the exchange and the queue have received it CorrelationData correlationData3 = new CorrelationData("ID=0"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, 
ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData3); log.info("Send message content, {}", message + "key0" ); log.info("____________________________________________________________________"); // The exchange sent the message incorrectly: the exchange did not receive it, the queue did not receive it CorrelationData correlationData1 = new CorrelationData("ID=1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + 1, ConfirmConfig.CONFIRM_ROUTING_KEY + "key1", message + "key1", correlationData1); log.info("Send message content, {}", message + "key1"); log.info("____________________________________________________________________"); // Routingkey error sending message: the switch received it, but the queue did not receive it CorrelationData correlationData2 = new CorrelationData("ID=2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY + "2", message + "key12", correlationData2); log.info("Send message content, {}", message + "key12"); } }
4. Consumer
package com.cherry.rabbitmqttl.consumer;

import com.cherry.rabbitmqttl.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Consumer that logs every message arriving on
 * {@link ConfirmConfig#CONFIRM_QUEUE_NAME}.
 */
@Slf4j
@Component
public class ConfirmConsumer {

    /**
     * Logs the body of a message received from the confirm queue.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirm(Message message) {
        // Decode explicitly as UTF-8 rather than with the platform default
        // charset, so the text is the same on every host.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received queue confirm.queue message: {}", body);
    }
}
5. Confirm and return (fallback) callbacks
package com.cherry.rabbitmqttl.confirmcallback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

/**
 * Callbacks that let the producer learn about messages that were not
 * delivered normally: publisher confirms and returned messages.
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Registers this object as both the confirm and the return callback of
     * the injected template. Done in {@code @PostConstruct} so that it runs
     * after the {@code @Autowired} field has been populated.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Confirm callback; logs whether the exchange received the message.
     *
     * @param correlationData correlation data supplied when the message was sent; may be {@code null}
     * @param ack             {@code true} if the exchange received the message
     * @param cause           reason reported for a negative confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = "";
        if (correlationData != null) {
            id = correlationData.getId();
        }
        if (ack) {
            log.info("The switch has received the message with id: {}", id);
        } else {
            log.info("The exchange has not received the message with id: {}, due to the reason: {}", id, cause);
        }
    }

    /**
     * Return callback; logs a message that was returned to the producer
     * because it could not be routed (for example, a wrong routing key).
     *
     * @param message    the returned message
     * @param replyCode  reply code supplied with the return
     * @param replyText  reply text supplied with the return
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.error("message {}, returned by switch {}, reason for return: {}, routing Key{}",
                body, exchange, replyText, routingKey);
    }
}
6. pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <groupId>com.cherry</groupId>
    <artifactId>rabbitmqttl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmqttl</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- RabbitMQ dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Bumped from 1.2.47, which is affected by known autoType
             deserialization vulnerabilities; 1.2.83 stays on the 1.2.x line. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- RabbitMQ test dependencies -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>