[RocketMQ Series 5] Message Examples: Implementing Sequential, Delayed, and Broadcast Messages

1. Preface

In the previous article, we introduced the implementation of simple messages. This article mainly introduces the implementation of sequential messages, and then covers broadcast, delayed, batch, and filtered messages.

Sequential messages mean that consumers consume messages in the order in which the producer sent them, that is, first in, first out (FIFO).

Sequential messages are divided into global sequential messages and local sequential messages.

Global sequential messages put every message of a topic into a single queue.

Local sequential messages put messages that depend on each other's order into the same queue, while the different queues are consumed in parallel.

2. Local sequential messages

By default, RocketMQ distributes messages across the queues of a topic in round-robin fashion, so there is no guarantee that related messages are consumed in order.

For example, placing an order on a shopping website produces four messages: 1. Order created -> 2. Order paid -> 3. Order shipped -> 4. Order completed. These four messages must be processed in this logical order. However, with RocketMQ's default delivery method, the order creation message of an order may be delivered to MessageQueue1 while its order payment message is delivered to MessageQueue2. Because the messages sit in different queues, the consumer may see the order payment message before the order creation message.

Local sequential messaging ensures that the four messages of the same order are placed in the same queue, so the order payment message cannot be consumed before the order creation message.
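
The difference between the two placement strategies can be seen in a small simulation. This is a toy model under stated assumptions, not RocketMQ code: queues are plain in-memory lists, and the class and method names (QueuePlacementSketch, roundRobin, byOrderId) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: queues are plain lists, not RocketMQ MessageQueue objects.
final class QueuePlacementSketch {

    /** Round-robin: the n-th message sent goes to queue n % queueCount. */
    static List<List<String>> roundRobin(int orders, int steps, int queueCount) {
        List<List<String>> queues = newQueues(queueCount);
        int sent = 0;
        for (int step = 0; step < steps; step++) {
            for (int order = 0; order < orders; order++) {
                queues.get(sent % queueCount).add("order" + order + "-step" + step);
                sent++;
            }
        }
        return queues;
    }

    /** Keyed: every message of an order goes to queue orderId % queueCount. */
    static List<List<String>> byOrderId(int orders, int steps, int queueCount) {
        List<List<String>> queues = newQueues(queueCount);
        for (int step = 0; step < steps; step++) {
            for (int order = 0; order < orders; order++) {
                queues.get(order % queueCount).add("order" + order + "-step" + step);
            }
        }
        return queues;
    }

    private static List<List<String>> newQueues(int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        return queues;
    }

    public static void main(String[] args) {
        // 3 orders with 4 steps each, spread over 4 queues.
        System.out.println("round-robin: " + roundRobin(3, 4, 4));
        System.out.println("by orderId:  " + byOrderId(3, 4, 4));
    }
}
```

With round-robin placement the steps of order0 end up in four different queues, whereas keyed placement keeps all of them, in sending order, in queue 0.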

With local sequential messages, a consumer consumes the messages within each queue of a topic in order. Consumers implement the MessageListenerOrderly interface to listen for messages.

2.1. Define producers

  1. A topic named part_order_topic_test is defined here. After running the program, the topic can be routed to two brokers, broker-a and broker-b.


import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class OrderProducer {
    // Local sequential consumption: the core idea is to select the queue yourself so that
    // messages that need an ordering guarantee fall into the same queue.
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");
        defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
        defaultMQProducer.start();

        for (int i = 0; i < 10; i++) {
            int orderId = i;
            for (int j = 0; j < 5; j++) {
                // Build the message; the tag and key only serve to tell orders apart
                Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId,
                        ("Part order message processing_" + orderId + ";step_" + j).getBytes());
                SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {
                    // The arg parameter is the orderId passed as the last argument of send()
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer orderId = (Integer) arg;
                        int index = orderId % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", send);
            }
        }
        defaultMQProducer.shutdown();
    }
}

  2. The MessageQueueSelector interface is implemented at send time to choose the target queue. Its method public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) takes three parameters: mqs is the list of all queues the current topic is routed to (8 queues here: 4 on broker-a and 4 on broker-b), msg is the message being sent, and arg is the orderId passed to send.

  3. The orderId modulo the number of queues determines which queue the message is sent to. This ensures that messages with the same orderId fall into the same queue:

    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
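
The modulo selection above relies on orderId being a non-negative Integer. If the business key were a string or could be negative, the % operator could produce a negative index. A minimal sketch of a more defensive index computation follows; QueueIndexSketch and indexFor are hypothetical names, and the queue count is passed in as a plain int instead of mqs.size().

```java
// Sketch only: computes a queue index for an arbitrary key, independent of RocketMQ types.
final class QueueIndexSketch {

    /** Maps any non-null business key to an index in [0, queueCount). */
    static int indexFor(Object key, int queueCount) {
        // Math.floorMod never returns a negative result for a positive divisor,
        // unlike the % operator, which keeps the sign of the dividend.
        return Math.floorMod(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        System.out.println(indexFor(7, 8));          // Integer key, same result as 7 % 8
        System.out.println(indexFor(-3, 8));         // negative key still yields a valid index
        System.out.println(indexFor("order-42", 8)); // String key, stable for equal strings
    }
}
```

Inside a MessageQueueSelector, the same expression could replace orderId % mqs.size(), with mqs.size() as the divisor.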
    
When the producer runs, the output shows that messages with the same orderId are delivered to the same MessageQueue, and messages within a single MessageQueue are naturally ordered.

2.2. Define consumers

Now that the producer is covered, let's turn to the consumer. The main point of the consumer logic is to implement the MessageListenerOrderly interface to listen for messages. The core code is:

// 2. Register a listener to consume messages in order
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("Consumed message=" + msg);
            System.out.println("Message body=" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

Three consumers are started here. Regardless of which consumer handles them, the five messages of the same orderId are consumed in order.
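
Why per-queue consumption preserves the order of each orderId can be illustrated with a toy model. This is a sketch under stated assumptions and not the client's implementation: queues are in-memory lists, each queue is drained by a single task, and all names (OrderlyConsumeSketch, consume, perOrderInOrder, fill) are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model only: queues are in-memory lists, not RocketMQ MessageQueue objects.
final class OrderlyConsumeSketch {

    /** Drains every queue in its own task and returns the overall consumption log. */
    static List<String> consume(List<List<String>> queues) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, queues.size()));
        for (List<String> queue : queues) {
            // One task per queue: messages of a queue are handled strictly one after
            // another, while different queues may be processed at the same time.
            pool.execute(() -> queue.forEach(log::add));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumption did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    /** True if, for every order, the steps appear in increasing order in the log. */
    static boolean perOrderInOrder(List<String> log) {
        Map<String, Integer> lastStep = new HashMap<>();
        for (String msg : log) {
            String[] parts = msg.split("-step");
            int step = Integer.parseInt(parts[1]);
            Integer previous = lastStep.put(parts[0], step);
            if (previous != null && previous >= step) {
                return false;
            }
        }
        return true;
    }

    /** Places step s of order o into queue o % queueCount. */
    static List<List<String>> fill(int orders, int steps, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (int o = 0; o < orders; o++) {
            for (int s = 0; s < steps; s++) {
                queues.get(o % queueCount).add("order" + o + "-step" + s);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> log = consume(fill(6, 5, 3));
        System.out.println(log.size() + " messages consumed, per-order sequence kept: " + perOrderInOrder(log));
    }
}
```

The interleaving between different orders varies from run to run, but the steps of any single order always appear in sending order, because they all live in one queue that is read sequentially.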


3. Problems encountered

During the first debugging session, a broker is full error occurred. It is caused by insufficient disk space on the broker machine. You can use the df -h command to check the current disk usage; the error is reported when disk usage exceeds 90%.
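
The same check can be done from Java with the standard File API. This is a minimal sketch: DiskUsageSketch and usedRatio are hypothetical names, the 0.90 threshold is simply the 90% figure mentioned above, and the path to inspect is an assumption (it should point at the partition that holds the broker's store directory).

```java
import java.io.File;

// Sketch only: reports how full the partition containing a given path is.
final class DiskUsageSketch {

    /** Returns the used fraction (0.0 to 1.0) of the partition that contains the path. */
    static double usedRatio(File path) {
        long total = path.getTotalSpace();
        if (total == 0) {
            return 0.0; // the path does not name a partition, or its size is unknown
        }
        return 1.0 - (double) path.getUsableSpace() / total;
    }

    public static void main(String[] args) {
        // Assumption: pass the broker store directory as the first argument; defaults to ".".
        File path = new File(args.length > 0 ? args[0] : ".");
        double ratio = usedRatio(path);
        System.out.printf("used: %.1f%%%n", ratio * 100);
        if (ratio > 0.90) {
            System.out.println("Disk usage is above 90%; free up space before sending messages.");
        }
    }
}
```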


4. Global sequential messages

Global sequential messaging means that all messages of a topic are consumed in the order they were sent. This can only be achieved by sending all messages to the same MessageQueue, which greatly reduces throughput in high-concurrency scenarios.

5. Broadcast messages

In broadcast mode, a message is delivered to every subscriber of a topic: each consumer subscribed to the topic receives all messages sent by the producer.

The producer for broadcast messages is implemented the same way as the producer for ordinary synchronous messages. The difference lies in the consumer's message model. The consumer-side differences are shown below.

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");
defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
// Set the consumer's message model to broadcast mode
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);

// Start consuming from the first offset
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
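
The effect of the message model can be sketched with a toy model that contrasts broadcasting with the default clustering model, where the consumers of a group share the messages. This is an illustration under stated assumptions: real clustering assigns whole queues to consumers, whereas this sketch simply deals messages out one by one, and all names (MessageModelSketch, clustering, broadcasting) are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: shows who receives what, not how RocketMQ delivers it.
final class MessageModelSketch {

    /** Clustering-style: each message is handled by exactly one consumer of the group. */
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inbox = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inbox.get(consumers.get(i % consumers.size())).add(messages.get(i));
        }
        return inbox;
    }

    /** Broadcasting-style: every consumer receives every message. */
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inbox = emptyInboxes(consumers);
        for (String consumer : consumers) {
            inbox.get(consumer).addAll(messages);
        }
        return inbox;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inbox.put(consumer, new ArrayList<>());
        }
        return inbox;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("clustering:   " + clustering(consumers, messages));
        System.out.println("broadcasting: " + broadcasting(consumers, messages));
    }
}
```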

6. Delayed messages

Delayed messages differ from ordinary messages in that they are not delivered to consumers until a specified amount of time has elapsed. The producer does not delay sending: it sends the message right away, and the broker holds it back so that consumers can only consume it after the specified delay.

6.1. Delayed message producer

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");
defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");
defaultMQProducer.start();
for (int i = 0; i < 100; i++) {
    Message message = new Message("Schedule_topic", ("Delayed Message Test" + i).getBytes());
    // Set the delay level. There are 18 delay levels by default; level 3 delays consumption by 10 seconds.
    message.setDelayTimeLevel(3);
    defaultMQProducer.send(message);
}
System.out.println("All delayed messages have been sent");
defaultMQProducer.shutdown();

The main difference between a delayed message producer and an ordinary message producer is that a delayed message calls the setDelayTimeLevel method to set the delay level. The level is set to 3 here, which means a delay of 10 seconds. RocketMQ provides 18 delay levels by default; they are listed in the broker configuration shown on the cluster page of the RocketMQ dashboard.
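
How a level number maps to a delay can be worked out from the level list. The sketch below assumes the broker uses the default messageDelayLevel value; if your broker overrides it, the string constant must be adjusted. DelayLevelSketch, parse, and delayFor are hypothetical helper names, not part of the RocketMQ client.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Sketch only: translates a 1-based delay level into a duration.
final class DelayLevelSketch {

    // Assumption: the broker's default messageDelayLevel setting.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses tokens such as "10s", "5m", or "2h" into durations. */
    static List<Duration> parse(String levels) {
        List<Duration> result = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': result.add(Duration.ofSeconds(amount)); break;
                case 'm': result.add(Duration.ofMinutes(amount)); break;
                case 'h': result.add(Duration.ofHours(amount)); break;
                case 'd': result.add(Duration.ofDays(amount)); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return result;
    }

    /** Levels are 1-based: level 1 is the first entry of the list. */
    static Duration delayFor(int level) {
        return parse(DEFAULT_LEVELS).get(level - 1);
    }

    public static void main(String[] args) {
        System.out.println("levels: " + parse(DEFAULT_LEVELS).size());
        System.out.println("level 3 delays by " + delayFor(3).getSeconds() + " seconds");
    }
}
```

Because only these fixed levels are available through setDelayTimeLevel, a delay such as 15 seconds has to be rounded to a neighboring level.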


Consumers of delayed messages are the same as consumers of ordinary messages. RocketMQ internally stores delayed messages through a topic named SCHEDULE_TOPIC_XXXX.


7. Batch messages

Sending messages in batches improves delivery performance. The official recommendation is that the total size of a batch should not exceed 1 MB; in practice the hard limit is 4 MB. A batch larger than 4 MB must be split into smaller batches before sending. The broker's limit should be set to 4 MB as well (in the broker's configuration file: maxMessageSize=4194304). The core code is as follows:

// 4. Create the messages
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 100 * 100; i++) {
    // Create a message with a topic and a message body
    messageList.add(new Message("batch_topic", ("Feige test batch message" + i).getBytes()));
}
// Send the batch directly; this is only valid while the whole batch is smaller than 4 MB
SendResult send = defaultMQProducer.send(messageList);
System.out.println(send);
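
When a batch might exceed the size limit, it has to be cut into sub-batches first. The following is a minimal sketch of such a splitter under stated assumptions: it works on raw byte[] bodies and counts body bytes only, whereas a real message also carries its topic and properties, so some headroom should be left. BatchSplitSketch and split are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: splits message bodies into sub-batches that stay under a byte limit.
final class BatchSplitSketch {

    /** Groups bodies in order so that each batch holds at most maxBatchBytes of body data. */
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] body : bodies) {
            // Start a new batch when adding this body would exceed the limit.
            if (!current.isEmpty() && currentBytes + body.length > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // A single body larger than the limit ends up alone in its own batch.
            current.add(body);
            currentBytes += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 0; i < 100 * 100; i++) {
            bodies.add(("Feige test batch message" + i).getBytes());
        }
        // Assumption: 1 MB limit, matching the recommended batch size.
        List<List<byte[]>> batches = split(bodies, 1024 * 1024);
        System.out.println("sub-batches: " + batches.size());
    }
}
```

Each sub-batch would then be converted to a List<Message> and passed to send separately.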

8. Filter messages

Use tag filter

In most cases, tags are a simple and useful way to select the messages you want.

The producer specifies a tag when sending each message, and the consumer filters messages by tag when subscribing.

8.1. Filter message producers

Three tags are defined here, namely tagA, tagB and tagC. The producer assigns different tags to each message when producing messages.

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");
defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
defaultMQProducer.start();
String[] tags = new String[]{"tagA", "tagB", "tagC"};
for (int i = 0; i < 15; i++) {
    // Cycle through the three tags so that each tag is used for five messages
    Message message = new Message("TagFilterTest", tags[i % tags.length], ("Feige tag message filtering" + tags[i % tags.length]).getBytes());
    SendResult send = defaultMQProducer.send(message);
    System.out.printf("%s%n", send);
}
defaultMQProducer.shutdown();

8.2. Filtering message consumers

The consumer subscribes only to messages tagged tagA or tagC. The filtering is done by the broker, which delivers only the messages the consumer needs.

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");
defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
// Subscribe to messages tagged tagA or tagC only
defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("Received message=" + msg);
        System.out.println("Received message body=" + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
System.out.println("The consumer has started");
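
The meaning of a subscription expression such as tagA||tagC can be traced with a small matcher. This is a sketch of the matching rule only, not the broker's implementation; TagFilterSketch, parse, matches, and countMatches are made-up names.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: decides whether a message tag satisfies a tag subscription expression.
final class TagFilterSketch {

    /** Splits an expression like "tagA||tagC" into its individual tags. */
    static Set<String> parse(String expression) {
        Set<String> tags = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    /** "*" matches every tag; otherwise the tag must be one of the listed tags. */
    static boolean matches(String expression, String tag) {
        return "*".equals(expression.trim()) || parse(expression).contains(tag);
    }

    /** Counts how many of the messages (tags assigned in rotation) pass the filter. */
    static int countMatches(String expression, String[] tags, int messages) {
        int count = 0;
        for (int i = 0; i < messages; i++) {
            if (matches(expression, tags[i % tags.length])) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        String[] tags = {"tagA", "tagB", "tagC"};
        // Same setup as the producer above: 15 messages with rotating tags.
        System.out.println(countMatches("tagA||tagC", tags, 15)); // prints 10
    }
}
```

Of the 15 messages sent by the producer above, the five tagged tagB are skipped and the remaining ten are consumed.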


Use SQL filter

The SQL feature evaluates expressions over the properties set on a message when it is sent. Within the syntax defined by RocketMQ, some interesting filtering logic can be implemented.

Syntax

RocketMQ defines only a basic subset of syntax to support this feature.

1. Numeric comparison, such as `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. Character comparison, such as `=`, `<>`, `IN`;
3. `IS NULL` or `IS NOT NULL`;
4. Logical `AND`, `OR`, `NOT`.

The constant types are:

1. Numbers, such as 123;
2. Characters, such as 'abc', which must be enclosed in single quotes;
3. `NULL`, a special constant;
4. Boolean values, `TRUE` or `FALSE`.

SQL filter producer

The producer sets the property used for filtering with message.putUserProperty("a", String.valueOf(i)); so the first message carries the key-value pair a=0, the second message carries a=1, and so on.

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");
defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
defaultMQProducer.start();
String[] tags = new String[]{"tagA", "tagB", "tagC"};
for (int i = 0; i < 15; i++) {
    Message message = new Message("SQLFilterTest", tags[i % tags.length], ("Feige sql message filtering" + tags[i % tags.length]).getBytes());

    // User property that the consumer's SQL expression filters on
    message.putUserProperty("a", String.valueOf(i));
    SendResult send = defaultMQProducer.send(message);
    System.out.printf("%s%n", send);
}
defaultMQProducer.shutdown();

SQL filter consumer:

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");
defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
// Keep messages tagged tagA or tagC whose property a is set and lies between 0 and 3.
// The null check must be "is not null": "a is null and a between 0 and 3" can never match.
defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))" + " and (a is not null and a between 0 and 3)"));
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.println("Received message=" + msg);
        System.out.println("Received message body=" + new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
System.out.println("The consumer has started");
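
To see which of the 15 messages a filter of the form (TAGS is not null and TAGS in ('tagA','tagC')) and (a is not null and a between 0 and 3) lets through, the expression can be rewritten as a plain Java predicate and evaluated by hand. This is only a worked example of the expression's logic, not how the broker evaluates SQL92; SqlFilterSketch, matches, and matchingIndexes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a hand-written Java equivalent of the SQL filter expression.
final class SqlFilterSketch {

    /** (TAGS is not null and TAGS in ('tagA','tagC')) and (a is not null and a between 0 and 3) */
    static boolean matches(String tags, Integer a) {
        boolean tagOk = tags != null && (tags.equals("tagA") || tags.equals("tagC"));
        // BETWEEN includes both bounds, so a = 0 and a = 3 are accepted.
        boolean aOk = a != null && a >= 0 && a <= 3;
        return tagOk && aOk;
    }

    /** Replays the producer loop: message i has tag tags[i % 3] and property a = i. */
    static List<Integer> matchingIndexes(int messages) {
        String[] tags = {"tagA", "tagB", "tagC"};
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < messages; i++) {
            if (matches(tags[i % tags.length], i)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(matchingIndexes(15)); // prints [0, 2, 3]
    }
}
```

Only the messages with a = 0, 2, and 3 pass: a = 1 is within the range but carries tagB, and every message from a = 4 onward fails the range check.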

If the consumer fails with the error The broker does not support consumer to filter message by SQL92, modify the broker.conf file and add the following configuration:

# Enable support for propertyfilter
enablePropertyFilter = true
filterSupportRetry = true

Then restart the broker.

Summary

This article introduces local sequential messages, global sequential messages, broadcast messages, delayed messages, and how to send messages in batches and filter messages.