Integrating RabbitMQ with SpringBlade

Integrating RabbitMQ into the SpringBlade framework

Create a new module (blade-rabbitmq) and add the dependencies required by RabbitMQ

  • POM file of the blade-rabbitmq service

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <description>springblade integrates rabbitmq</description>

Create the configuration class RabbitMqConfig

  • Configuration file

      # RabbitMQ server configuration
        port: 5672
        username: guest/root
        password: guest/root
  • Declarative mode (queues, exchanges, and bindings are created when the project starts):

    // Create queue 1.
    // @Bean(RabbitMqConstant.QUEUE_NAME_ONE) // Would make the bean name and the queue name both my-queue-1; if no name is given, the bean name defaults to the method name
    // @Qualifier
    @Bean
    public Queue createQueueOne() {
        return new Queue(RabbitMqConstant.QUEUE_NAME_ONE);
    }

    // Create queue 2.
    @Bean
    public Queue createQueueTwo() {
        // The name of the queue is my-queue-2
        return new Queue(RabbitMqConstant.QUEUE_NAME_TWO);
    }

    // Create the direct exchange.
    @Bean
    public DirectExchange createDirectExchange() {
        return new DirectExchange(RabbitMqConstant.DIRECT_EXCHANGE_NAME);
    }

    // Create the topic exchange.
    @Bean
    public TopicExchange createTopicExchange() {
        return new TopicExchange(RabbitMqConstant.TOPIC_EXCHANGE_NAME);
    }

    // Create the fanout exchange.
    @Bean
    public FanoutExchange createFanoutExchange() {
        return new FanoutExchange(RabbitMqConstant.FANOUT_EXCHANGE_NAME);
    }
    // Bind queue 1 to the direct exchange and specify the routing key.
    @Bean
    public Binding directExchangeBingdingQueue(Queue createQueueOne, DirectExchange exchange) {
        return BindingBuilder.bind(createQueueOne).to(exchange).with(RabbitMqConstant.DIRECT_ROUTING_KEY);
    }

    // Bind a queue to the topic exchange and specify the routing key.
    // When several Queue beans exist and the parameter name does not identify one of them,
    // use @Qualifier to choose which queue bean is injected. If the queue bean was given an
    // explicit name, pass that name, for example @Qualifier(RabbitMqConstant.QUEUE_NAME_ONE).
    // Queue 2 is selected here by its default bean name (the method name), as an assumption.
    @Bean
    public Binding topicExchangeBingdingQueue(@Qualifier("createQueueTwo") Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstant.TOPIC_ROUTING_KEY);
    }

    // Bind a queue to the fanout exchange (no routing key required).
    @Bean
    public Binding fanoutExchangeBingdingQueue(@Qualifier("createQueueTwo") Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
  • Dynamic mode (nothing is created when the project starts; queues and exchanges are created only when the creation method is called):

    private String host;
    private int port = 5672;
    private String userName;
    private String password;

    // Establish the RabbitMQ connection.
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    // Build the RabbitAdmin bean and hand it over to the Spring container.
    // RabbitAdmin is used to create exchanges and queues of a given type dynamically (not declaratively).
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // autoStartup (true by default) lets RabbitAdmin declare the Queue, Exchange, and Binding beans once a connection is opened
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) // Bean scope, i.e. the life cycle of the instance; singleton here
    public RabbitMqQueueTemplate rabbitMqQueueTemplate() {
        RabbitMqQueueTemplate rabbitMqQueueTemplate = new RabbitMqQueueTemplate();
        // Set the mandatory flag used when sending messages
        // Set the message converter for this template; it converts object arguments of the send methods and object results of the receive methods
        return rabbitMqQueueTemplate;
    }

    // Message listener container.
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        return messageListenerContainer;
    }

Constants class RabbitMqConstant

//queue name 1
String QUEUE_NAME_ONE = "my-queue-1";
//queue name 2
String QUEUE_NAME_TWO = "my-queue-2";
//direct exchange name
String DIRECT_EXCHANGE_NAME = "my-direct-exchange";
//topic exchange name
String TOPIC_EXCHANGE_NAME = "my-topic-exchange";
//fanout (broadcast) exchange name
String FANOUT_EXCHANGE_NAME = "my-fanout-exchange";
//headers exchange name
String HEADERS_EXCHANGE_NAME = "my-headers-exchange";
//routing key
String DIRECT_ROUTING_KEY = "my-direct-routing-key";
String TOPIC_ROUTING_KEY = "my-topic-routing-key";

MQ template extension class (custom methods) RabbitMqQueueTemplate

@Autowired
private RabbitAdmin rabbitAdmin;

/**
 * Create the exchange and the queue, and bind the queue to the exchange
 * @param exchangeName exchange name
 * @param queueName queue name
 * @param routingKey routing key; not needed when the exchange type is FANOUT_EXCHANGE
 * @param exchangeType exchange type
 */
public void createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, ExchangeType exchangeType) {
    this.createQueueAndBindToExchange(exchangeName, queueName, routingKey, true, false, false, null, exchangeType);
}
/**
 * Create the exchange and the queue, and bind the queue to the exchange
 * @param exchangeName exchange name
 * @param queueName queue name
 * @param routingKey routing key
 * @param durable whether the queue is durable
 * @param exclusive whether the queue is exclusive
 * @param autoDelete whether the queue is deleted automatically
 * @param arguments additional queue arguments
 * @param change exchange type
 */
private void createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments, ExchangeType change) {
    Exchange exchange = null;
    switch (change) {
        case TOPIC_EXCHANGE:
            exchange = new TopicExchange(exchangeName, true, false);
            break;
        case DIRECT_EXCHANGE:
            exchange = new DirectExchange(exchangeName, true, false);
            break;
        case FANOUT_EXCHANGE:
            exchange = new FanoutExchange(exchangeName, true, false);
            break;
        default:
            // Any other type falls back to a direct exchange
            exchange = new DirectExchange(exchangeName, true, false);
            break;
    }
    Queue queue = new Queue(queueName, durable, exclusive, autoDelete, arguments);
    if (null != exchange) {
        // Declare the exchange; if it already exists, nothing changes
        rabbitAdmin.declareExchange(exchange);
        log.debug("Exchange: {} created!", JSONObject.toJSON(exchange));
        // Declare the queue
        rabbitAdmin.declareQueue(queue);
        log.debug("Queue: {} created!", JSONObject.toJSON(queue));
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
        // Declare the binding between the exchange and the queue
        rabbitAdmin.declareBinding(binding);
        log.debug("Exchange: {} Queue: {} binding completed!", exchangeName, queue.getName());
    } else {
        rabbitAdmin.declareQueue(queue);
        log.debug("Queue created!");
    }
}
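The exchange selection in createQueueAndBindToExchange relies on a switch with a direct-exchange fallback. The sketch below isolates that decision in plain Java so it can be run without a broker. The enum here is a hypothetical stand-in for the article's ExchangeType (the HEADERS_EXCHANGE constant name is an assumption), and amqpType is an illustrative helper, not part of Spring AMQP. It only returns the AMQP type name ("direct", "topic", "fanout") that each branch would end up declaring.

```java
// Illustrative only: a stand-in for the article's ExchangeType enum.
// The HEADERS_EXCHANGE constant name is an assumption.
final class ExchangeTypeSketch {

    enum ExchangeType { DIRECT_EXCHANGE, TOPIC_EXCHANGE, FANOUT_EXCHANGE, HEADERS_EXCHANGE }

    // Returns the AMQP exchange type name that would be declared for the given value.
    // Every branch returns, so no case can fall through into the next one.
    static String amqpType(ExchangeType type) {
        switch (type) {
            case TOPIC_EXCHANGE:
                return "topic";
            case FANOUT_EXCHANGE:
                return "fanout";
            case DIRECT_EXCHANGE:
            default:
                // Unhandled types (for example headers) fall back to a direct exchange.
                return "direct";
        }
    }
}
```

With a fallback like this, a request for a headers exchange quietly yields a direct exchange, so callers that really need header-based routing would have to add their own branch.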

Create the producer MessageProducer

@Autowired
private RabbitMqQueueTemplate rabbitMqQueueTemplate;
// Listener container
@Autowired
private SimpleMessageListenerContainer messageListenerContainer;
@Autowired
private MessageListener messageListener;

/**
 * Send a message
 * @param message message content
 * @param routingKey routing key
 * @param exchangeType exchange type
 */
public Object sendMessage(String message, String routingKey, ExchangeType exchangeType) {
    log.info("Specified exchange: {}, routing key: {}, sending information: {}", exchangeType, routingKey, message);
    Object result = null;
    switch (exchangeType) {
        case DIRECT_EXCHANGE:
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.DIRECT_EXCHANGE_NAME, routingKey, message);
            break;
        case TOPIC_EXCHANGE:
            // Topic exchange binding keys may use two wildcards: * (exactly one word) and # (zero or more words)
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.TOPIC_EXCHANGE_NAME, routingKey, message);
            break;
        case FANOUT_EXCHANGE:
            // A fanout exchange ignores the routing key
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_EXCHANGE_NAME, "", message);
            break;
        default:
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.DIRECT_EXCHANGE_NAME, routingKey, message);
            break;
    }
    return result;
}
/**
 * Dynamically start listening to a queue by name
 * @param queueName queue name
 */
public void addListener(String queueName) {
    messageListenerContainer.addQueueNames(queueName);
}
/**
 * Stop listening to the specified queue
 * @param queueName queue name
 */
public void removeListener(String queueName) {
    messageListenerContainer.removeQueueNames(queueName);
}
  • Using the @RabbitListener annotation

    /**
     * Listen to the specified queue with an annotation.
     * The queue to listen to must be specified before the project starts.
     * @param message message content
     */
    @RabbitListener(queues = RabbitMqConstant.QUEUE_NAME_ONE)
    public void receiveMessage(String message) {
        log.debug("Consumer got information: {}", message);
    }
  • When adding a listening queue dynamically, you need to implement the onMessage method of the MessageListener interface.

        @Override
        public void onMessage(Message message) {
            String mes = new String(message.getBody());
            log.debug("Monitored message: {}", mes);
        }
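The onMessage listener turns the raw body into text with new String(byte[]), which decodes using the JVM's default charset. A minimal sketch of a more explicit alternative follows. It assumes the producer encoded the text as UTF-8, and BodyDecoder is a hypothetical helper name, not something from the article or from Spring AMQP.

```java
// Hypothetical helper: decodes a raw message body as UTF-8 text,
// independent of the default charset of the JVM that runs the consumer.
final class BodyDecoder {

    static String decode(byte[] body) {
        return new String(body, java.nio.charset.StandardCharsets.UTF_8);
    }
}
```

Inside onMessage this would be used as BodyDecoder.decode(message.getBody()), which keeps non-ASCII payloads intact even when producer and consumer run with different platform defaults.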

Exchange type enumeration ExchangeType

 * Direct exchange
 * A message is put into the corresponding queue only when its routing key exactly matches the binding key, Direct Ex
 * Topic exchange
 * The routing key of the message is pattern-matched against the binding key; wildcards can be used. Binding Key can
 * Fanout exchange
 * No routing key is needed; messages are broadcast to all bound queues.
 * Headers exchange
 * Routes messages by matching multiple header attributes; it can be regarded as an upgraded version of the topic exchange, with poorer performance.
private String name;
ExchangeType(String name) {
    this.name = name;
}
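The routing rules described for the direct, topic, and fanout exchange types can be sketched in plain Java. This is a simplified model for illustration, not the broker's implementation: RoutingSketch, Kind, and routes are hypothetical names, and the topic matcher assumes dot-separated words where * stands for exactly one word and # for zero or more words.

```java
// Illustrative model of how an exchange decides whether a binding receives a message.
final class RoutingSketch {

    enum Kind { DIRECT, TOPIC, FANOUT }

    static boolean routes(Kind kind, String bindingKey, String routingKey) {
        switch (kind) {
            case DIRECT:
                // Direct: routing key must equal the binding key exactly.
                return bindingKey.equals(routingKey);
            case TOPIC:
                // Topic: compare word by word, honoring the * and # wildcards.
                return topicMatches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
            case FANOUT:
                // Fanout: every bound queue receives the message.
                return true;
            default:
                return false;
        }
    }

    private static boolean topicMatches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = w; next <= words.length; next++) {
                if (topicMatches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        boolean sameWord = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return sameWord && topicMatches(pattern, p + 1, words, w + 1);
    }
}
```

For example, under this model a binding key order.* accepts order.created but rejects order.created.eu, while order.# accepts both as well as the bare key order.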

Test controller RabbitMqController

@Autowired
private MessageProducer producer;
@Autowired
private RabbitMqQueueTemplate rabbitMqQueueTemplate;

/**
 * Create the exchange and the queue, and bind the queue to the exchange
 */
public R createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, ExchangeType exchangeType) {
    rabbitMqQueueTemplate.createQueueAndBindToExchange(exchangeName, queueName, routingKey, exchangeType);
    return R.success("Queue and exchange created, and binding completed!");
}

public R sendMessage(String message, String routingKey, ExchangeType exchangeType) {
    log.debug("ExchangeType: {}", exchangeType);
    Object result = producer.sendMessage(message, routingKey, exchangeType);
    return R.data(result);
}

/**
 * Dynamically specify the queue to listen to
 * @param queueName queue name
 */
public R addListener(String queueName) {
    producer.addListener(queueName);
    return R.success("Now listening to the queue!");
}

public R removeListener(String queueName) {
    producer.removeListener(queueName);
    return R.success("Stopped listening to the queue!");
}