First of all, I am a beginner in activemq. I also read a lot of blogs during my study. I am very grateful to the csdn blog for giving me inspiration. I hope my creation can be helpful to everyone.
1. Add the maven dependency package, not much to say, just go to the code
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.30</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
2. Some related configurations of activemq
server: port: 8083 #activcemq connection pool com: example: mqUrl: tcp://localhost:61616 #Whether it is placed in memory: in-memory: false pool: #Whether to enable connection pooling enable: true #Maximum number of connections maxConn: 200 user:admin password:admin #Message queue name queue: example: queue1: queue1 queue2: queue2 queue3: queue3 #Topic queue name topic: example: topic1: topic1 topic2: topic2 topic3: queue3
3. Create activemq connection through factory method
package com.example.activemq.factory; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @className:@MqFactory * @Description:TODO * @Author: * @Version: 1.0 **/ @Configuration public class MqFactory { //tcp connection path @Value("${com.example.mqUrl}") private String brokerUrl; //Whether to put it in memory @Value("${com.example.in-memery}") private String inMemory; //Whether to enable connection pooling @Value("${com.example.pool.enable}") private Boolean enable; //Maximum number of connections @Value("${com.example.pool.maxConn}") private Integer maxConn; //username @Value("${com.example.user}") private String user; //password @Value("${com.example.password}") private String password; /** * Create factory */ @Bean public ActiveMQConnectionFactory connactivemq(){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); factory.setTransactedIndividualAck(true); factory.setMaxThreadPoolSize(maxConn); factory.setUserName(user); factory.setPassword(password); factory.setStatsEnabled(enable); return factory; } }
package com.example.activemq.factory; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJcaListenerContainerFactory; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.stereotype.Component; /** * @className:@MqListener * @Description:TODO * @Author: Point-to-point listening bean injection * @Version: 1.0 **/ @Component public class MqListener { @Autowired private MqFactory mqFactory; /** * Bean that listens to the queue * @return */ @Bean public JmsListenerContainerFactory<?> jmsFactoryQueue(){ DefaultJmsListenerContainerFactory defaultFactory = new DefaultJmsListenerContainerFactory(); defaultFactory.setConnectionFactory(mqFactory.connactivemq()); return defaultFactory; } /** * Monitor topic * @return */ @Bean public JmsListenerContainerFactory<?> jmsFactoryTopic(){ DefaultJmsListenerContainerFactory defaultFactory = new DefaultJmsListenerContainerFactory(); defaultFactory.setConnectionFactory(mqFactory.connactivemq()); defaultFactory.setPubSubDomain(true); return defaultFactory; } }
4. Create multiple queue beans and topic beans respectively.
package com.example.activemq.factory; import com.example.activemq.model.QueueConfig; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; /** * @className:@MqQueueList * @Description:TODO Create a point-to-point queue * @Author: * @Version: 1.0 **/ @Configuration @EnableJms public class MqQueueList { @Autowired private QueueConfig queueConfig; @Bean public javax.jms.Queue queue1(){ return new ActiveMQQueue(queueConfig.getQueue1()); } @Bean public javax.jms.Queue queue2(){ return new ActiveMQQueue(queueConfig.getQueue2()); } @Bean public javax.jms.Queue queue3(){ return new ActiveMQQueue(queueConfig.getQueue3()); } }
package com.example.activemq.factory; import com.example.activemq.model.TopicConfig; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; /** * @className:@MqTopic * @Description:TODO * @Author: Publish and subscribe mode, publish topic * @Version: 1.0 **/ @Configuration @EnableJms public class MqTopic { @Autowired private TopicConfig topicConfig; @Bean public javax.jms.Topic topic1(){ return new ActiveMQTopic(topicConfig.getTopic1()); } @Bean public javax.jms.Topic topic2(){ return new ActiveMQTopic(topicConfig.getTopic2()); } @Bean public javax.jms.Topic topic3(){ return new ActiveMQTopic(topicConfig.getTopic3()); } }
5. The following are two model classes. Create a model class to obtain the value of the configuration file and reduce code redundancy.
package com.example.activemq.model; import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; /** * @className:@QueueConfig * @Description:TODO * @Author: * @Version: 1.0 **/ @Data @Configuration public class QueueConfig { @Value("${queue.example.queue1}") private String queue1; @Value("${queue.example.queue2}") private String queue2; @Value("${queue.example.queue3}") private String queue3; }
package com.example.activemq.model; import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; /** * @className:@QueueConfig * @Description:TODO * @Author: * @Version: 1.0 **/ @Data @Configuration public class TopicConfig { @Value("${topic.example.topic1}") private String topic1; @Value("${topic.example.topic2}") private String topic2; @Value("${topic.example.topic3}") private String topic3; }
6. The following is the message sending function of the test provider.
package com.example.activemq.producer; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.example.activemq.factory.MqTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.Queue; import javax.jms.Topic; import java.util.HashMap; import java.util.Map; /** * @className:@ProducerController * @Description:TODO * @Author: * @Version: 1.0 **/ @RestController public class ProducerController { @Autowired private Queue queue1; @Autowired private Queue queue2; @Autowired private Queue queue3; @Autowired private Topic topic1; @Autowired private Topic topic2; @Autowired private Topic topic3; @Autowired private JmsMessagingTemplate mesTemplate; @GetMapping("/sendQueue") public String getQueueContent(){ JSONObject sendData = new JSONObject(); sendData.put("msg","Sent successfully"); sendData.put("data","I am queue"); sendData.put("code",200); mesTemplate.convertAndSend(queue1,sendData.toJSONString()); mesTemplate.convertAndSend(queue2,sendData.toJSONString()); mesTemplate.convertAndSend(queue3,sendData.toJSONString()); return "ok,send Queue Success"; } @GetMapping("/sendTopic") public String getTopicContent(){ JSONObject sendData = new JSONObject(); sendData.put("msg","Sent successfully"); sendData.put("data","I am topic"); sendData.put("code",200); mesTemplate.convertAndSend(topic1, sendData.toJSONString()); mesTemplate.convertAndSend(topic2,sendData.toJSONString()); mesTemplate.convertAndSend(topic3,sendData.toJSONString()); return "ok,send Topic Success"; } }
7. Monitor and receive message classes
package com.example.activemq.consumer; import com.example.activemq.model.QueueConfig; import com.example.activemq.model.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * @className:@JMSListenerConcumer * @Description: Listen for incoming messages * @Author: * @Version: 1.0 **/ @Component @EnableJms public class JMSListenerConcumer { private Logger logger = LoggerFactory.getLogger(JMSListenerConcumer.class); @JmsListener(destination ="${queue.example.queue1}",containerFactory = "jmsFactoryQueue") public void recieveQueue1(String msg){ logger.info("Message received by queue1 {}",msg); } @JmsListener(destination ="${queue.example.queue2}",containerFactory = "jmsFactoryQueue") public void recieveQueue2(String msg){ logger.info("Message received by queue2 {}",msg); } @JmsListener(destination ="${queue.example.queue3}",containerFactory = "jmsFactoryQueue") public void recieveQueue3(String msg){ logger.info("Message received by queue3 {}",msg); } @JmsListener(destination ="${topic.example.topic1}",containerFactory = "jmsFactoryTopic") public void recieveTopic1(String msg){ logger.info("Message received by topic1 {}",msg); } @JmsListener(destination ="${topic.example.topic2}",containerFactory = "jmsFactoryTopic") public void recieveTopic2(String msg){ logger.info("Message received by topic2 {}",msg); } @JmsListener(destination ="${topic.example.topic3}",containerFactory = "jmsFactoryTopic") public void recieveTopic3(String msg){ logger.info("Message received by topic3 {}",msg); } }