Spring Boot integrates RocketMQ to realize normal, delay, transaction message sending and receiving, PULL consumption mode and enable ACL | Spring Cloud 30

1. Foreword

In the previous section, we had a basic understanding of RocketMQ through the following chapters:

docker-compose builds RocketMQ 5.1.0 cluster (dual master and dual slave mode) | Spring Cloud 28

docker-compose build RocketMQ 5.1.0 cluster to enable ACL permission control | Spring Cloud 29

Now let’s formally study the integration of RocketMQ in Spring Boot. In this chapter, we will mainly explain the following parts:

  • Sending and receiving of ordinary messages
  • Delayed sending and receiving of messages
  • Sending and receiving transactional messages
  • The sender and receiver enable ACL
  • PULL mode consumption and @ExtRocketMQConsumerConfiguration use

2. Project integration RocketMQ

2.1 Overall project structure

2.2 Import dependencies

rocketmq/pom.xml:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2.3 Configuration file

rocketmq/src/main/resources/application.yml:

server:
  port: 8888

spring:
  application:
    name: @artifactId@

rocketmq:
  name-server: 192.168.0.30:9876
  producer:
    group: @artifactId@-group
    send-message-timeout: 60000 # Send message timeout, unit: milliseconds. Default is 3000
    retry-times-when-send-failed: 3 # When sending messages synchronously, the number of failed retries. Default is 2 times
    retry-times-when-send-async-failed: 2 # When sending messages asynchronously, the number of failed retries. Default is 2 times
    retry-next-server: false # When sending a message to the Broker, if the sending fails, whether to retry another Broker. The default is false
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
    enable-msg-trace: true # Whether to enable the message trace function, the default is true to enable
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # Customize the topic of the message track, the default is RMQ_SYS_TRACE_TOPIC
  consumer:
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key

record:
  level:
    org:
      springframework:
        boot:
          autoconfigure:
            logging: info

2.4 Theme and consumer group constants

com/gm/rocketmq/component/rocketmq/TopicConstants.java:

package com.gm.rocketmq.component.rocketmq;

/**
 * theme constant
 */
public interface TopicConstants {<!-- -->

    String NORMAL_ROCKETMQ_TOPIC_TEST= "NORMAL_ROCKETMQ_TOPIC_TEST";

    String ORDERLY_ROCKETMQ_TOPIC_TEST= "ORDERLY_ROCKETMQ_TOPIC_TEST";

    String SCHEDULE_ROCKETMQ_TOPIC_TEST= "SCHEDULE_ROCKETMQ_TOPIC_TEST";

    String TRANSACTION_ROCKETMQ_TOPIC_TEST= "TRANSACTION_ROCKETMQ_TOPIC_TEST";

    String PULL_ROCKETMQ_TOPIC_TEST= "PULL_ROCKETMQ_TOPIC_TEST";

    String EXT_ROCKETMQ_TOPIC_TEST= "EXT_ROCKETMQ_TOPIC_TEST";

    String CONSUMER_GROUP = "_CONSUMER_GROUP";
}

3. Various types of message sending and receiving

3.1 General message

3.1.1 Common message sending

 @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send synchronous and asynchronous messages to rocketmq
     */
    @RequestMapping(value = "sendNormal", method = RequestMethod. GET)
    public String sendNormal() {<!-- -->
        rocketMQTemplate.send(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":sync", MessageBuilder.withPayload("Send messages synchronously").build());
        rocketMQTemplate.asyncSend(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":async", MessageBuilder.withPayload("Send message asynchronously").build(), new SendCallback() {<!-- -->

            @Override
            public void onSuccess(SendResult sendResult) {<!-- -->
                log.info("Asynchronous send successfully: {}", sendResult.getSendStatus().name());
            }

            @Override
            public void onException(Throwable throwable) {<!-- -->
                log.info("Asynchronous sending failed: {}", throwable.getMessage());
            }
        });
        return "OK";
    }

3.1.2 Common message reception

com/gm/rocketmq/component/rocketmq/NormalRocketMqListener.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-cret-key}", secret$conrock-key = "m ")
public class NormalRocketMqListener implements RocketMQListener<String> {<!-- -->
    @Override
    public void onMessage(String s) {<!-- -->
        log.info("Ordinary subscription - information received: {}", s);
    }
}

@RocketMQMessageListener annotation parameter description:

  • consumerGroup: consumer subscription group, it is required and must be unique.
  • topic: topic name, the topic name sent by production.
  • consumeMode: consumption mode, you can choose to receive messages concurrently or sequentially; by default, CONCURRENTLY receives messages delivered asynchronously at the same time.
  • messageModel: message mode, the default CLUSTERING cluster consumption; if you want all subscribers to receive messages, you can set broadcast BROADCASTING.
  • consumeThreadMax: The maximum number of consumer threads, the default is 64.
  • consumeTimeout: The maximum time for message blocking, the default is 15 minutes.
  • nameServer: server address, the address of the configuration file is read by default, and the specified location can be set separately for consumers.
  • selectorExpression: Consume the business message of the specified Tag.
  • The ACL function on the Consumer side needs to be configured in @RocketMQMessageListener
  • The Producer side ACL function needs to be configured in the configuration file
  • See more official explanations

