[RocketMQ] Timing message practice based on RocketMQ version 5.1.0

Article directory

  • Build a cluster
    • start nameserver
    • Start Broker
    • Start the dashboard
  • function test
    • the code
    • result
  • Performance Testing
  • tips

Build a cluster

First of all, you need to build a cluster of the latest version of RocketMQ. In order to facilitate testing functions, modify configuration, etc., you can build 1 nameserver and 1 master. The specific building method will not be elaborated, but briefly describe the steps:

Start nameserver

  • Download the binary package: https://rocketmq.apache.org/download, upload and decompress

  • Create a nameserver configuration and start the nameserver:

    If you need to modify the log storage path, you need to find the corresponding logback configuration file in the conf directory, and modify all path-related configuration items, and the same goes for the subsequent Broker.

    nameserver.conf:

    serverChannelMaxIdleTimeSeconds=120
    listenPort=19876
    serverSocketSndBufSize=65535
    serverAsyncSemaphoreValue=64
    serverCallbackExecutorThreads=0
    rocketmqHome=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0
    clusterTest=false
    serverSelectorThreads=5
    useEpollNativeSelector=false
    orderMessageEnable=false
    serverPooledByteBufAllocatorEnable=true
    serverWorkerThreads=8
    kvConfigPath=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store/kvConfig.json
    serverSocketRcvBufSize=65535
    productEnvName=center
    serverOnewaySemaphoreValue=256
    configStorePath=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/conf/nameserver.conf
    

    Startup script:

    nohup sh /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/bin/mqnamesrv -c /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/conf/nameserver.conf > /dev/null 2> & amp; 1 & amp;
    

Start Broker

  • Modify Broker configuration and start Broker:

    broker.conf

    brokerClusterName = 5-1-0-Cluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    maxTransferBytesOnMessageInDisk=65536
    listenPort=3140
    namesrvAddr=172.24.30.192:19876;
    rocketmqHome=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0
    storePathConsumerQueue=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store/consumequeue
    brokerIP2=172.24.30.194
    brokerIP1=172.24.30.194
    aclEnable=false
    storePathRootDir=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store
    storePathCommitLog=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store/commitlog
    

    Startup script:

    #!/bin/bash
    ./etc/profile
    
    PID=`ps -ef | grep 'rocketmq-5.1.0' | grep -v grep | awk '{print $2}'`
    if [[ "" != "$PID" ]]; then
      echo "killing rocketmq-5.1.0 : $PID"
      kill $PID
    the fi
    
    sleep 1
    
    nohup sh /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/bin/mqbroker -c /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/conf/broker.conf >/dev/null 2> & amp;1 & amp;
    echo "deploying rocketmq-5.1.0..."
    

Start dashboard

This step is processed on demand

Download address: https://github.com/apache/rocketmq-dashboard

After cloning, make a jar package, upload it, and write a startup script:

MAIN_JAR="-jar /neworiental/rocketmq-5.1.0/dashboard/rocketmq-console-ng-2.0.0.jar"
JAVA_ARGS="-server -Xms4g -Xmx4g -XX:NewSize=512m -XX: + UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX: + PrintGCDetails -XX: + PrintHeapAtGC -XX:ThreadStackSize=512 -Xloggc:${LOGS_DIR} /gc.log"

CONSOLE_ARGS="--server.port=18281 --rocketmq.config.loginRequired=false --rocketmq.config.namesrvAddr=172.24.30.192:19876"

if [ ! -d ${LOGS_DIR} ]
then
  mkdir -p ${LOGS_DIR}
the fi

echo ${JAVA_ARGS} ${MAIN_JAR} ${CONSOLE_ARGS}
nohup java ${JAVA_ARGS} ${MAIN_JAR} ${CONSOLE_ARGS} >/dev/null 2> & amp;1 & amp;
echo "deploying rocketmq-dashboard now ..."

After the construction is complete, open the dashboard

Functional testing

Code

To test the function of timing/delayed messages, you need to use the latest version of the client:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.1.0</version>
        </dependency>

Of course, the official integration of common language clients into an open source project, I don’t think I’m very used to it, pass. Project address: https://github.com/apache/rocketmq-clients If you are interested, you can try it.

Producer:

