Configuration:
Public files:
Create a unified queue configuration class QueueConstant.java
package test.common; public class QueueConstant {<!-- --> /** *Queue name */ public static final String RABBITMQ_TEST="rabbitmq.test"; }
Create the switch class ExchangeConstant.java
package test.common; /** *Switch unified configuration class * The switch name corresponds to the module name, and routing can trigger specific events for the module. Topic mode is used by default. */ public class ExchangeConstant {<!-- --> /** * Switch name-device */ public static final String EXCHANGE_TEST="test"; /** * Routing key-device-initialization */ public static final String EXCHANGE_KEY_TEST="test.key"; }
Create test class UserDTO .java
import java.io.Serializable; import lombok.Data; @Data public class UserDTO implements Serializable {<!-- --> private String username; private String password; }
Producer:
First create a RabbitMQConfig.java as the producer’s configuration class.
package test.producer; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Producer message confirmation--"Normal business is closed, important data is opened * Enable in configuration file: publisher-returns publisher-confirms * ? When the message is sent from producer to exchange, a confirmCallback will be returned. * ? If the message delivery from exchange–>queue fails, a returnCallback will be returned. */ @Configuration public class RabbitMQConfig {<!-- --> @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {<!-- --> RabbitTemplate template = new RabbitTemplate(connectionFactory); /** * Configure serialization tool */ Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true); template.setMessageConverter(jackson2JsonMessageConverter); /** * The message is sent from producer to exchange and a confirmCallback is immediately returned */ template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {<!-- --> @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {<!-- --> System.out.println("ConfirmCallback: " + "Related data: " + correlationData); System.out.println("ConfirmCallback: " + "Confirmation: " + ack); System.out.println("ConfirmCallback: " + "Cause: " + cause); } }); /** * Will be called only after entering the switch but not entering the queue - "routing failure" */ template.setReturnCallback(new RabbitTemplate.ReturnCallback() {<!-- --> @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {<!-- --> System.out.println("ReturnCallback: " + "Message: " + message); System.out.println("ReturnCallback: " + "Response code: " + replyCode); System.out.println("ReturnCallback: " + "Response information: " + replyText); System.out.println("ReturnCallback: " + "Exchange: " + exchange); System.out.println("ReturnCallback: " + "Routing Key: " + routingKey); } }); return template; } }
yml configuration file
spring: rabbitmq: host: #URL port: #port username: #account password: #password virtualHost: / #Switch ack confirmation # Correlated-asynchronous simple-synchronous none: Turn off the confirm confirmation mechanism Old version-》publisher-confirms: true publisher-confirm-type: none #Queue ack confirmation publisher-returns: false
Consumer:
Create queue configuration class QueueConfig.java
package test.consumer; import test.common.ExchangeConstant; import test.common.QueueConstant; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; /** * Queue configuration: queue, switch, binding initialization */ @Configuration public class QueueConfig {<!-- --> @Bean public TopicExchange DeviceExchange() {<!-- --> return new TopicExchange(ExchangeConstant.EXCHANGE_TEST); } @Bean public Binding rewardBindingDevice() {<!-- --> return BindingBuilder .bind(rewardQueue()) .to(DeviceExchange()) .with(ExchangeConstant.EXCHANGE_KEY_TEST); } }
Create mq configuration class RabbitMQConfig.java
package test.consumer; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; /** * mq configuration class: serialization and deserialization, message confirmation mechanism */ @Configuration public class RabbitMQConfig implements RabbitListenerConfigurer {<!-- --> /** * Configure monitoring and deserialization tools * @param rabbitListenerEndpointRegistrar */ @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {<!-- --> rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory(){<!-- --> DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(new MappingJackson2MessageConverter()); return messageHandlerMethodFactory; } /** * Configure sending serialization tool * @param connectionFactory * @return */ //Set up the serialization tool. By default, jdk's objectStream is used for serial numbers - large size @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {<!-- --> RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } } }
yml configuration
spring: rabbitmq: listener: simple: # Retry mechanism retry: enabled: true #Whether to enable consumer retry. It is enabled by default and retries infinitely. max-attempts: 3 #Maximum number of retries initial-interval: 3000ms #Retry interval (in milliseconds) max-interval: 600000ms #Maximum retry interval (in milliseconds) multiplier: 2 #Interval time multiplier, interval time * multiplier = next interval time, the maximum cannot exceed the set maximum interval time
Use
Producer
UserDTO userDTO = new UserDTO (); userDTO.setUsername("test"); rabbitTemplate.convertAndSend(ExchangeConstant.EXCHANGE_TEST,ExchangeConstant.EXCHANGE_KEY_TEST,userDTO ); try {<!-- --> Thread.sleep(2000); } catch (InterruptedException e) {<!-- --> throw new RuntimeException(e); }
Create a listening queue class QueueConsumer.java
package test.consumer; import test.common.userDTO; import test.common.ExchangeConstant; import test.common.QueueConstant; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * Listening queue */ @Slf4j @Component public class QueueConsumer {<!-- --> @RabbitListener(queues = QueueConstant.RABBITMQ_TEST) void listenOrder(Message message){<!-- --> log.info(message.toString()); try {<!-- --> switch (message.getMessageProperties().getReceivedRoutingKey()) {<!-- --> case ExchangeConstant.EXCHANGE_KEY_TEST: UserDTO userDTO = new ObjectMapper().readValue(message.getBody(),userDTO.class); log.info(userDTO.toString); break; default: break; } } catch (IOException e) {<!-- --> log.error("Deserialization failed",e); } } }