Integrating rabbitmq in the springblade framework
Create a new module (blade-rabbitmq) and add dependencies required by rabbitmq
-
blade-rabbitmq service pom file
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>blade-ops</artifactId> <groupId>org.springblade</groupId> <version>3.6.0</version> </parent> <artifactId>blade-ops-rabbitmq</artifactId> <name>${project.artifactId}</name> <version>${blade.tool.version}</version> <packaging>jar</packaging> <description>springblade integrates rabbitmq</description> <dependencies> <!--Blade--> <dependency> <groupId>org.springblade</groupId> <artifactId>blade-core-boot</artifactId> <version>${blade.tool.version}</version> </dependency> <!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> </project>
Create configuration class RabbitMqConfig
-
configuration file
spring: #Configure rabbitMq server rabbitmq: host: 127.0.0.1 port: 5672 username: guest/root password: guest/root
-
Declaration method (the project is created when it is started):
/** * create queue 1 */ @Bean // @Bean(RabbitMqConstant.QUEUE_NAME_ONE) //The name of the bean and the name of the queue are both my-queue-2 If not specified, the default is the method name // @Qualifier public Queue createQueueOne() {<!-- --> return new Queue(RabbitMqConstant. QUEUE_NAME_ONE); } /** * create queue 2 */ @Bean @Qualifier public Queue createQueueTwo() {<!-- --> //The name of the queue is my-queue-2 return new Queue(RabbitMqConstant. QUEUE_NAME_TWO); } /** * Create a direct switch */ @Bean public DirectExchange createDirectExchange() {<!-- --> return new DirectExchange(RabbitMqConstant. DIRECT_EXCHANGE_NAME); } /** * Create topic exchange */ @Bean public TopicExchange createTopicExchange() {<!-- --> return new TopicExchange(RabbitMqConstant.TOPIC_EXCHANGE_NAME); } /** * Create fan switch */ @Bean public FanoutExchange createFanoutExchange() {<!-- --> return new FanoutExchange(RabbitMqConstant. FANOUT_EXCHANGE_NAME); } /** * Bind the queue directly to the switch and specify the routing key */ @Bean public Binding directExchangeBingdingQueue(Queue createQueueOne, DirectExchange exchange) {<!-- --> return BindingBuilder.bind(createQueueOne).to(exchange).with(RabbitMqConstant.DIRECT_ROUTING_KEY); } /** * The topic switch binds the queue and specifies the routing key * When there are multiple queues and the attribute name does not specify which queue to bind you, you can use the @Qualifier annotation to limit which queue bean is injected * If the bean that creates the queue specifies the name of the bean, you need to specify a qualifier in the @Qualifier annotation, for example: @Qualifier(RabbitMqConstant.QUEUE_NAME_ONE) */ @Bean public Binding topicExchangeBingdingQueue(@Qualifier Queue queue, TopicExchange exchange) {<!-- --> return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstant.TOPIC_ROUTING_KEY); } /** * Fan-type switch binding queue (no routing key required) */ @Bean public Binding fanoutExchangeBingdingQueue(@Qualifier Queue queue, FanoutExchange exchange) {<!-- --> return BindingBuilder.bind(queue).to(exchange); }
-
Dynamic mode (do not create when the project starts, call the creation method and then create):
@Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port = 5672; @Value("${spring.rabbitmq.username}") private String userName; @Value("${spring.rabbitmq.password}") private String password; /** * Establish a rabbitmq connection */ @Bean public ConnectionFactory connectionFactory() {<!-- --> CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory. setHost(host); connectionFactory. setPort(port); connectionFactory. setUsername(userName); connectionFactory. setPassword(password); //connectionFactory.setVirtualHost("/"); return connectionFactory; } /** * Build the bean of rabbitAdmin and hand it over to the spring container for management * Build this to dynamically create exchanges and queues of the specified type (not declaratively) */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {<!-- --> RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); //autoStartup must be set to true, otherwise the Spring container will not load the RabbitAdmin class rabbitAdmin.setAutoStartup(true);// Enable automatic startup when the service starts return rabbitAdmin; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) //Set the scope of the bean, the scope refers to the life cycle of the instance, here is set to singleton mode public RabbitMqQueueTemplate rabbitMqQueueTemplate() {<!-- --> RabbitMqQueueTemplate rabbitMqQueueTemplate = new RabbitMqQueueTemplate(); rabbitMqQueueTemplate.setConnectionFactory(connectionFactory()); //Set mandatory flag when sending message rabbitMqQueueTemplate.setMandatory(true); // Set the message converter for this template. Used to parse object arguments to transform and send methods and object results from receive and transform methods. //rabbitMqQueueTemplate.setMessageConverter(); return rabbitMqQueueTemplate; } /** * Message monitoring */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {<!-- --> SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); return messageListenerContainer; }
Constant definition class RabbitMqConstant
//queue name 1 String QUEUE_NAME_ONE = "my-queue-1"; //queue name 2 String QUEUE_NAME_TWO = "my-queue-2"; //direct switch name String DIRECT_EXCHANGE_NAME = "my-direct-exchange"; /** * Topic switch name */ String TOPIC_EXCHANGE_NAME = "my-topic-exchange"; // fan switch (broadcast) name String FANOUT_EXCHANGE_NAME = "my-fanout-exchange"; //head switch String HEADERS_EXCHANGE_NAME = "my-headers-exchange"; //routing key String DIRECT_ROUTING_KEY = "my-direct-routing-key"; String TOPIC_ROUTING_KEY = "my-topic-routing-key";
Mq template extension class (custom method) RabbitMqQueueTemplate
@Resource private RabbitAdmin rabbitAdmin; /** * Create MQ queues and switches, and bind switches and queues * * @param exchangeName exchange name * @param queueName queue name * @param routingKey Not needed when routing key change is FANOUT_EXCHANGE * @param exchangeType exchange type */ public void createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, ExchangeType exchangeType) {<!-- --> this.createQueueAndBindToExchange(exchangeName, queueName, routingKey, true, false, false, null, exchangeType); } /** * Create MQ queues and switches, and bind switches and queues * * @param exchangeName exchange name * @param queueName queue name * @param routingKey routing key * @param durable persistence * @param exclusive exclusive * @param autoDelete automatically delete * @param arguments parameter */ private void createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments, ExchangeType change) {<!-- --> Exchange exchange = null; switch (change) {<!-- --> case TOPIC_EXCHANGE: exchange = new TopicExchange(exchangeName, true, false); break; case DIRECT_EXCHANGE: exchange = new DirectExchange(exchangeName, true, false); break; case FANOUT_EXCHANGE: exchange = new FanoutExchange(exchangeName, true, false); break; default: exchange = new DirectExchange(exchangeName, true, false); break; } Queue queue = new Queue(queueName, durable, exclusive, autoDelete, arguments); if (null != exchange) {<!-- --> //Create a switch, if there is no switch, if there is one, it will be skipped rabbitAdmin.declareExchange(exchange); log.debug("Exchange: {} created!", JSONObject.toJSON(exchange)); //Create queue rabbitAdmin.declareQueue(queue); log.debug("Queue: {} created!", JSONObject.toJSON(queue)); Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); //Create a binding relationship rabbitAdmin.declareBinding(binding); log.debug("Exchange: {} Queue: {} binding completed!", exchangeName, queue.getName()); } else {<!-- --> rabbitAdmin.declareQueue(queue); log.debug("Queue created!"); } }
Create a producer MessageProducer
@Autowired private RabbitMqQueueTemplate rabbitMqQueueTemplate; /** * Monitor control */ @Autowired private SimpleMessageListenerContainer messageListenerContainer; @Autowired @Qualifier private MessageListener messageListener; /** * Send a message * * @param message information content * @param routingKey routing key * @param exchangeType exchange type */ public Object sendMessage(String message, String routingKey, ExchangeType exchangeType) {<!-- --> log.info("Specified exchange: {}, routing key: {}, sending information: {}", exchangeType, routingKey, message); Object result = null; switch (exchangeType) {<!-- --> case DIRECT_EXCHANGE: result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.DIRECT_EXCHANGE_NAME, routingKey, message); break; case TOPIC_EXCHANGE: //Two wildcards can be used in the topic switch routing key: * (match one word) and # (match multiple words) result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.TOPIC_EXCHANGE_NAME, routingKey, message); break; case FANOUT_EXCHANGE: //Fan switch does not need a routing key result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_EXCHANGE_NAME, "", message); break; default: result = rabbitMqQueueTemplate.convertSendAndReceive(RabbitMqConstant.DIRECT_EXCHANGE_NAME, routingKey, message); } return result; } /** * Dynamically add a listening queue according to the queue name * @param queueName queue name */ public void addListener(String queueName) {<!-- --> messageListenerContainer.addQueueNames(queueName); messageListenerContainer.setMessageListener(messageListener); } /** * Remove the specified queue from listening * @param queueName queue name */ public void removeListener(String queueName) {<!-- --> messageListenerContainer. removeQueueNames(queueName); }
-
Use the annotation @RabbitListener method
/** * Use annotations to monitor the specified queue * It is necessary to specify the queue to be monitored before the project starts * * @param message */ @RabbitListener(queues = RabbitMqConstant. QUEUE_NAME_ONE) public void receiveMessage(String message) {<!-- --> log.debug("Consumer got information: {}", message); }
-
When adding a listening queue dynamically, you need to implement the onMessage method of the MessageListener interface.
@Override public void onMessage(Message message) {<!-- --> String mes = new String(message. getBody()); log.debug("Monitored message: {}", mes); }
Exchange type enumeration ExchangeType
/** * Directly connected to the switch * Only when the routing key in the message exactly matches the bound key will the message be put into the response queue, Direct Ex */ DIRECT_EXCHANGE("directExchange"), /** * Topic switch * The routing key in the message is fuzzy matched with the bound Key, wildcard matching can be used. Binding Key can */ TOPIC_EXCHANGE("topicExchange"), /** * Sector switch * No routing key (Routing Key), broadcast messages to all bound queues. */ FANOUT_EXCHANGE("fanoutExchange"), /** * Head switch * Use multiple attributes to match message routing, which can be regarded as an upgraded version of Topic Exchange, with poor performance. */ HEADERS_EXCHANGE("headersExchange"); private String name; ExchangeType(String name) {<!-- --> this.name = name; }
Test RabbitMqController
@Autowired private MessageProducer producer; @Autowired private RabbitMqQueueTemplate rabbitMqQueueTemplate; /** * Create MQ queues and switches, and bind switches and queues */ @PostMapping("/create") public R createQueueAndBindToExchange(String exchangeName, String queueName, String routingKey, ExchangeType exchangeType) {<!-- --> rabbitMqQueueTemplate.createQueueAndBindToExchange(exchangeName, queueName, routingKey, exchangeType); return R.success("Queue, switch created, and binding completed!"); } @PostMapping("/sendMessage") public R sendMessage(String message, String routingKey, ExchangeType exchangeType) {<!-- --> log.debug("ExchangeType: {}", exchangeType); Object result = producer. sendMessage(message, routingKey, exchangeType); return R.data(result); } /** * Dynamically specify the queue that needs to be monitored * @param queueName queue name */ @PostMapping("/addListener") public R addListener(String queueName) {<!-- --> producer.addListener(queueName); return R.success("queue monitoring success!"); } @DeleteMapping("/removeListener") public R removeListener(String queueName) {<!-- --> producer. removeListener(queueName); return R.success("Remove the listening queue!"); }