Contents
- Send message
- Two ways to create a queue
- Code demo
- Requirement 1: Send message
- 1. ContentUtil first defines constants
- 2. One of the two ways RabbitMQConfig creates a queue
- Configuration
- Question
- 3. MessageService writing logic
- PublishController controller
- application.properties configuration properties
- Test: message sending
- Receive messages
- Code demo
- Test: Message reception
- Custom listener container factory
- Full code
- application.properties RabbitMQ connection and other property configuration
- ContentUtil constant tool class
- RabbitMQConfig configuration creates message queue
- MessageService business code for sending messages
- PublishController.java control layer for sending messages
- MyRabbitMQListener listener, listens to the message queue
- pom.xml
Send message
- Spring Boot can inject AmqpAdmin and AmqpTemplate into any other component. That component can then manage Exchanges, queues, and bindings through AmqpAdmin and send messages through AmqpTemplate.
- Spring Boot also auto-configures a RabbitMessagingTemplate bean (RabbitAutoConfiguration is responsible for this). To send and receive messages with it, inject RabbitMessagingTemplate instead of the AmqpTemplate above. Both templates are injected in exactly the same way.
Two ways to create a queue
Method 1 (programmatic): create the queue in code through AmqpAdmin.
Method 2 (configuration): configure a bean of type org.springframework.amqp.core.Queue in the container. Spring AMQP then automatically declares the corresponding queue on the RabbitMQ broker.
Code demo
Requirement 1: Send message
1. ContentUtil first defines constants
2. One of the two ways RabbitMQConfig creates a queue:
Configuration:
Configure a bean of type org.springframework.amqp.core.Queue in the container, and Spring AMQP will automatically declare the corresponding queue on the RabbitMQ broker.
All it takes is a @Bean method in a configuration class that returns the Queue.
Question:
The class is declared as a configuration class with the @Configuration annotation, but the queue is not created when the project starts.
My first guess was lazy loading: probably the queue is not created because no consumer is listening to it.
But once I wrote the code below, the queue was created, even though there was still no consumer listening to it.
That was confusing: I could not tell which later code allowed the configuration class to declare the queue successfully.
The truth comes to light:
After testing:
The MessageService class below has the AmqpAdmin and AmqpTemplate objects injected. As soon as those objects are used to declare the queues, the Exchange, and the bindings, the Queue bean from the configuration class is created on the broker as well.
The reason is that Spring AMQP declares Queue beans lazily: they are sent to the broker when the first connection is opened, and the connection is only opened when something actually talks to RabbitMQ. A listening consumer would trigger that, but so does any declaration made through AmqpAdmin.
In the MessageService code below, those declarations are made in the parameterized constructor that receives the AmqpAdmin and AmqpTemplate objects.
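The behaviour described above can be modelled in a few lines of plain Java. This is only a toy sketch of the idea (queue beans are remembered at startup and declared when the first connection is opened), not Spring AMQP's actual code; the class LazyDeclarationSketch and all of its methods are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: Queue beans are only "declared on the broker" once the
// first connection is opened. Hypothetical names, not Spring AMQP code.
final class LazyDeclarationSketch {
    private final List<String> queueBeans = new ArrayList<>();
    private final List<String> declaredOnBroker = new ArrayList<>();
    private boolean connectionOpen;

    // Corresponds to defining a Queue @Bean: nothing reaches the broker yet.
    void registerQueueBean(String queueName) {
        queueBeans.add(queueName);
    }

    // Any real broker operation needs a connection, so it opens one if needed.
    // The exchange itself is not tracked in this sketch.
    void declareExchange(String exchangeName) {
        openConnectionIfNeeded();
    }

    // On the first connection, all remembered Queue beans are declared.
    private void openConnectionIfNeeded() {
        if (!connectionOpen) {
            connectionOpen = true;
            declaredOnBroker.addAll(queueBeans);
        }
    }

    List<String> declaredOnBroker() {
        return List.copyOf(declaredOnBroker);
    }

    // Application started, but nothing has talked to the broker yet.
    static List<String> queuesBeforeFirstUse() {
        LazyDeclarationSketch app = new LazyDeclarationSketch();
        app.registerQueueBean("queue_boot");
        return app.declaredOnBroker();
    }

    // The first declaration through the admin opens the connection.
    static List<String> queuesAfterFirstUse() {
        LazyDeclarationSketch app = new LazyDeclarationSketch();
        app.registerQueueBean("queue_boot");
        app.declareExchange("boot.fanout");
        return app.declaredOnBroker();
    }

    public static void main(String[] args) {
        System.out.println(queuesBeforeFirstUse()); // prints []
        System.out.println(queuesAfterFirstUse());  // prints [queue_boot]
    }
}
```

In the real project the role of declareExchange is played by the first call on AmqpAdmin or AmqpTemplate, or by a listener container starting up.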
3. MessageService writing logic
Declare the Exchange, the message queues, and the bindings between them, and provide the method that sends messages.
PublishController controller
application.properties configuration properties
Test: message sending
Queue generated successfully
Send message test
Receive messages
The method decorated with the @RabbitListener annotation will be registered as a message listener method.
[Remarks]:
- The annotation can name an existing message queue to listen to through the queues attribute, or declare a queue and listen to it through queuesToDeclare.
- If no listener container factory (RabbitListenerContainerFactory) is explicitly configured, Spring Boot auto-configures a SimpleRabbitListenerContainerFactory bean in the container as the listener container factory. To use DirectRabbitListenerContainerFactory instead, add the following to the application.properties file:
spring.rabbitmq.listener.type=direct
- If a MessageRecoverer or MessageConverter is configured in the container, it is automatically associated with the default listener container factory.
Code demo:
Just create a listener class for the message queue and annotate its methods with @RabbitListener.
Besides queues (listen to an existing queue) and queuesToDeclare (declare a queue and listen to it), the annotation also has a bindings attribute, which declares a queue and an Exchange and binds them together.
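The bindings attribute is not used in the full listener code of this article, so here is a minimal sketch of what it could look like. It is a fragment that needs the spring-boot-starter-amqp dependency on the classpath and a running broker, so it cannot be run on its own. The class name BindingListenerSketch and the queue name queue_boot_04 are assumptions made up for this example; boot.fanout is the Exchange name from ContentUtil.

```java
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Sketch only: declares a queue and an Exchange, binds them, and listens.
@Component
public class BindingListenerSketch {

    @RabbitListener(bindings = @QueueBinding(
            // Hypothetical queue name, declared if it does not exist yet
            value = @Queue(name = "queue_boot_04", durable = "true"),
            // Same fanout Exchange that MessageService declares
            exchange = @Exchange(name = "boot.fanout", type = ExchangeTypes.FANOUT)))
    public void onMessage(String message) {
        System.err.println("Message received from queue_boot_04: " + message);
    }
}
```

Because the Exchange is of type fanout, no key is given in @QueueBinding; for a direct or topic Exchange the key attribute would carry the routing key.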
Test: Message reception
Send messages and listen for messages
Custom listener container factory
▲ To define more listener container factories, or to override the default one, use the SimpleRabbitListenerContainerFactoryConfigurer or DirectRabbitListenerContainerFactoryConfigurer provided by Spring Boot. They apply the same settings as auto-configuration to a SimpleRabbitListenerContainerFactory or DirectRabbitListenerContainerFactory.
▲ Once a custom listener container factory exists, the containerFactory attribute of @RabbitListener selects it. For example:
@RabbitListener(queues = "myQueue1", containerFactory="myFactory")
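A minimal sketch of such a factory bean follows, assuming the simple container type. It is a configuration fragment that needs Spring Boot and spring-boot-starter-amqp on the classpath. The class name ListenerFactoryConfigSketch and the prefetch value of 10 are assumptions chosen for illustration; the bean name myFactory matches the containerFactory value in the annotation above.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: one extra listener container factory named "myFactory".
@Configuration
public class ListenerFactoryConfigSketch {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Apply the same spring.rabbitmq.listener.* settings as auto-configuration
        configurer.configure(factory, connectionFactory);
        // Then override what should differ (the value 10 is an arbitrary example)
        factory.setPrefetchCount(10);
        return factory;
    }
}
```

Calling the configurer first and customizing afterwards keeps the factory consistent with application.properties except for the settings that are deliberately changed.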
Full code:
application.properties RabbitMQ connection and other property configuration
# ---------- Basic information for connecting to RabbitMQ ----------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# The following property can list multiple connection addresses separated by commas.
# Once it is set, the host and port properties are ignored.
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# Virtual host to connect to
spring.rabbitmq.virtual-host=my-vhost01

# ---------- RabbitMQ cache settings ----------
# Whether to cache connections or channels
spring.rabbitmq.cache.connection.mode=channel
# How many channels can be cached
spring.rabbitmq.cache.channel.size=50
# If the cache mode is connection, the following property can be set instead
# spring.rabbitmq.cache.connection.size=15

# ---------- RabbitTemplate settings ----------
# Retry when RabbitTemplate fails to send a message
spring.rabbitmq.template.retry.enabled=true
# Wait 1 second before the first retry
spring.rabbitmq.template.retry.initial-interval=1s
# Maximum number of attempts to send a message (the first attempt included)
spring.rabbitmq.template.retry.max-attempts=5
# Multiply the wait by 2 after each failed attempt, giving a geometric sequence:
# 1s before the second attempt, 2s before the third, 4s before the fourth, 8s before the fifth
spring.rabbitmq.template.retry.multiplier=2
# Default Exchange name when sending messages.
# Values in a .properties file are not quoted; leave the value empty for the default Exchange.
spring.rabbitmq.template.exchange=
# Default routing key when sending messages
spring.rabbitmq.template.routing-key=test

# ---------- Listener container factory settings ----------
# Type of the listener container factory
spring.rabbitmq.listener.type=simple
# Acknowledgement mode of the messages
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# Retry when the listener fails to process a received message
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of attempts to deliver a message to the listener
spring.rabbitmq.listener.simple.retry.max-attempts=2
# Wait 1 second before retrying the delivery
spring.rabbitmq.listener.simple.retry.initial-interval=1s
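To see how initial-interval, multiplier, and max-attempts combine, the waits can be computed by hand. The sketch below is plain Java and only mimics the arithmetic of an exponential backoff; it is not the Spring Retry implementation. The class and method names are hypothetical, and the cap of 10 seconds assumes that spring.rabbitmq.template.retry.max-interval is left at what I believe is Spring Boot's default. Note that max-attempts counts the first attempt too, so five attempts give at most four waits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the waits between attempts for an
// exponential backoff with a cap. Illustration only.
final class RetryBackoffSketch {

    static List<Long> waitsMillis(long initialMillis, double multiplier,
                                  long maxIntervalMillis, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        long next = initialMillis;
        // There is one wait before every attempt except the first.
        for (int attempt = 2; attempt <= maxAttempts; attempt++) {
            waits.add(Math.min(next, maxIntervalMillis));
            next = (long) (next * multiplier);
        }
        return waits;
    }

    public static void main(String[] args) {
        // initial-interval=1s, multiplier=2, assumed max-interval=10s, max-attempts=5
        System.out.println(waitsMillis(1000, 2.0, 10_000, 5)); // prints [1000, 2000, 4000, 8000]
    }
}
```

With more attempts the sequence stops growing at the cap: seven attempts would wait 1s, 2s, 4s, 8s, 10s, 10s under the same assumptions.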
ContentUtil constant tool class
package cn.ljh.app.rabbitmq.util;

// Constants
public class ContentUtil {
    // Name of the Exchange. fanout is the broadcast type.
    public static final String EXCHANGE_NAME = "boot.fanout";

    // Names of the message queues
    public static final String[] QUEUE_NAMES =
            new String[] {"queue_boot_01", "queue_boot_02", "queue_boot_03"};
}
RabbitMQConfig configuration creates message queue
package cn.ljh.app.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Configuration approach: register a bean of type org.springframework.amqp.core.Queue
// in the container, and Spring AMQP declares the corresponding queue on the broker.

// Declare this class as a configuration class
@Configuration
public class RabbitMQConfig {

    // Define the RabbitMQ queue through configuration
    @Bean
    public Queue myQueue() {
        return new Queue(
                "queue_boot", /* Name of the message queue */
                true,         /* Whether the queue is durable */
                false,        /* Whether the queue is exclusive, that is, usable only by the connection that declared it */
                false,        /* Whether the queue is deleted automatically once it is no longer in use */
                null          /* Additional queue arguments */
        );
    }
}
MessageService business code for sending messages
Declares the Exchange and the queues, binds the queues to the Exchange, and sends messages.
package cn.ljh.app.rabbitmq.service;

import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;

// Business logic: declare the Exchange and the queues, and send messages
@Service
public class MessageService {
    // AmqpAdmin manages Exchanges, queues, and bindings
    private final AmqpAdmin amqpAdmin;
    // AmqpTemplate sends messages
    private final AmqpTemplate amqpTemplate;

    // Dependency injection through the parameterized constructor
    public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;

        // The Exchange, the queues, and the bindings only need to be declared once,
        // so the constructor is a convenient place to do it.

        // Create an Exchange of type fanout using the FanoutExchange implementation class
        FanoutExchange exchange = new FanoutExchange(
                ContentUtil.EXCHANGE_NAME,
                true,  /* Whether the Exchange is durable */
                false, /* Whether the Exchange is deleted automatically */
                null   /* Additional arguments */
        );
        // Declare the Exchange
        this.amqpAdmin.declareExchange(exchange);

        // Declaring the queues in a loop is the programmatic way of creating queues
        for (String queueName : ContentUtil.QUEUE_NAMES) {
            Queue queue = new Queue(
                    queueName, /* Name of the message queue */
                    true,      /* Whether the queue is durable */
                    false,     /* Whether the queue is exclusive, that is, usable only by the connection that declared it */
                    false,     /* Whether the queue is deleted automatically once it is no longer in use */
                    null       /* Additional queue arguments */
            );
            // Declare the queue (programmatic approach)
            this.amqpAdmin.declareQueue(queue);

            Binding binding = new Binding(
                    queueName,                     /* Name of the destination, here the queue that receives the messages */
                    Binding.DestinationType.QUEUE, /* Type of the destination: a queue or another Exchange */
                    ContentUtil.EXCHANGE_NAME,     /* Exchange to bind to */
                    "x",                           /* Routing key: a fanout Exchange ignores it, so any value works */
                    null                           /* Additional binding arguments */
            );
            // Declare the binding between the queue and the Exchange
            this.amqpAdmin.declareBinding(binding);
        }
    }

    // Send a message
    public void publish(String content) {
        amqpTemplate.convertAndSend(
                ContentUtil.EXCHANGE_NAME, /* Exchange to send the message to */
                "",                        /* Routing key: ignored because the Exchange is of type fanout */
                content                    /* Message body */
        );
    }
}
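The comments in MessageService state that the routing key has no effect on a fanout Exchange. A toy in-memory model makes that concrete. It is plain Java with no RabbitMQ involved, and it only imitates the routing rule, not the broker; the class FanoutSketch and its methods are hypothetical names for illustration, while the queue names come from ContentUtil.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy fanout Exchange: every bound queue gets a copy of every message.
final class FanoutSketch {
    // Queue name -> messages currently in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but deliberately ignored, as with fanout.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return List.copyOf(boundQueues.getOrDefault(queueName, List.of()));
    }

    // Binds the three queues of this article and publishes with two different keys.
    static FanoutSketch demo() {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue_boot_01");
        exchange.bind("queue_boot_02");
        exchange.bind("queue_boot_03");
        exchange.publish("x", "hello");
        exchange.publish("", "world");
        return exchange;
    }

    public static void main(String[] args) {
        FanoutSketch exchange = demo();
        for (String name : List.of("queue_boot_01", "queue_boot_02", "queue_boot_03")) {
            System.out.println(name + " -> " + exchange.messagesIn(name));
        }
    }
}
```

Each of the three queues ends up with both messages, whichever routing key was used to publish them.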
PublishController.java control layer for sending messages
package cn.ljh.app.rabbitmq.controller;

import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

// Sends messages
@RestController
public class PublishController {
    private final MessageService messageService;

    // Dependency injection through the parameterized constructor
    public PublishController(MessageService messageService) {
        this.messageService = messageService;
    }

    // {message} is a path parameter, so the method parameter needs @PathVariable
    @GetMapping("/publish/{message}")
    public String publish(@PathVariable String message) {
        // Publish the message
        messageService.publish(message);
        return "Message published successfully";
    }
}
MyRabbitMQListener listener, listens to the message queue
package cn.ljh.app.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Listener: listens to the message queues and consumes their messages
@Component
public class MyRabbitMQListener {

    // queues names the existing message queue to listen to
    @RabbitListener(queues = "queue_boot_01")
    public void onQ1Message(String message) {
        System.err.println("Message received from queue_boot_01 message queue: " + message);
    }

    // queues names the existing message queue to listen to
    @RabbitListener(queues = "queue_boot_02")
    public void onQ2Message(String message) {
        System.err.println("Message received from queue_boot_02 message queue: " + message);
    }

    // queuesToDeclare declares the queue and listens to it in one step.
    // The bindings attribute could be used instead to also bind an Exchange and a queue.
    @RabbitListener(
            queuesToDeclare = @Queue(name = "queue_boot_03",
                    durable = "true",
                    exclusive = "false",
                    autoDelete = "false"),
            /* Bean name of the AmqpAdmin used to declare the queue.
               If not specified, the default one in the container is used. */
            admin = "amqpAdmin"
    )
    public void onQ3Message(String message) {
        System.err.println("Message received from queue_boot_03 message queue: " + message);
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
    </parent>
    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_boot</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_boot</name>
    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- RabbitMQ dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- web dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Developer tools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!-- lombok dependency -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>