207. SpringBoot integrates RabbitMQ to implement message sending and receiving (listener)

Directory

  • Send message
  • Two ways to create a queue
  • Code demo
    • Requirement 1: Send message
      • 1. ContentUtil first defines constants
      • 2. One of the two ways RabbitMQConfig creates a queue:
        • Configuration:
          • Question:
      • 3. MessageService writing logic
      • PublishController controller
      • application.properties configuration properties
      • Test: message sending
  • Receive messages
    • Code demo:
      • Test: Message reception
  • Custom listener container factory
  • Full code:
    • application.properties RabbitMQ connection and other property configuration
    • ContentUtil constant tool class
    • RabbitMQConfig configuration creates message queue
    • MessageService business code for sending messages
    • PublishController.java control layer for sending messages
    • MyRabbitMQListener listener, listens to the message queue
    • pom.xml

Send message

- Spring Boot can inject AmqpAdmin and AmqpTemplate into any other component.
  The component can then manage Exchanges, queues and bindings through AmqpAdmin, and send messages through AmqpTemplate.

- Spring Boot also automatically configures a RabbitMessagingTemplate Bean (RabbitAutoConfiguration is responsible for this).
  If you prefer to send and receive messages through it,
  RabbitMessagingTemplate can be used instead of the AmqpTemplate above. Both Templates are injected in exactly the same way.
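
As a minimal sketch of that second option, a component can take a RabbitMessagingTemplate through its constructor just like an AmqpTemplate. The class name MessagingTemplateSender is made up for illustration, and the sketch assumes the boot.fanout Exchange used later in this article already exists.

```java
package cn.ljh.app.rabbitmq.service;

import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.stereotype.Service;

//Hypothetical sender, not part of the original project
@Service
public class MessagingTemplateSender
{
    private final RabbitMessagingTemplate messagingTemplate;

    //Injected the same way as AmqpTemplate
    public MessagingTemplateSender(RabbitMessagingTemplate messagingTemplate)
    {
        this.messagingTemplate = messagingTemplate;
    }

    public void publish(String content)
    {
        //Arguments: Exchange name, routing key, payload
        messagingTemplate.convertAndSend("boot.fanout", "", content);
    }
}
```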

Two ways to create a queue

 Method 1 (programmatic): Create the queue in the program through AmqpAdmin.

 Method 2 (configuration): Configure beans of type org.springframework.amqp.core.Queue in the container.
                  Spring AMQP will automatically declare a corresponding queue in RabbitMQ for each such bean.

Code demo

Requirement 1: Send message

1. ContentUtil first defines constants

2. One of the two ways RabbitMQConfig creates a queue:

Configuration:

Configure a bean of type org.springframework.amqp.core.Queue in the container, and Spring AMQP will automatically declare a corresponding queue in RabbitMQ for the bean.
All that is needed is a @Bean method in the configuration class that returns the Queue.

Question:

The class is declared as a configuration class with the @Configuration annotation, but the queue is not created when the project starts.
My first guess was that this is lazy loading: since no consumer is listening to the queue, it is not created.
But once I wrote the code below, the queue was created even though there was still no consumer listening to it.
That was confusing, because I could not tell which code made the configuration class finally declare the queue.

The truth comes to light:
After testing:
The MessageService class below receives AmqpAdmin and AmqpTemplate through dependency injection. Once these objects are used to declare the Exchange, the queues and the bindings, the application actually connects to RabbitMQ, and only then is the Queue bean from the configuration class declared on the broker.
This can be seen in the MessageService code below:
the declarations are made in the parameterized constructor that receives the AmqpAdmin and AmqpTemplate objects.
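
One way to check this behaviour without MessageService is to open a connection once at startup. Spring AMQP declares Queue beans when a connection to the broker is opened, so this should make queue_boot appear even if nothing sends or listens yet. This is only a sketch: the class name QueueDeclarationTrigger is hypothetical, and it assumes the auto-configured ConnectionFactory and AmqpAdmin beans are in use.

```java
package cn.ljh.app.rabbitmq.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//Hypothetical helper, not part of the original project
@Configuration
public class QueueDeclarationTrigger
{
    @Bean
    public ApplicationRunner openConnectionOnStartup(ConnectionFactory connectionFactory)
    {
        //Opening a connection gives the auto-configured admin the chance
        //to declare the Queue, Exchange and Binding beans in the container
        return args -> connectionFactory.createConnection().close();
    }
}
```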

3. MessageService writing logic

Declare Exchange, Message Queue, Exchange and Message Queue bindings, methods of sending messages, etc.


PublishController controller

application.properties configuration properties


Test: message sending

Queue generated successfully

Send message test

Receive messages

A method annotated with @RabbitListener is registered as a message listener method.

 [Remarks]: Through its queues attribute, this annotation can specify the existing message queue to listen to.
  It can also use queuesToDeclare to declare a queue and listen to it.


 - If no listener container factory (RabbitListenerContainerFactory) is explicitly configured,
 Spring Boot automatically configures a SimpleRabbitListenerContainerFactory Bean in the container as the listener container factory.
 If you want to use DirectRabbitListenerContainerFactory instead, add the following configuration to the application.properties file:
  spring.rabbitmq.listener.type=direct

 ▲ If a MessageRecoverer or MessageConverter is configured in the container,
   it is automatically associated with the default listener container factory.
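
For example, a MessageConverter bean could be registered as in the sketch below. The class name ConverterConfig is hypothetical, and the choice of Jackson2JsonMessageConverter is only an assumption for illustration; it relies on Jackson being on the classpath, which spring-boot-starter-web normally provides.

```java
package cn.ljh.app.rabbitmq.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//Hypothetical configuration class, not part of the original project
@Configuration
public class ConverterConfig
{
    //A single MessageConverter bean in the container is picked up
    //by the default listener container factory
    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }
}
```

Note that switching the converter changes how message bodies are encoded, so senders and listeners should agree on it.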

Code demo:

Just create a listener for the message queue.

As described above, @RabbitListener can listen to an existing queue through the queues attribute, or declare and listen to a queue through queuesToDeclare.
It can also use the bindings attribute to bind an Exchange and a queue.
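
The bindings attribute is not used in the full code of this article, so here is a hedged sketch of what it might look like. The class name BindingListener and the queue name queue_boot_04 are hypothetical; the Exchange name boot.fanout matches the one declared in MessageService.

```java
package cn.ljh.app.rabbitmq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//Hypothetical listener, not part of the original project
@Component
public class BindingListener
{
    //Declares the queue, declares the fanout Exchange, binds them, and listens to the queue
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "queue_boot_04", durable = "true"),
            exchange = @Exchange(name = "boot.fanout", type = ExchangeTypes.FANOUT)
    ))
    public void onMessage(String message)
    {
        System.err.println("Message received from queue_boot_04 message queue: " + message);
    }
}
```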

Test: Message reception


Send messages and listen for messages

Custom listener container factory

▲ If you want to define more listener container factories or override the default listener container factory,
  you can use the SimpleRabbitListenerContainerFactoryConfigurer
  or DirectRabbitListenerContainerFactoryConfigurer provided by Spring Boot.

  They apply the same settings as the auto-configuration to a SimpleRabbitListenerContainerFactory
  or DirectRabbitListenerContainerFactory.

▲ Once you have a custom listener container factory, use the containerFactory attribute of the @RabbitListener annotation
  to specify that the custom listener container factory should be used,
  for example:

@RabbitListener(queues = "myQueue1", containerFactory="myFactory")
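
A factory named myFactory, as referenced in the annotation above, could be defined roughly as follows. This is a sketch under assumptions: the class name ListenerFactoryConfig and the prefetch value of 10 are made up for illustration.

```java
package cn.ljh.app.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//Hypothetical configuration class, not part of the original project
@Configuration
public class ListenerFactoryConfig
{
    //The bean name "myFactory" is what containerFactory="myFactory" refers to
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory)
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //Apply the same settings as the auto-configuration
        configurer.configure(factory, connectionFactory);
        //Then override whatever should differ (illustrative value)
        factory.setPrefetchCount(10);
        return factory;
    }
}
```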

Full code:

application.properties RabbitMQ connection and other property configuration

# Configure basic information for connecting to RabbitMQ-------------------------------------------------------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# The following properties can configure multiple connection addresses separated by commas. Once this property is configured, the host and port properties will be ignored.
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# Connect to virtual host
spring.rabbitmq.virtual-host=my-vhost01

# Configure RabbitMQ cache related information-----------------------------------------------------------
# Specify whether to cache connections or channels
spring.rabbitmq.cache.connection.mode=channel
# Specify how many Channels can be cached
spring.rabbitmq.cache.channel.size=50
# If the selected cache mode is connection, then you can configure the following properties
# spring.rabbitmq.cache.connection.size=15

# Configure properties related to RabbitTemplate-----------------------------------------------------
# Enable retries when RabbitTemplate fails to send a message
spring.rabbitmq.template.retry.enabled=true
# Wait 1 second before the first retry
spring.rabbitmq.template.retry.initial-interval=1s
# Maximum number of attempts to send a message (the first attempt counts as one)
spring.rabbitmq.template.retry.max-attempts=5
# Multiply the wait by 2 after each failed attempt, so the waits between the 5 attempts are 1s, 2s, 4s, 8s
# (each wait is capped by spring.rabbitmq.template.retry.max-interval)
spring.rabbitmq.template.retry.multiplier=2
# Default Exchange name when sending messages (empty means the default Exchange)
# Do not wrap values in quotes in a properties file; the quotes would become part of the value
spring.rabbitmq.template.exchange=
# Default routing key when sending messages
spring.rabbitmq.template.routing-key=test

# Configure properties related to the message listener container factory--------------------------------------------------------
# Specify the type of listener container factory
spring.rabbitmq.listener.type=simple
# Specify the acknowledge mode for messages
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# Specify whether to retry when the listener fails to process a message
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of attempts to deliver a message to the listener
spring.rabbitmq.listener.simple.retry.max-attempts=2
# Wait 1 second before retrying after the listener fails
spring.rabbitmq.listener.simple.retry.initial-interval=1s
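
To make the retry arithmetic of the template.retry.* properties concrete, the small calculation below lists the waits between attempts. It is a toy model, not Spring code: the class RetrySchedule is hypothetical, it assumes that max-attempts counts the first attempt, and the 10000 ms cap stands in for the max-interval setting.

```java
import java.util.ArrayList;
import java.util.List;

//Toy calculation of an exponential back-off schedule; it does not use Spring
class RetrySchedule
{
    //Returns the waits (in milliseconds) between consecutive attempts
    static List<Long> waitsMillis(long initialInterval, double multiplier, long maxInterval, int maxAttempts)
    {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        //maxAttempts attempts have maxAttempts - 1 waits between them
        for (int i = 1; i < maxAttempts; i++)
        {
            waits.add((long) Math.min(next, maxInterval));
            next = next * multiplier;
        }
        return waits;
    }

    public static void main(String[] args)
    {
        //initial-interval=1s, multiplier=2, assumed cap of 10s, max-attempts=5
        System.out.println(waitsMillis(1000, 2.0, 10000, 5)); // prints [1000, 2000, 4000, 8000]
    }
}
```

With five attempts there are only four waits, so a 16s wait is never reached with these settings.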

ContentUtil constant tool class

package cn.ljh.app.rabbitmq.util;


//Constants
public class ContentUtil
{
    //Name of the Exchange; fanout means the broadcast type
    public static final String EXCHANGE_NAME = "boot.fanout";

    //Names of the message queues
    public static final String[] QUEUE_NAMES = new String[] {"queue_boot_01", "queue_boot_02", "queue_boot_03"};


}

RabbitMQConfig configuration creates message queue

package cn.ljh.app.rabbitmq.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


//Configuration style: configure a Bean of type org.springframework.amqp.core.Queue in the container, and Spring AMQP will automatically declare a corresponding queue in RabbitMQ for the Bean.
//Declare this class as a configuration class
@Configuration
public class RabbitMQConfig
{

    //Define the queue in RabbitMQ using the configuration style
    @Bean
    public Queue myQueue()
    {
        //Configure a Queue Bean in the container, and Spring will automatically declare the corresponding queue in RabbitMQ for it
        return new Queue("queue_boot", /* Name of the message queue */
                true, /* Whether the queue is durable (survives a broker restart) */
                false, /* Whether the queue is exclusive, i.e. usable only by the connection that declared it */
                false, /* Whether to delete the queue automatically once it is no longer in use */
                null /* Additional queue arguments */
        );
    }
}
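
The same kind of Queue bean can also be written with QueueBuilder, which some readers may find easier to read than the positional constructor arguments. This is only a sketch: the class name BuilderQueueConfig and the queue name queue_boot_builder are hypothetical.

```java
package cn.ljh.app.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//Hypothetical configuration class, not part of the original project
@Configuration
public class BuilderQueueConfig
{
    @Bean
    public Queue builtQueue()
    {
        //Durable queue; not exclusive and not auto-deleted unless the builder is told otherwise
        return QueueBuilder.durable("queue_boot_builder").build();
    }
}
```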

MessageService business code for sending messages

Declare Exchange and Queue, Exchange binds Queue, and sends message code

package cn.ljh.app.rabbitmq.service;

import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;


//Business logic: declare the Exchange and the message queues, and provide the method for sending messages
@Service
public class MessageService
{
    //AmqpAdmin manages Exchanges, queues and bindings
    private final AmqpAdmin amqpAdmin;

    //AmqpTemplate sends messages
    private final AmqpTemplate amqpTemplate;

    //Dependency injection through the parameterized constructor
    public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate)
    {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;

        //The Exchange, the queues and the bindings only need to be declared once, so this is done here in the constructor.
        //Create an Exchange of the fanout type using the FanoutExchange implementation class
        FanoutExchange exchange = new FanoutExchange(
                ContentUtil.EXCHANGE_NAME,
                true, /* Whether the Exchange is durable */
                false, /* Whether to delete it automatically */
                null /* Additional arguments */
        );
        //Declare Exchange
        this.amqpAdmin.declareExchange(exchange);


        //Declare each Queue in a loop; this is the programmatic way of creating a Queue.
        for (String queueName : ContentUtil.QUEUE_NAMES)
        {
            Queue queue = new Queue(queueName, /* Name of the message queue */
                    true, /* Whether the queue is durable (survives a broker restart) */
                    false, /* Whether the queue is exclusive, i.e. usable only by the connection that declared it */
                    false, /* Whether to delete the queue automatically once it is no longer in use */
                    null /* Additional queue arguments */
            );
            //Declaring the Queue here creates it in RabbitMQ [programmatic style]
            this.amqpAdmin.declareQueue(queue);

            //Create the binding between the Queue and the Exchange
            Binding binding = new Binding(
                    queueName, /* Name of the destination the Exchange distributes messages to; here it is the message queue */
                    Binding.DestinationType.QUEUE, /* Type of the destination: a queue or another Exchange */
                    ContentUtil.EXCHANGE_NAME, /* Exchange to bind to */
                    "x", /* The Exchange is of the fanout (broadcast) type, so the routing key is ignored and can be any value */
                    null
                    );
            //Declare the binding
            amqpAdmin.declareBinding(binding);
        }
    }

    //Method for sending a message
    public void publish(String content)
    {
        //Send the message
        amqpTemplate.convertAndSend(
                ContentUtil.EXCHANGE_NAME, /* Send the message to this Exchange */
                "", /* The Exchange is of the fanout (broadcast) type, so the routing key is ignored */
                content /* Message body */
        );
    }
}
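
The comments above say that a fanout Exchange ignores the routing key. The toy model below illustrates that idea in plain Java: every bound queue receives every message, whatever key was used when binding or publishing. It is only an in-memory sketch and does not talk to RabbitMQ; the class FanoutModel and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

//Toy in-memory model of a fanout Exchange; it does not use RabbitMQ
class FanoutModel
{
    //Queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    //The routing key given at binding time is accepted but never used
    void bind(String queueName, String ignoredRoutingKey)
    {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    //The routing key given at publish time is accepted but never used
    void publish(String ignoredRoutingKey, String message)
    {
        for (List<String> queue : queues.values())
        {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName)
    {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args)
    {
        FanoutModel exchange = new FanoutModel();
        for (String name : new String[] {"queue_boot_01", "queue_boot_02", "queue_boot_03"})
        {
            exchange.bind(name, "x");
        }
        exchange.publish("", "hello");
        System.out.println(exchange.messagesIn("queue_boot_02")); // prints [hello]
    }
}
```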

PublishController.java control layer for sending messages

package cn.ljh.app.rabbitmq.controller;

import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

//Send a message
@RestController
public class PublishController
{
    private final MessageService messageService;
    //Constructor with parameters for dependency injection
    public PublishController(MessageService messageService)
    {
        this.messageService = messageService;
    }

    @GetMapping("/publish/{message}")
    //{message} is a path parameter, so the method parameter that receives it needs the @PathVariable annotation
    public String publish(@PathVariable String message)
    {
        //Publish the message
        messageService.publish(message);
        return "Message published successfully";
    }

}

MyRabbitMQListener listener, listens to the message queue

package cn.ljh.app.rabbitmq.listener;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//Listener: listens to the message queues and consumes the messages
@Component
public class MyRabbitMQListener
{
    //queues specifies the existing queue to listen to
    @RabbitListener(queues = "queue_boot_01")
    public void onQ1Message(String message)
    {
        System.err.println("Message received from queue_boot_01 message queue: " + message);
    }

    //queues specifies the existing queue to listen to
    @RabbitListener(queues = "queue_boot_02")
    public void onQ2Message(String message)
    {
        System.err.println("Message received from queue_boot_02 message queue: " + message);
    }

    //queuesToDeclare declares the queue and listens to it
    //bindings can also be used to bind an Exchange and a queue
    @RabbitListener(queuesToDeclare = @Queue(name = "queue_boot_03"
            ,durable = "true"
            ,exclusive = "false"
            ,autoDelete = "false"),

            admin = "amqpAdmin" /* The AmqpAdmin used to declare the Queue. If not specified, the default one in the container is used */
    )
    public void onQ3Message(String message)
    {
        System.err.println("Message received from queue_boot_03 message queue: " + message);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
    </parent>
    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_boot</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_boot</name>
    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- RabbitMQ dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- web dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Dependencies on developer tools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!-- lombok dependency-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>