1. Foreword
In the previous section, we had a basic understanding of RocketMQ
through the following chapters:
docker-compose builds RocketMQ 5.1.0 cluster (dual master and dual slave mode) | Spring Cloud 28
docker-compose build RocketMQ 5.1.0 cluster to enable ACL permission control | Spring Cloud 29
Now let’s formally study the integration of RocketMQ
in Spring Boot
. In this chapter, we will mainly explain the following parts:
- Sending and receiving of ordinary messages
- Delayed sending and receiving of messages
- Sending and receiving transactional messages
- The sender and receiver enable
ACL
PULL
mode consumption and@ExtRocketMQConsumerConfiguration
use
2. Project integration RocketMQ
2.1 Overall project structure
2.2 Import dependencies
rocketmq/pom.xml
:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
2.3 Configuration file
rocketmq/src/main/resources/application.yml
:
server: port: 8888 spring: application: name: @artifactId@ rocketmq: name-server: 192.168.0.30:9876 producer: group: @artifactId@-group send-message-timeout: 60000 # Send message timeout, unit: milliseconds. Default is 3000 retry-times-when-send-failed: 3 # When sending messages synchronously, the number of failed retries. Default is 2 times retry-times-when-send-async-failed: 2 # When sending messages asynchronously, the number of failed retries. Default is 2 times retry-next-server: false # When sending a message to the Broker, if the sending fails, whether to retry another Broker. The default is false access-key: RocketMQAdmin # Access Key secret-key: 1qaz@WSX # Secret Key enable-msg-trace: true # Whether to enable the message trace function, the default is true to enable customized-trace-topic: RMQ_SYS_TRACE_TOPIC # Customize the topic of the message track, the default is RMQ_SYS_TRACE_TOPIC consumer: access-key: RocketMQAdmin # Access Key secret-key: 1qaz@WSX # Secret Key record: level: org: springframework: boot: autoconfigure: logging: info
2.4 Theme and consumer group constants
com/gm/rocketmq/component/rocketmq/TopicConstants.java
:
package com.gm.rocketmq.component.rocketmq; /** * theme constant */ public interface TopicConstants {<!-- --> String NORMAL_ROCKETMQ_TOPIC_TEST= "NORMAL_ROCKETMQ_TOPIC_TEST"; String ORDERLY_ROCKETMQ_TOPIC_TEST= "ORDERLY_ROCKETMQ_TOPIC_TEST"; String SCHEDULE_ROCKETMQ_TOPIC_TEST= "SCHEDULE_ROCKETMQ_TOPIC_TEST"; String TRANSACTION_ROCKETMQ_TOPIC_TEST= "TRANSACTION_ROCKETMQ_TOPIC_TEST"; String PULL_ROCKETMQ_TOPIC_TEST= "PULL_ROCKETMQ_TOPIC_TEST"; String EXT_ROCKETMQ_TOPIC_TEST= "EXT_ROCKETMQ_TOPIC_TEST"; String CONSUMER_GROUP = "_CONSUMER_GROUP"; }
3. Various types of message sending and receiving
3.1 General message
3.1.1 Common message sending
@Autowired private RocketMQTemplate rocketMQTemplate; /** * Send synchronous and asynchronous messages to rocketmq */ @RequestMapping(value = "sendNormal", method = RequestMethod. GET) public String sendNormal() {<!-- --> rocketMQTemplate.send(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":sync", MessageBuilder.withPayload("Send messages synchronously").build()); rocketMQTemplate.asyncSend(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":async", MessageBuilder.withPayload("Send message asynchronously").build(), new SendCallback() {<!-- --> @Override public void onSuccess(SendResult sendResult) {<!-- --> log.info("Asynchronous send successfully: {}", sendResult.getSendStatus().name()); } @Override public void onException(Throwable throwable) {<!-- --> log.info("Asynchronous sending failed: {}", throwable.getMessage()); } }); return "OK"; }
3.1.2 Common message reception
com/gm/rocketmq/component/rocketmq/NormalRocketMqListener.java
:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-cret-key}", secret$conrock-key = "m ") public class NormalRocketMqListener implements RocketMQListener<String> {<!-- --> @Override public void onMessage(String s) {<!-- --> log.info("Ordinary subscription - information received: {}", s); } }
@RocketMQMessageListener
annotation parameter description:
consumerGroup
: consumer subscription group, it is required and must be unique.topic
: topic name, the topic name sent by production.consumeMode
: consumption mode, you can choose to receive messages concurrently or sequentially; by default,CONCURRENTLY
receives messages delivered asynchronously at the same time.messageModel
: message mode, the defaultCLUSTERING
cluster consumption; if you want all subscribers to receive messages, you can set broadcastBROADCASTING
.consumeThreadMax
: The maximum number of consumer threads, the default is64
.consumeTimeout
: The maximum time for message blocking, the default is15
minutes.nameServer
: server address, the address of the configuration file is read by default, and the specified location can be set separately for consumers.selectorExpression
: Consume the business message of the specified Tag.- The
ACL
function on theConsumer
side needs to be configured in@RocketMQMessageListener
- The
Producer
sideACL
function needs to be configured in the configuration file - See more official explanations
3.2 Sequential messages
3.2.1 Sequential message sending
/** * Send sequential messages to rockertmq, synchronously * * @return */ @RequestMapping(value = "sendOrderlySync", method = RequestMethod.GET) public String sendOrderlySync() {<!-- --> // Order List List<OrderStep> orderList = buildOrders(); for (int i = 0; i < 10; i ++ ) {<!-- --> Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build(); String orderId = String. valueOf(orderList. get(i). getOrderId()); rocketMQTemplate.sendOneWayOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":sync", msg, orderId); } return "OK"; } /** * rockertmq sends sequential messages, asynchronously * * @return */ @RequestMapping(value = "sendOrderlyAsync", method = RequestMethod.GET) public String sendOrderlyAsync() {<!-- --> // Order List List<OrderStep> orderList = buildOrders(); for (int i = 0; i < 10; i ++ ) {<!-- --> Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build(); String orderId = String. valueOf(orderList. get(i). getOrderId()); rocketMQTemplate.syncSendOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":async", msg, orderId); } return "OK"; } /** * Order steps */ private static class OrderStep {<!-- --> private long orderId; private String desc; public long getOrderId() {<!-- --> return orderId; } public void setOrderId(long orderId) {<!-- --> this. orderId = orderId; } public String getDesc() {<!-- --> return desc; } public void setDesc(String desc) {<!-- --> this.desc = desc; } @Override public String toString() {<!-- --> return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * Generate simulated order data */ private List<OrderStep> buildOrders() {<!-- --> List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("create"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("create"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("Payment"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("create"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("Payment"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("Payment"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("Complete"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("Push"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("Complete"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("Complete"); orderList.add(orderDemo); return orderList; }
3.2.1 Order message reception
com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerA.java
:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}", consumeMode = ConsumeMode.ORDERLY) public class OrderlyRocketMqListenerA implements RocketMQListener<String> {<!-- --> @Override public void onMessage(String s) {<!-- --> log.info("Sequential subscription - information received: {}", s); } }
com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerB.java
:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}", consumeMode = ConsumeMode.ORDERLY) public class OrderlyRocketMqListenerB implements RocketMQListener<String> {<!-- --> @Override public void onMessage(String s) {<!-- --> log.info("Sequential subscription - information received: {}", s); } }
3.3 Delay message
3.3.1 Delay message sending
/** * rockertmq sends delayed messages * * @return */ @RequestMapping(value = "sendSchedule", method = RequestMethod. GET) public String sendSchedule() {<!-- --> Message msg = MessageBuilder.withPayload("Delayed message") .build(); rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + ":", msg, 20); log.info("Delay message - release time: {}", System.currentTimeMillis()); return "OK"; }
3.3.2 Delay message reception
com/gm/rocketmq/component/rocketmq/ScheduleRocketMqListener.java
:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}") public class ScheduleRocketMqListener implements RocketMQListener<MessageExt> {<!-- --> @Override public void onMessage(MessageExt message) {<!-- --> String msg = "Content: " + new String(message.getBody()) + ", time: " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later"; log.info("Delay subscription - received information: {}", msg); log.info("Delay message - accept time: {}", System.currentTimeMillis()); } }
3.4 Sending transaction message
3.4.1 Transaction message sending
/** * rockertmq sends production-side transaction messages * * @return */ @RequestMapping(value = "sendTransaction", method = RequestMethod.GET) public String sendTransaction() {<!-- --> Message msg = MessageBuilder.withPayload("Transaction Message") .build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TopicConstants.TRANSACTION_ROCKETMQ_TOPIC_TEST + ":", msg, "custom parameter"); log.info("Transaction message - release result: {} = {}", result.getTransactionId(), result.getSendStatus()); return "OK"; }
com/gm/rocketmq/component/rocketmq/TransactionListenerImpl.java
:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener {<!-- --> private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); // Transaction messages have three states, commit state, rollback state, and intermediate state: // RocketMQLocalTransactionState.COMMIT: Commits the transaction, which allows consumers to consume this message. // RocketMQLocalTransactionState.ROLLBACK: Rollback transaction, which means that the message will be deleted and not allowed to be consumed. // RocketMQLocalTransactionState.UNKNOWN: Intermediate state, which means that the message queue needs to be checked to determine the state. // executeLocalTransaction method to execute local transactions @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- --> String transactionId = msg.getHeaders().get("__transactionId__").toString(); log.info("Execute local transaction, transactionId: {}", transactionId); int value = transactionIndex. getAndIncrement(); int status = value % 3; localTrans.put(transactionId, status); log.info("Get custom parameters: {}", arg); return RocketMQLocalTransactionState.UNKNOWN; } // The checkLocalTransaction method is used to check the local transaction status and respond to the check request of the message queue @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {<!-- --> String transactionId = msg.getHeaders().get("__transactionId__").toString(); log.info("Check local transaction status, transactionId: {}", transactionId); Integer status = localTrans.get(transactionId); if (null != status) {<!-- --> switch (status) {<!-- --> case 0: return RocketMQLocalTransactionState.UNKNOWN; case 1: return RocketMQLocalTransactionState. COMMIT; case 2: return RocketMQLocalTransactionState. ROLLBACK; } } return RocketMQLocalTransactionState. COMMIT; } }
3.5 Pull mode consumption
3.5.1 Source code analysis
In rocketmq-spring-boot-starter
about the automatic configuration of Pull
mode consumption,
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
:
Among them, rocketmq.name-server
, rocketmq.pull-consumer.group
, rocketmq.pull-consumer.topic
are mandatory .
The rest of the configuration, org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.PullConsumer
:
It can be seen from the above that using
rocketmq-spring-boot-starter
to implement thePULL
mode only supports singleTopic
PULL
For consumption,@ExtRocketMQConsumerConfiguration
is required to consume multipleTopic
inPULL
mode.
3.5.2 Configuration files required for PULL consumption
The configuration file adds pull-consumer
related configuration, complete rocketmq/src/main/resources/application.yml
:
rocketmq: name-server: 192.168.0.30:9876 producer: group: @artifactId@-group send-message-timeout: 60000 # Send message timeout, unit: milliseconds. Default is 3000 retry-times-when-send-failed: 3 # When sending messages synchronously, the number of failed retries. Default is 2 times retry-times-when-send-async-failed: 2 # When sending messages asynchronously, the number of failed retries. Default is 2 times retry-next-server: false # When sending a message to the Broker, if the sending fails, whether to retry another Broker. The default is false access-key: RocketMQAdmin # Access Key secret-key: 1qaz@WSX # Secret Key enable-msg-trace: true # Whether to enable the message trace function, the default is true to enable customized-trace-topic: RMQ_SYS_TRACE_TOPIC # Customize the topic of the message track, the default is RMQ_SYS_TRACE_TOPIC consumer: access-key: RocketMQAdmin # Access Key secret-key: 1qaz@WSX # Secret Key pull-consumer: access-key: RocketMQAdmin # Access Key secret-key: 1qaz@WSX # Secret Key topic: PULL_ROCKETMQ_TOPIC_TEST group: PULL_ROCKETMQ_TOPIC_TEST_CONSUMER_GROUP
3.5.3 Message sending
/** * Send a message to the pull mode of the ockertmq consumer * * @return */ @RequestMapping(value = "sendPull", method = RequestMethod.GET) public String pull() {<!-- --> for (int i = 0; i < 10; i ++ ) {<!-- --> Message msg = MessageBuilder.withPayload("pull message" + i).build(); rocketMQTemplate.syncSend(TopicConstants.PULL_ROCKETMQ_TOPIC_TEST + ":", msg); } for (int i = 0; i < 10; i ++ ) {<!-- --> Message msg = MessageBuilder.withPayload("pull ext message" + i).build(); rocketMQTemplate.syncSend(TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + ":", msg); } return "OK"; }
3.5.4 Use of @ExtRocketMQConsumerConfiguration
This example uses
@ExtRocketMQConsumerConfiguration
to define consumption, declare consumptionTopic
and consumption group, or declare differentname-server
.
Use
@ExtRocketMQTemplateConfiguration
to define producers, declare differentname-server
or other specific attributes to define non-standardRocketMQTemplate
.
com/gm/rocketmq/component/rocketmq/ExtRocketMQTemplate.java
:
import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; /** * It can be used to define non-standard RocketMQTemplate with different name-server or other specific attributes. This example defines message Topic and consumer */ @ExtRocketMQConsumerConfiguration(group = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, topic = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST) public class ExtRocketMQTemplate extends RocketMQTemplate {<!-- --> }
3.5.5 PULL mode message reception
com/gm/rocketmq/component/rocketmq/PullConsumer.java
:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component public class PullConsumer implements CommandLineRunner {<!-- --> @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private ExtRocketMQTemplate extRocketMQTemplate; @Override public void run(String... args) {<!-- --> while (true) {<!-- --> List<String> messages = rocketMQTemplate.receive(String.class, 5000); log.info("receive from rocketMQTemplate, messages={}", messages); messages = extRocketMQTemplate.receive(String.class, 5000); log.info("receive from extRocketMQTemplate, messages={}", messages); } } }
3.6 source code
Source code address: module rocketmq
in https://gitee.com/gm19900510/springboot-cloud-example.git.