SpringBlade integrates RabbitMq

Integrating rabbitmq in the springblade framework

Create a new module (blade-rabbitmq) and add dependencies required by rabbitmq

  • blade-rabbitmq service pom file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <artifactId>blade-ops</artifactId>
            <groupId>org.springblade</groupId>
            <version>3.6.0</version>
        </parent>
        
        <artifactId>blade-ops-rabbitmq</artifactId>
        <name>${project.artifactId}</name>
        <version>${blade.tool.version}</version>
        <packaging>jar</packaging>
        <description>springblade integrates rabbitmq</description>
    
        <dependencies>
            <!--Blade-->
            <dependency>
                <groupId>org.springblade</groupId>
                <artifactId>blade-core-boot</artifactId>
                <version>${blade.tool.version}</version>
            </dependency>
    
            <!--rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
    </project>
    
    

Create configuration class RabbitMqConfig

  • configuration file

    spring:
      #Configure rabbitMq server
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest/root
        password: guest/root
    
  • Declaration method (the project is created when it is started):

    /**
     * create queue 1
     */
    @Bean
    // @Bean(RabbitMqConstant.QUEUE_NAME_ONE) //The name of the bean and the name of the queue are both my-queue-2 If not specified, the default is the method name
    // @Qualifier
    public Queue createQueueOne() {<!-- -->
        return new Queue(RabbitMqConstant. QUEUE_NAME_ONE);
    }
    /**
     * create queue 2
     */
    @Bean
    @Qualifier
    public Queue createQueueTwo() {<!-- -->
        //The name of the queue is my-queue-2
        return new Queue(RabbitMqConstant. QUEUE_NAME_TWO);
    }
    /**
     * Create a direct switch
     */
    @Bean
    public DirectExchange createDirectExchange() {<!-- -->
        return new DirectExchange(RabbitMqConstant. DIRECT_EXCHANGE_NAME);
    }
    /**
     * Create topic exchange
     */
    @Bean
    public TopicExchange createTopicExchange() {<!-- -->
        return new TopicExchange(RabbitMqConstant.TOPIC_EXCHANGE_NAME);
    }
    /**
     * Create fan switch
     */
    @Bean
    public FanoutExchange createFanoutExchange() {<!-- -->
        return new FanoutExchange(RabbitMqConstant. FANOUT_EXCHANGE_NAME);
    }
    /**
     * Bind the queue directly to the switch and specify the routing key
     */
    @Bean
    public Binding directExchangeBingdingQueue(Queue createQueueOne, DirectExchange exchange) {<!-- -->
        return BindingBuilder.bind(createQueueOne).to(exchange).with(RabbitMqConstant.DIRECT_ROUTING_KEY);
    }
    /**
     * The topic switch binds the queue and specifies the routing key
     * When there are multiple queues and the attribute name does not specify which queue to bind you, you can use the @Qualifier annotation to limit which queue bean is injected
     * If the bean that creates the queue specifies the name of the bean, you need to specify a qualifier in the @Qualifier annotation, for example: @Qualifier(RabbitMqConstant.QUEUE_NAME_ONE)
     */
    @Bean
    public Binding topicExchangeBingdingQueue(@Qualifier Queue queue, TopicExchange exchange) {<!-- -->
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstant.TOPIC_ROUTING_KEY);
    }
    /**
     * Fan-type switch binding queue (no routing key required)
     */
    @Bean
    public Binding fanoutExchangeBingdingQueue(@Qualifier Queue queue, FanoutExchange exchange) {<!-- -->
        return BindingBuilder.bind(queue).to(exchange);
    }
    
  • Dynamic mode (do not create when the project starts, call the creation method and then create):

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port = 5672;
    @Value("${spring.rabbitmq.username}")
    private String userName;
    @Value("${spring.rabbitmq.password}")
    private String password;
    /**
     * Establish a rabbitmq connection
     */
    @Bean
    public ConnectionFactory connectionFactory() {<!-- -->
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory. setHost(host);
        connectionFactory. setPort(port);
        connectionFactory. setUsername(userName);
        connectionFactory. setPassword(password);
        //connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    /**
     * Build the bean of rabbitAdmin and hand it over to the spring container for management
     * Build this to dynamically create exchanges and queues of the specified type (not declaratively)
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {<!-- -->
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //autoStartup must be set to true, otherwise the Spring container will not load the RabbitAdmin class
        rabbitAdmin.setAutoStartup(true);// Enable automatic startup when the service starts
        return rabbitAdmin;
    }
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) //Set the scope of the bean, the scope refers to the life cycle of the instance, here is set to singleton mode
    public RabbitMqQueueTemplate rabbitMqQueueTemplate() {<!-- -->
        RabbitMqQueueTemplate rabbitMqQueueTemplate = new RabbitMqQueueTemplate();
        rabbitMqQueueTemplate.setConnectionFactory(connectionFactory());
        //Set mandatory flag when sending message
        rabbitMqQueueTemplate.setMandatory(true);
        // Set the message converter for this template. Used to parse object arguments to transform and send methods and object results from receive and transform methods.
        //rabbitMqQueueTemplate.setMessageConverter();
        return rabbitMqQueueTemplate;
    }
    /**
     * Message monitoring
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {<!-- -->
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        return messageListenerContainer;
    }
    

Constant definition class RabbitMqConstant

//queue name 1
String QUEUE_NAME_ONE = "my-queue-1";
//queue name 2
String QUEUE_NAME_TWO = "my-queue-2";
//direct switch name
String DIRECT_EXCHANGE_NAME = "my-direct-exchange";
/**
 * Topic switch name
 */
