Article directory
- Build a cluster
  - Start nameserver
  - Start Broker
  - Start dashboard
- Functional testing
  - Code
  - Results
- Performance testing
- Tips
Build a cluster
First, build a cluster running the latest version of RocketMQ (5.1.0 here). To make it easy to test features and change configuration, one nameserver and one master Broker are enough. The build process is not covered in detail; the steps are, briefly:
Start nameserver
- Download the binary package from https://rocketmq.apache.org/download, upload it to the server, and decompress it.
- Create a nameserver configuration and start the nameserver.
If you need to change the log storage path, find the corresponding logback configuration file in the conf directory and modify every path-related item. The same applies to the Broker later.
nameserver.conf:
serverChannelMaxIdleTimeSeconds=120
listenPort=19876
serverSocketSndBufSize=65535
serverAsyncSemaphoreValue=64
serverCallbackExecutorThreads=0
rocketmqHome=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0
clusterTest=false
serverSelectorThreads=5
useEpollNativeSelector=false
orderMessageEnable=false
serverPooledByteBufAllocatorEnable=true
serverWorkerThreads=8
kvConfigPath=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store/kvConfig.json
serverSocketRcvBufSize=65535
productEnvName=center
serverOnewaySemaphoreValue=256
configStorePath=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/conf/nameserver.conf
Startup script:
nohup sh /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/bin/mqnamesrv -c /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/conf/nameserver.conf > /dev/null 2>&1 &
Start Broker
Modify the Broker configuration, then start the Broker.
broker.conf:
brokerClusterName = 5-1-0-Cluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
maxTransferBytesOnMessageInDisk=65536
listenPort=3140
namesrvAddr=172.24.30.192:19876;
rocketmqHome=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0
storePathConsumerQueue=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store/consumequeue
brokerIP2=172.24.30.194
brokerIP1=172.24.30.194
aclEnable=false
storePathRootDir=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store
storePathCommitLog=/neworiental/rocketmq-5.1.0/rocketmq-5.1.0/store/commitlog
Startup script:
#!/bin/bash
. /etc/profile

# Stop any running process whose command line contains 'rocketmq-5.1.0'
PID=`ps -ef | grep 'rocketmq-5.1.0' | grep -v grep | awk '{print $2}'`
if [[ "" != "$PID" ]]; then
  echo "killing rocketmq-5.1.0 : $PID"
  kill $PID
fi
sleep 1

# Start the Broker in the background and discard console output
nohup sh /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/bin/mqbroker -c /neworiental/rocketmq-5.1.0/rocketmq-5.1.0/conf/broker.conf >/dev/null 2>&1 &
echo "deploying rocketmq-5.1.0..."
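The Broker can only register if its namesrvAddr points at the port the nameserver listens on (19876 in the two files above, not the default 9876). As a rough sanity check, the sketch below parses both files as Java properties and compares the ports. The class and method names are made up for illustration, and it assumes multiple addresses in namesrvAddr are separated by semicolons.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfCheck {
    /** Parses key=value text, the format used by nameserver.conf and broker.conf. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True if at least one address in the broker's namesrvAddr uses the nameserver's listenPort. */
    static boolean brokerPointsAtNameserver(Properties namesrv, Properties broker) {
        // 9876 is used as the fallback when listenPort is not set
        String port = namesrv.getProperty("listenPort", "9876").trim();
        String addrs = broker.getProperty("namesrvAddr", "");
        for (String addr : addrs.split(";")) {
            String a = addr.trim();
            if (!a.isEmpty() && a.endsWith(":" + port)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Properties namesrv = parse("listenPort=19876\n");
        Properties broker = parse("namesrvAddr=172.24.30.192:19876;\nlistenPort=3140\n");
        System.out.println("broker points at nameserver: "
                + brokerPointsAtNameserver(namesrv, broker));
    }
}
```

In practice you would read the two files from disk (for example with a FileReader) instead of using inline strings.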
Start dashboard
This step is optional; do it only if you need the dashboard.
Download address: https://github.com/apache/rocketmq-dashboard
After cloning the repository, build a jar package, upload it, and write a startup script:
# LOGS_DIR is expected to be defined before this point
MAIN_JAR="-jar /neworiental/rocketmq-5.1.0/dashboard/rocketmq-console-ng-2.0.0.jar"
JAVA_ARGS="-server -Xms4g -Xmx4g -XX:NewSize=512m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:ThreadStackSize=512 -Xloggc:${LOGS_DIR}/gc.log"
CONSOLE_ARGS="--server.port=18281 --rocketmq.config.loginRequired=false --rocketmq.config.namesrvAddr=172.24.30.192:19876"

# Create the log directory if it does not exist yet
if [ ! -d ${LOGS_DIR} ]
then
  mkdir -p ${LOGS_DIR}
fi

echo ${JAVA_ARGS} ${MAIN_JAR} ${CONSOLE_ARGS}
nohup java ${JAVA_ARGS} ${MAIN_JAR} ${CONSOLE_ARGS} >/dev/null 2>&1 &
echo "deploying rocketmq-dashboard now ..."
Once everything is running, open the dashboard.
Functional testing
Code
To test timed/delayed messages, you need the latest version of the client:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.0</version>
</dependency>
The project has also gathered clients for the common languages into a separate open-source project: https://github.com/apache/rocketmq-clients. I am not used to it, so I skip it here; try it if you are interested.
Producer:
The producer needs very few changes: just set a delay or a delivery time on the message:
public static void main(String[] args) throws Exception {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
    DefaultMQProducer producer = new DefaultMQProducer("test-producer", false);
    producer.setNamesrvAddr("172.24.30.192:19876");
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message();
        msg.setTopic("delay-topic");
        msg.setBody("This is a delayed message".getBytes(RemotingHelper.DEFAULT_CHARSET));
        Duration messageDelayTime = Duration.ofSeconds(10);
        long delayTimestamp = System.currentTimeMillis() + messageDelayTime.toMillis();
        // Absolute time (timed message): deliver at this epoch-millisecond timestamp
        msg.setDeliverTimeMs(delayTimestamp);
        // Relative time (delayed message): the argument is in seconds, so this means 5 seconds
        // msg.setDelayTimeSec(5);
        SendResult sendResult = producer.send(msg);
        System.out.printf("Send time: %s %n", formatter.format(LocalDateTime.now()));
    }
    producer.shutdown();
}
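The producer derives its delivery timestamp from "now plus a delay". If a message should instead arrive at a fixed wall-clock time, the timestamp can be computed with java.time alone. The following is a small sketch of both calculations; the class and method names are illustrative and not part of the RocketMQ client.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class DeliverTime {
    /** Epoch milliseconds for a wall-clock time in the given zone (absolute, "timed" delivery). */
    static long atWallClock(LocalDateTime when, ZoneId zone) {
        return when.atZone(zone).toInstant().toEpochMilli();
    }

    /** Epoch milliseconds for "now + delay" (relative, "delayed" delivery). */
    static long afterDelay(long nowMs, Duration delay) {
        return nowMs + delay.toMillis();
    }

    public static void main(String[] args) {
        // Tomorrow at 09:00 in the JVM's default time zone
        LocalDateTime tomorrowNine = LocalDateTime.now().plusDays(1)
                .withHour(9).withMinute(0).withSecond(0).withNano(0);
        long absolute = atWallClock(tomorrowNine, ZoneId.systemDefault());
        long relative = afterDelay(System.currentTimeMillis(), Duration.ofSeconds(10));
        System.out.println("timed delivery at:   " + absolute);
        System.out.println("delayed delivery at: " + relative);
    }
}
```

Either value can be passed to msg.setDeliverTimeMs(...) as in the producer code.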
Consumer:
The consumer works exactly as it does for ordinary messages; no special settings are required:
public static void main(String[] args) throws Exception {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-test-0317", false);
    // Set the nameserver address
    consumer.setNamesrvAddr("172.24.30.192:19876");
    // Set the topic; subExpression selects the subscribed tags, * means all
    consumer.subscribe("delay-topic", "*");
    // Start pulling from the latest offset
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // Register the listener
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        System.out.printf("Receiving time: %s %n", formatter.format(LocalDateTime.now()));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
Results
Comparing the send times printed by the producer with the receive times printed by the consumer, the delivery error is on the order of hundreds of milliseconds.
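To put a number on that error instead of reading timestamps by eye, you can record the intended delivery timestamp (the value passed to setDeliverTimeMs) and the actual receive time for each message, then summarize the differences. The sketch below assumes both are available as parallel arrays of epoch milliseconds; the class name and the sample values are made up for illustration and are not measured results.

```java
import java.util.LongSummaryStatistics;
import java.util.stream.IntStream;

public class DeliveryError {
    /** Summarizes (received - expected) in milliseconds over all messages. */
    static LongSummaryStatistics errors(long[] expectedMs, long[] receivedMs) {
        if (expectedMs.length != receivedMs.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        return IntStream.range(0, expectedMs.length)
                .mapToLong(i -> receivedMs[i] - expectedMs[i])
                .summaryStatistics();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, only to show the call
        long[] expected = {1_000L, 2_000L, 3_000L};
        long[] received = {1_200L, 2_300L, 3_100L};
        LongSummaryStatistics stats = errors(expected, received);
        System.out.printf("min=%d ms, max=%d ms, avg=%.1f ms%n",
                stats.getMin(), stats.getMax(), stats.getAverage());
    }
}
```

One way to carry the expected timestamp to the consumer would be to put it in the message body or a user property; how you do that is up to your test setup.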
Performance testing
The project ships official benchmark code: org.apache.rocketmq.example.benchmark.timer.TimerProducer
To use it, you need to download the latest source code. The following is the TPS with a single Broker, 100 threads, and a message size of 1 KB:
If you don't want to download the source code, you can use the following code for a simple test and watch the TPS on the dashboard:
package cn.xdf.xadd.rmq.test.producer;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestProducer {
    // 1 KB message body
    private static final String MSG = StringUtils.repeat('a', 1024);
    private static final String TOPIC = "delay-topic";
    private static final Integer THREAD_COUNT = 100;
    private static final ExecutorService SEND_THREAD_POOL = new ThreadPoolExecutor(
            THREAD_COUNT, THREAD_COUNT, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new ThreadFactoryImpl("ProducerSendMessageThread_"));
    static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
    static DefaultMQProducer producer = new DefaultMQProducer("test-producer", false);

    public static void main(String[] args) throws Exception {
        producer.setNamesrvAddr("172.24.30.192:19876");
        producer.start();
        benchmark();
    }

    // Each of the 100 threads sends 10000 messages
    private static void benchmark() {
        for (int i = 0; i < THREAD_COUNT; i++) {
            SEND_THREAD_POOL.execute(() -> {
                for (int j = 1; j <= 10000; j++) {
                    sendMsg();
                }
            });
        }
    }

    private static void sendMsg() {
        try {
            producer.send(buildMessage());
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    // Builds a timed message to be delivered 10 hours from now
    private static Message buildMessage() throws UnsupportedEncodingException {
        Message msg = new Message();
        msg.setTopic(TOPIC);
        msg.setBody(MSG.getBytes(RemotingHelper.DEFAULT_CHARSET));
        Duration messageDelayTime = Duration.ofHours(10);
        long delayTimestamp = System.currentTimeMillis() + messageDelayTime.toMillis();
        msg.setDeliverTimeMs(delayTimestamp);
        return msg;
    }
}
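If the dashboard is not deployed, the send rate can also be estimated on the client side. The sketch below is one possible way to do that with a LongAdder and periodic sampling; TpsCounter and its methods are hypothetical helpers, not part of RocketMQ, and the sleep in main only stands in for a real send.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class TpsCounter {
    private final LongAdder sent = new LongAdder();
    private long lastCount = 0L;
    private long lastTimeMs;

    TpsCounter(long startMs) {
        this.lastTimeMs = startMs;
    }

    /** Call once after every successful send; cheap under contention. */
    void record() {
        sent.increment();
    }

    /** Returns sends per second since the previous sample (or since construction). */
    synchronized double sample(long nowMs) {
        long count = sent.sum();
        long elapsedMs = nowMs - lastTimeMs;
        double tps = elapsedMs <= 0 ? 0.0 : (count - lastCount) * 1000.0 / elapsedMs;
        lastCount = count;
        lastTimeMs = nowMs;
        return tps;
    }

    public static void main(String[] args) throws InterruptedException {
        TpsCounter counter = new TpsCounter(System.currentTimeMillis());
        ScheduledExecutorService sampler = Executors.newSingleThreadScheduledExecutor();
        // Print the rate once per second
        sampler.scheduleAtFixedRate(
                () -> System.out.printf("TPS: %.0f%n", counter.sample(System.currentTimeMillis())),
                1, 1, TimeUnit.SECONDS);
        // Stand-in for the send loop: sleep 1 ms per "send" for about 3 seconds
        long end = System.currentTimeMillis() + 3_000L;
        while (System.currentTimeMillis() < end) {
            Thread.sleep(1);
            counter.record();
        }
        sampler.shutdownNow();
    }
}
```

In the benchmark, counter.record() would go right after producer.send(...) in sendMsg(), with the sampler started before benchmark() is called.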
Tips
- By default, the maximum supported delay is 3 days. It can be changed with the broker configuration item timerMaxDelaySec;
- timerPrecisionMs defaults to 1000. A smaller value improves precision, but keeping the default is recommended;
- When a message's delay exceeds the time span covered by the time wheel's slots, the slots are reused by taking a modulo, so delays longer than 7 days can be supported;
- A message backlog will not cause an OOM: the time wheel data is held in a DirectByteBuffer, which uses direct (off-heap) memory;
- The configuration item timerEnableDisruptor is off by default. In theory, turning it on should improve performance and reduce latency (but when I tried it, setting it to true did not take effect; it was reset to false).
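The modulo reuse of slots mentioned in the tips can be illustrated with a toy calculation. This is only a sketch of the general time-wheel idea, not RocketMQ's actual implementation. The precision and maximum delay are the defaults quoted above, while the slot count (one slot per second over 7 days) and all names are assumptions made for illustration.

```java
public class TimerWheelSketch {
    // Defaults quoted in the tips
    static final long PRECISION_MS = 1000L;            // timerPrecisionMs
    static final long MAX_DELAY_SEC = 3L * 24 * 3600;  // timerMaxDelaySec (3 days)
    // Assumption for illustration only: one slot per precision tick over 7 days
    static final long SLOTS = 7L * 24 * 3600;

    /** Slot for a delivery time; times one full wheel apart land in the same slot. */
    static int slotIndex(long deliverTimeMs) {
        return (int) ((deliverTimeMs / PRECISION_MS) % SLOTS);
    }

    /** True if the requested delay does not exceed the configured maximum. */
    static boolean withinMaxDelay(long nowMs, long deliverTimeMs) {
        return deliverTimeMs - nowMs <= MAX_DELAY_SEC * 1000L;
    }

    /** When the wheel reaches a slot, only entries that are actually due are delivered;
     *  the others stay for a later pass over the same slot. */
    static boolean isDue(long nowMs, long deliverTimeMs) {
        return deliverTimeMs <= nowMs;
    }

    public static void main(String[] args) {
        long now = 0L;
        long in10s = now + 10_000L;
        long oneWheelLater = in10s + SLOTS * PRECISION_MS;
        System.out.println("slot for +10s:          " + slotIndex(in10s));
        System.out.println("slot one wheel later:   " + slotIndex(oneWheelLater));
        System.out.println("+10s within max delay:  " + withinMaxDelay(now, in10s));
        System.out.println("later within max delay: " + withinMaxDelay(now, oneWheelLater));
    }
}
```

Because two delivery times can share a slot, the due check against the real timestamp is what keeps a far-future message from being delivered on the first pass.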