3.2 Sequential messages

3.2.1 Sequential message sending

 /**
     * Send sequential messages to rockertmq, synchronously
     *
     * @return
     */
    @RequestMapping(value = "sendOrderlySync", method = RequestMethod.GET)
    public String sendOrderlySync() {<!-- -->
        // Order List
        List<OrderStep> orderList = buildOrders();
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();
            String orderId = String. valueOf(orderList. get(i). getOrderId());
            rocketMQTemplate.sendOneWayOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":sync", msg, orderId);
        }
        return "OK";
    }

    /**
     * rockertmq sends sequential messages, asynchronously
     *
     * @return
     */
    @RequestMapping(value = "sendOrderlyAsync", method = RequestMethod.GET)
    public String sendOrderlyAsync() {<!-- -->
        // Order List
        List<OrderStep> orderList = buildOrders();
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();
            String orderId = String. valueOf(orderList. get(i). getOrderId());
            rocketMQTemplate.syncSendOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":async", msg, orderId);
        }
        return "OK";
    }
    
   /**
     * Order steps
     */
    private static class OrderStep {<!-- -->
        private long orderId;
        private String desc;

        public long getOrderId() {<!-- -->
            return orderId;
        }

        public void setOrderId(long orderId) {<!-- -->
            this. orderId = orderId;
        }

        public String getDesc() {<!-- -->
            return desc;
        }

        public void setDesc(String desc) {<!-- -->
            this.desc = desc;
        }

        @Override
        public String toString() {<!-- -->
            return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}';
        }
    }

    /**
     * Generate simulated order data
     */
    private List<OrderStep> buildOrders() {<!-- -->
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Push");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        return orderList;
    }

3.2.1 Order message reception

com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerA.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerA implements RocketMQListener<String> {<!-- -->
    @Override
    public void onMessage(String s) {<!-- -->
        log.info("Sequential subscription - information received: {}", s);
    }
}

com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerB.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerB implements RocketMQListener<String> {<!-- -->
    @Override
    public void onMessage(String s) {<!-- -->
        log.info("Sequential subscription - information received: {}", s);
    }
}

3.3 Delay message

3.3.1 Delay message sending

 /**
     * rockertmq sends delayed messages
     *
     * @return
     */
    @RequestMapping(value = "sendSchedule", method = RequestMethod. GET)
    public String sendSchedule() {<!-- -->
        Message msg = MessageBuilder.withPayload("Delayed message")
                .build();
        rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + ":", msg, 20);
        log.info("Delay message - release time: {}", System.currentTimeMillis());
        return "OK";
    }

3.3.2 Delay message reception

com/gm/rocketmq/component/rocketmq/ScheduleRocketMqListener.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class ScheduleRocketMqListener implements RocketMQListener<MessageExt> {<!-- -->
    @Override
    public void onMessage(MessageExt message) {<!-- -->
        String msg = "Content: " + new String(message.getBody()) + ", time: " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later";
        log.info("Delay subscription - received information: {}", msg);
        log.info("Delay message - accept time: {}", System.currentTimeMillis());
    }
}

3.4 Sending transaction message

3.4.1 Transaction message sending

 /**
     * rockertmq sends production-side transaction messages
     *
     * @return
     */
    @RequestMapping(value = "sendTransaction", method = RequestMethod.GET)
    public String sendTransaction() {<!-- -->
        Message msg = MessageBuilder.withPayload("Transaction Message")
                .build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TopicConstants.TRANSACTION_ROCKETMQ_TOPIC_TEST + ":", msg, "custom parameter");
        log.info("Transaction message - release result: {} = {}", result.getTransactionId(), result.getSendStatus());
        return "OK";
    }