String TOPIC_EXCHANGE_NAME = "my-topic-exchange";
// fan switch (broadcast) name
String FANOUT_EXCHANGE_NAME = "my-fanout-exchange";
//head switch
String HEADERS_EXCHANGE_NAME = "my-headers-exchange";
//routing key
String DIRECT_ROUTING_KEY = "my-direct-routing-key";
String TOPIC_ROUTING_KEY = "my-topic-routing-key";

Mq template extension class (custom method) RabbitMqQueueTemplate

@Resource
private RabbitAdmin rabbitAdmin;
/**
 * Create MQ queues and switches, and bind switches and queues
 *
 * @param exchangeName exchange name
 * @param queueName queue name
 * @param routingKey Not needed when routing key change is FANOUT_EXCHANGE
 * @param exchangeType exchange type
 */
public void createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, ExchangeType exchangeType) {<!-- -->
    this.createQueueAndBindToExchange(exchangeName, queueName, routingKey, true, false, false, null, exchangeType);
}
/**
 * Create MQ queues and switches, and bind switches and queues
 *
 * @param exchangeName exchange name
 * @param queueName queue name
 * @param routingKey routing key
 * @param durable persistence
 * @param exclusive exclusive
 * @param autoDelete automatically delete
 * @param arguments parameter
 */
private void createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments, ExchangeType change) {<!-- -->
    Exchange exchange = null;
    switch (change) {<!-- -->
        case TOPIC_EXCHANGE:
            exchange = new TopicExchange(exchangeName, true, false);
            break;
        case DIRECT_EXCHANGE:
            exchange = new DirectExchange(exchangeName, true, false);
            break;
        case FANOUT_EXCHANGE:
            exchange = new FanoutExchange(exchangeName, true, false);
            break;
        default:
            exchange = new DirectExchange(exchangeName, true, false);
            break;
    }
    Queue queue = new Queue(queueName, durable, exclusive, autoDelete, arguments);
    if (null != exchange) {<!-- -->
        //Create a switch, if there is no switch, if there is one, it will be skipped
        rabbitAdmin.declareExchange(exchange);
        log.debug("Exchange: {} created!", JSONObject.toJSON(exchange));
        //Create queue
        rabbitAdmin.declareQueue(queue);
        log.debug("Queue: {} created!", JSONObject.toJSON(queue));
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
        //Create a binding relationship
        rabbitAdmin.declareBinding(binding);
        log.debug("Exchange: {} Queue: {} binding completed!", exchangeName, queue.getName());
    } else {<!-- -->
        rabbitAdmin.declareQueue(queue);
        log.debug("Queue created!");
    }
}

Create a producer MessageProducer

@Autowired
private RabbitMqQueueTemplate rabbitMqQueueTemplate;
/**
 * Monitor control
 */