For the producer, there is no need to make too many changes, just add a delay or timing parameter to the message:

 public static void main(String[] args) throws Exception {<!-- -->
        DefaultMQProducer producer = new DefaultMQProducer("test-producer", false);
        producer.setNamesrvAddr("172.24.30.192:19876");
        producer.start();
        for (int i = 0; i < 10; i ++ ) {<!-- -->
            Message msg = new Message();
            msg.setTopic("delay-topic");
            msg.setBody("This is a delayed message".getBytes(RemotingHelper.DEFAULT_CHARSET));
            Duration messageDelayTime = Duration.ofSeconds(10);
            long delayTimestamp = System.currentTimeMillis() + messageDelayTime.toMillis();
            // Absolute time: timing message
            msg.setDeliverTimeMs(delayTimestamp);
            // relative time: delayed message
// msg.setDelayTimeSec(1000 * 5);
            SendResult sendResult = producer. send(msg);
            System.out.printf("Send time: %s %n", formatter.format(LocalDateTime.now()));
        }
        producer. shutdown();
    }

Consumer:

The consumer is the same as consuming ordinary messages, no special settings are required:

 public static void main(String[] args) throws Exception {<!-- -->
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-test-0317", false);
        //Set nameserver
        consumer.setNamesrvAddr("172.24.30.192:19876");
        //Set topic, subExpression is to set the subscribed tag, * means all
        consumer.subscribe("delay-topic", "*");
        //Pull from the latest offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //register listener
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {<!-- -->
            System.out.printf("Receiving time: %s %n", formatter.format(LocalDateTime.now()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

Results

Producer:

consumer:

The error is on the order of hundreds of milliseconds.

Performance testing

The official pressure test code is provided, org.apache.rocketmq.example.benchmark.timer.TimerProducer

You need to download the latest version of the source code. The following is the TPS situation in the case of a single Broker, 100 threads, and a message size of 1kb:

If you don’t want to download the source code, you can use the following code to do a simple test and observe the TPS situation from the dashboard:

package cn.xdf.xadd.rmq.test.producer;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestProducer {<!-- -->

    private static final String MSG = StringUtils. repeat('a', 1024);
    private static final String TOPIC = "delay-topic";
    private static final Integer THREAD_COUNT = 100;
    private static final ExecutorService SEND_THREAD_POOL = new ThreadPoolExecutor(
            THREAD_COUNT,
            THREAD_COUNT,
            0L,
            TimeUnit. MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadFactoryImpl("ProducerSendMessageThread_"));

    static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

    static DefaultMQProducer producer = new DefaultMQProducer("test-producer", false);


    public static void main(String[] args) throws Exception {<!-- -->
        producer.setNamesrvAddr("172.24.30.192:19876");
        producer.start();
        benchmark();
    }

    private static void benchmark() {<!-- -->
        for (int i = 0; i <thREAD_COUNT; i ++ ) {<!-- -->
            SEND_THREAD_POOL. execute(() -> {<!-- -->
                for (int j = 1; j <= 10000; j ++ ) {<!-- -->
                    sendMsg();
                }
            });
        }
    }

    private static void sendMsg() {<!-- -->
        try {<!-- -->
            producer. send(buildMessage());
        } catch (Exception e) {<!-- -->
            System.out.println(e);
        }
    }

    private static Message buildMessage() throws UnsupportedEncodingException {<!-- -->
        Message msg = new Message();
        msg. setTopic(TOPIC);
        msg.setBody(MSG.getBytes(RemotingHelper.DEFAULT_CHARSET));
        Duration messageDelayTime = Duration.ofHours(10);
        long delayTimestamp = System.currentTimeMillis() + messageDelayTime.toMillis();
        msg.setDeliverTimeMs(delayTimestamp);
        return msg;
    }

}

Tips

  1. By default, the maximum delay time supported is 3 days, which can be modified according to the broker configuration: timerMaxDelaySec;
  2. timerPrecisionMs is 1000 by default, changing it to a smaller value can improve the precision, it is recommended to use the default value;
  3. For messages that exceed the slot time of the time wheel, the slots will be reused by means of modulo, so delayed messages exceeding 7 days are supported;
  4. The backlog of messages will not cause OOM, and the time wheel data uses DirectByteBuffer in memory, which uses direct memory;
  5. Configuration item: timerEnableDisruptor is off by default. It stands to reason that turning it on can improve performance and reduce latency (try it, changing it to true does not take effect, it will be reset to false)