com/gm/rocketmq/component/rocketmq/TransactionListenerImpl.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {<!-- -->
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // Transaction messages have three states, commit state, rollback state, and intermediate state:

    // RocketMQLocalTransactionState.COMMIT: Commits the transaction, which allows consumers to consume this message.
    // RocketMQLocalTransactionState.ROLLBACK: Rollback transaction, which means that the message will be deleted and not allowed to be consumed.
    // RocketMQLocalTransactionState.UNKNOWN: Intermediate state, which means that the message queue needs to be checked to determine the state.

    // executeLocalTransaction method to execute local transactions
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {<!-- -->
        String transactionId = msg.getHeaders().get("__transactionId__").toString();
        log.info("Execute local transaction, transactionId: {}", transactionId);

        int value = transactionIndex. getAndIncrement();
        int status = value % 3;
        localTrans.put(transactionId, status);
        log.info("Get custom parameters: {}", arg);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    // The checkLocalTransaction method is used to check the local transaction status and respond to the check request of the message queue
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {<!-- -->
        String transactionId = msg.getHeaders().get("__transactionId__").toString();
        log.info("Check local transaction status, transactionId: {}", transactionId);
        Integer status = localTrans.get(transactionId);
        if (null != status) {<!-- -->
            switch (status) {<!-- -->
                case 0:
                    return RocketMQLocalTransactionState.UNKNOWN;
                case 1:
                    return RocketMQLocalTransactionState. COMMIT;
                case 2:
                    return RocketMQLocalTransactionState. ROLLBACK;
            }
        }
        return RocketMQLocalTransactionState. COMMIT;
    }
}

3.5 Pull mode consumption

3.5.1 Source code analysis

In rocketmq-spring-boot-starter about the automatic configuration of Pull mode consumption,
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration:


Among them, rocketmq.name-server, rocketmq.pull-consumer.group, rocketmq.pull-consumer.topic are mandatory .

The rest of the configuration, org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.PullConsumer:

It can be seen from the above that using rocketmq-spring-boot-starter to implement the PULL mode only supports single Topic PULL For consumption, @ExtRocketMQConsumerConfiguration is required to consume multiple Topic in PULL mode.

3.5.2 Configuration files required for PULL consumption

The configuration file adds pull-consumer related configuration, complete rocketmq/src/main/resources/application.yml:

rocketmq:
  name-server: 192.168.0.30:9876
  producer:
    group: @artifactId@-group
    send-message-timeout: 60000 # Send message timeout, unit: milliseconds. Default is 3000
    retry-times-when-send-failed: 3 # When sending messages synchronously, the number of failed retries. Default is 2 times
    retry-times-when-send-async-failed: 2 # When sending messages asynchronously, the number of failed retries. Default is 2 times
    retry-next-server: false # When sending a message to the Broker, if the sending fails, whether to retry another Broker. The default is false
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
    enable-msg-trace: true # Whether to enable the message trace function, the default is true to enable
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # Customize the topic of the message track, the default is RMQ_SYS_TRACE_TOPIC
  consumer:
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
  pull-consumer:
    access-key: RocketMQAdmin # Access Key
    secret-key: 1qaz@WSX # Secret Key
    topic: PULL_ROCKETMQ_TOPIC_TEST
    group: PULL_ROCKETMQ_TOPIC_TEST_CONSUMER_GROUP

3.5.3 Message sending

 /**
     * Send a message to the pull mode of the ockertmq consumer
     *
     * @return
     */
    @RequestMapping(value = "sendPull", method = RequestMethod.GET)
    public String pull() {<!-- -->
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            Message msg = MessageBuilder.withPayload("pull message" + i).build();
            rocketMQTemplate.syncSend(TopicConstants.PULL_ROCKETMQ_TOPIC_TEST + ":", msg);
        }
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            Message msg = MessageBuilder.withPayload("pull ext message" + i).build();
            rocketMQTemplate.syncSend(TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + ":", msg);
        }
        return "OK";
    }

3.5.4 Use of @ExtRocketMQConsumerConfiguration

This example uses @ExtRocketMQConsumerConfiguration to define consumption, declare consumption Topic and consumption group, or declare different name-server.

Use @ExtRocketMQTemplateConfiguration to define producers, declare different name-server or other specific attributes to define non-standard RocketMQTemplate.

com/gm/rocketmq/component/rocketmq/ExtRocketMQTemplate.java:

import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * It can be used to define non-standard RocketMQTemplate with different name-server or other specific attributes. This example defines message Topic and consumer
 */
@ExtRocketMQConsumerConfiguration(group = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        topic = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST)
public class ExtRocketMQTemplate extends RocketMQTemplate {<!-- -->
}

3.5.5 PULL mode message reception

com/gm/rocketmq/component/rocketmq/PullConsumer.java:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;

@Slf4j
@Component
public class PullConsumer implements CommandLineRunner {<!-- -->

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private ExtRocketMQTemplate extRocketMQTemplate;

    @Override
    public void run(String... args) {<!-- -->
        while (true) {<!-- -->

            List<String> messages = rocketMQTemplate.receive(String.class, 5000);
            log.info("receive from rocketMQTemplate, messages={}", messages);


            messages = extRocketMQTemplate.receive(String.class, 5000);
            log.info("receive from extRocketMQTemplate, messages={}", messages);
        }
    }
}

3.6 source code

Source code address: module rocketmq in https://gitee.com/gm19900510/springboot-cloud-example.git.