@Autowired
private SimpleMessageListenerContainer messageListenerContainer;
@Autowired
@Qualifier
private MessageListener messageListener;
/**
 * Send a message
 *
 * @param message information content
 * @param routingKey routing key
 * @param exchangeType exchange type
 */
public Object sendMessage(String message, String routingKey, ExchangeType exchangeType) {<!-- -->
    log.info("Specified exchange: {}, routing key: {}, sending information: {}", exchangeType, routingKey, message);
    Object result = null;
    switch (exchangeType) {<!-- -->
        case DIRECT_EXCHANGE:
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.DIRECT_EXCHANGE_NAME, routingKey, message);
            break;
        case TOPIC_EXCHANGE:
            //Two wildcards can be used in the topic switch routing key: * (match one word) and # (match multiple words)
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.TOPIC_EXCHANGE_NAME, routingKey, message);
            break;
        case FANOUT_EXCHANGE:
            //Fan switch does not need a routing key
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_EXCHANGE_NAME, "", message);
            break;
        default:
            result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.DIRECT_EXCHANGE_NAME, routingKey, message);
    }
    return result;
}
/**
 * Dynamically add a listening queue according to the queue name
 * @param queueName queue name
 */
public void addListener(String queueName) {<!-- -->
    messageListenerContainer.addQueueNames(queueName);
    messageListenerContainer.setMessageListener(messageListener);
}
/**
 * Remove the specified queue from listening
 * @param queueName queue name
 */
public void removeListener(String queueName) {<!-- -->
    messageListenerContainer. removeQueueNames(queueName);
}
  • Use the annotation @RabbitListener method

    /**
     * Use annotations to monitor the specified queue
     * It is necessary to specify the queue to be monitored before the project starts
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqConstant. QUEUE_NAME_ONE)
    public void receiveMessage(String message) {<!-- -->
        log.debug("Consumer got information: {}", message);
    }
    
  • When adding a listening queue dynamically, you need to implement the onMessage method of the MessageListener interface.

     @Override
        public void onMessage(Message message) {<!-- -->
            String mes = new String(message. getBody());
            log.debug("Monitored message: {}", mes);
        }
    

Exchange type enumeration ExchangeType

/**
 * Directly connected to the switch
 * Only when the routing key in the message exactly matches the bound key will the message be put into the response queue, Direct Ex
 */
DIRECT_EXCHANGE("directExchange"),
/**
 * Topic switch
 * The routing key in the message is fuzzy matched with the bound Key, wildcard matching can be used. Binding Key can
 */
TOPIC_EXCHANGE("topicExchange"),
/**
 * Sector switch
 * No routing key (Routing Key), broadcast messages to all bound queues.
 */
FANOUT_EXCHANGE("fanoutExchange"),
/**
 * Head switch
 * Use multiple attributes to match message routing, which can be regarded as an upgraded version of Topic Exchange, with poor performance.
 */
HEADERS_EXCHANGE("headersExchange");
private String name;
ExchangeType(String name) {<!-- -->
this.name = name;
}

Test RabbitMqController

@Autowired
private MessageProducer producer;
@Autowired
private RabbitMqQueueTemplate rabbitMqQueueTemplate;
/**
 * Create MQ queues and switches, and bind switches and queues
 */
@PostMapping("/create")
public R createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, ExchangeType exchangeType) {<!-- -->
    rabbitMqQueueTemplate.createQueueAndBindToExchange(exchangeName, queueName, routingKey, exchangeType);
    return R.success("Queue, switch created, and binding completed!");
}
@PostMapping("/sendMessage")
public R sendMessage(String message, String routingKey, ExchangeType exchangeType) {<!-- -->
    log.debug("ExchangeType: {}", exchangeType);
    Object result = producer. sendMessage(message, routingKey, exchangeType);
    return R.data(result);
}
/**
 * Dynamically specify the queue that needs to be monitored
 * @param queueName queue name
 */
@PostMapping("/addListener")
public R addListener(String queueName) {<!-- -->
    producer.addListener(queueName);
    return R.success("queue monitoring success!");
}
@DeleteMapping("/removeListener")
public R removeListener(String queueName) {<!-- -->
    producer. removeListener(queueName);
    return R.success("Remove the listening queue!");
}