Why does RocketMQ require the subscription relationship to be consistent?

In this article, the author discusses one of RocketMQ’s best practices: keeping subscription relationships consistent.

A consistent subscription relationship means that all Consumer instances under the same consumer Group ID subscribe to exactly the same Topics and Tags.

If the subscription relationship is inconsistent, the message consumption logic becomes confused and messages may even be lost.

1 Subscription relationship demonstration

First, we show the correct subscription relationship: multiple Group IDs subscribe to multiple Topics, and the subscription relationships of multiple consumers in each Group ID remain consistent.

Next, we show the wrong subscription relationship.

In the figure above, a single Group ID subscribes to multiple Topics, but the subscription relationships of the consumers in that Group ID are not consistent.

From a code logic perspective, the topic, TAG, and listening logic of the subscription method in each consumer instance need to be consistent.
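As a rough illustration of what "consistent" means at the code level, the sketch below compares the topic and tag subscriptions declared by several consumer instances of one group. It is plain Java with no RocketMQ dependency. The class SubscriptionConsistencySketch, its methods, and the idea of modeling a subscription as a map from topic to tag expression are assumptions made for this example, not part of the RocketMQ API. It also cannot check that the listening logic is the same, which still has to be verified by review.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: checks that all instances of one group declare the same subscriptions. */
class SubscriptionConsistencySketch {

    /** Splits a tag expression such as "A || B" into a sorted tag set, so tag order does not matter. */
    static Set<String> normalizeTags(String expression) {
        Set<String> tags = new TreeSet<>();
        for (String tag : expression.split("\\|\\|")) {
            if (!tag.trim().isEmpty()) {
                tags.add(tag.trim());
            }
        }
        return tags;
    }

    /** Returns true if every instance subscribes to the same topics with the same tags. */
    static boolean isConsistent(List<Map<String, String>> subscriptionsPerInstance) {
        Map<String, Set<String>> reference = null;
        for (Map<String, String> subscriptions : subscriptionsPerInstance) {
            Map<String, Set<String>> normalized = new HashMap<>();
            subscriptions.forEach((topic, tags) -> normalized.put(topic, normalizeTags(tags)));
            if (reference == null) {
                reference = normalized;          // first instance defines the expected subscriptions
            } else if (!reference.equals(normalized)) {
                return false;                    // some instance differs in topic or tags
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same topic, same tags in a different order: consistent.
        System.out.println(isConsistent(List.of(
            Map.of("TopicTest", "A || B"), Map.of("TopicTest", "B || A"))));
        // Different topics, same tags: inconsistent (the scenario of section 2).
        System.out.println(isConsistent(List.of(
            Map.of("TopicTest", "*"), Map.of("mytest", "*"))));
        // Same topic, different tags: inconsistent (the scenario of section 3).
        System.out.println(isConsistent(List.of(
            Map.of("TopicTest", "A"), Map.of("TopicTest", "B"))));
    }
}
```

A check of this kind could run in a release review, fed with the subscriptions that each deployed instance is configured with.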

Next, we experiment with the same consumer group and two incorrect scenarios to see if there are any abnormalities in the consumer and broker services.

  • The subscription topics are different but the tags are the same

  • The subscription topics are the same but the tags are different

2 Subscription topics are different but the tags are the same


We start two consumers in the consumer group myconsumerGroup. Consumer C1 subscribes to the topic TopicTest, and consumer C2 subscribes to the topic mytest.

The Broker log then continuously prints pull failures:

2023-10-09 14:52:53 WARN PullMessageThread_2 - the consumer's subscription not exist, group: myconsumerGroup, topic:TopicTest

In this case, consumer C1 cannot pull messages, so it cannot consume any new messages.

Why? The client regularly sends heartbeat packets to the Broker service, and each heartbeat packet contains the subscription information of the consumer. A sample of the data format is as follows:

"subscriptionDataSet": [
  {
    "classFilterMode": false,
    "codeSet": [],
    "expressionType": "TAG",
    "subString": "*",
    "subVersion": 1696832107020,
    "tagsSet": [],
    "topic": "TopicTest"
  },
  {
    "classFilterMode": false,
    "codeSet": [],
    "expressionType": "TAG",
    "subString": "*",
    "subVersion": 1696832098221,
    "tagsSet": [],
    "topic": "%RETRY%myconsumerGroup"
  }
]

The Broker service will call the heartBeat method of ClientManageProcessor to handle the heartbeat request.

The code was finally traced to: org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

The Broker service saves consumer information in the consumer table consumerTable. The key of this table is the consumer group name, and the value is the consumer group information ConsumerGroupInfo.

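// org.apache.rocketmq.broker.client.ConsumerManager
// key: consumer group name, value: the ConsumerGroupInfo of that group
private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);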

If no ConsumerGroupInfo exists yet for the consumer group, a new object is created.

When subscription information is updated, it is stored per consumer group, not per consumer client. As a result, the consumer clients in the same consumer group overwrite each other's subscription information with every heartbeat.
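The overwriting effect can be reproduced with a small model. The sketch below is not the Broker code: the class GroupSubscriptionSketch and its methods are hypothetical, a subscription is reduced to a topic and a tag expression, and the retry topic is left out. It only assumes what is described above, namely that subscriptions are kept in one table per consumer group and that each heartbeat makes the table mirror the sender's subscriptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified model of group-level subscription storage; not the actual Broker code. */
class GroupSubscriptionSketch {

    // group name -> (topic -> tag expression); one shared table per consumer group
    private final ConcurrentMap<String, ConcurrentMap<String, String>> consumerTable =
        new ConcurrentHashMap<>();

    /** Applies one heartbeat: afterwards the group's table mirrors this client's subscriptions. */
    void heartbeat(String group, Map<String, String> subscriptions) {
        ConcurrentMap<String, String> table =
            consumerTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        table.putAll(subscriptions);
        // Topics missing from this heartbeat are dropped, whichever client registered them.
        table.keySet().retainAll(subscriptions.keySet());
    }

    /** Returns the tag expression for the topic, or null if the group has no such subscription. */
    String findSubscription(String group, String topic) {
        Map<String, String> table = consumerTable.get(group);
        return table == null ? null : table.get(topic);
    }

    public static void main(String[] args) {
        GroupSubscriptionSketch broker = new GroupSubscriptionSketch();
        broker.heartbeat("myconsumerGroup", Map.of("TopicTest", "*")); // heartbeat from C1
        broker.heartbeat("myconsumerGroup", Map.of("mytest", "*"));    // heartbeat from C2
        // After C2's heartbeat, the group no longer has a subscription for TopicTest.
        System.out.println(broker.findSubscription("myconsumerGroup", "TopicTest"));
        System.out.println(broker.findSubscription("myconsumerGroup", "mytest"));
    }
}
```

In this model, whichever client sent the most recent heartbeat decides what the group is subscribed to, which is why the state can flip between C1 and C2 over time.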

Back on the consumer client side: when the consumer pulls messages, the Broker service calls the processRequest method of PullMessageProcessor.

First, a pre-check looks up the subscription information of the consumer group for the requested topic. If no subscription information is found for the topic, a warning log is printed and an error response is returned.

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
    // The group has no subscription registered for this topic: log a warning and reject the pull.
    log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}

By investigating the code on the Broker side, we found that the subscription information of the same consumer group must be consistent. Otherwise, the consumer clients in the same consumer group overwrite each other's subscription information, and some consumer client becomes unable to pull new messages.

Consumer C1 cannot consume messages from the topic TopicTest. Consumer C2 subscribes to the topic mytest: is its consumption normal?

Judging from the picture above, there is still a problem. The topic mytest has four queues, but only two queues are allocated. The messages in the other two queues cannot be consumed.

To explain this problem, we need to review the principles of Load Balancing.

The load balancing service will perform different logical processing depending on whether the consumption mode is “broadcast mode” or “cluster mode”. Here we mainly look at the main processing flow in cluster mode:

(1) Get the message consumption queue collection under this topic;

(2) Query the Broker to obtain the list of consumer IDs under the consumer group;

(3) First sort the message consumption queue and consumer ID under the Topic, and then use the message queue allocation strategy algorithm (the default is: the average allocation algorithm of the message queue) to calculate the message queue to be pulled;

The average allocation algorithm here works like pagination. It treats the sorted MessageQueues as records and the sorted consumers as pages, computes the average number of records per page and the range of records on each page, and finally traverses that range to determine the queues allocated to the current consumer.

(4) Perform a filtering comparison operation between the assigned message queue set and processQueueTable.

Within the consumer instance, the processQueueTable object stores the current load balancing queue and the queue’s processing queue processQueue (consumption snapshot).

  1. The Entry part marked in red means that it is not included in the assigned message queue collection. You need to set the Dropped attribute of these red queues to true and then remove them from the processQueueTable object.
  2. The green Entry part represents the intersection with the assigned message queue collection, which already exists in the processQueueTable object.
  3. The yellow Entry part indicates queues that need to be added to the processQueueTable object. A message pull request pullRequest is created for each newly allocated queue. The pull request holds a processing queue processQueue (queue consumption snapshot), which internally uses a red-black tree (TreeMap) to store the pulled messages.
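The three cases of step (4) amount to a diff between the current table and the newly assigned queues. The following is a minimal sketch under simplifying assumptions: queues are identified by an integer id, and RebalanceDiffSketch, ProcessQueue, and update are hypothetical names for this example, not the RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Simplified model of reconciling processQueueTable with a new assignment. */
class RebalanceDiffSketch {

    /** Hypothetical stand-in for a processing queue (consumption snapshot). */
    static final class ProcessQueue {
        volatile boolean dropped = false;
        final TreeMap<Long, String> messages = new TreeMap<>(); // offset -> pulled message
    }

    final Map<Integer, ProcessQueue> processQueueTable = new HashMap<>();

    /** Applies a new assignment and returns the queue ids that need a new pull request. */
    List<Integer> update(Set<Integer> assigned) {
        processQueueTable.entrySet().removeIf(entry -> {
            if (assigned.contains(entry.getKey())) {
                return false;                  // "green": still assigned, keep as is
            }
            entry.getValue().dropped = true;   // "red": no longer assigned, mark as dropped
            return true;                       // and remove from the table
        });
        List<Integer> newQueues = new ArrayList<>();
        for (Integer queueId : new TreeSet<>(assigned)) {
            if (!processQueueTable.containsKey(queueId)) {
                processQueueTable.put(queueId, new ProcessQueue()); // "yellow": newly assigned
                newQueues.add(queueId);
            }
        }
        return newQueues;
    }

    public static void main(String[] args) {
        RebalanceDiffSketch consumer = new RebalanceDiffSketch();
        consumer.update(Set.of(0, 1));                      // initial assignment
        List<Integer> added = consumer.update(Set.of(1, 2)); // queue 0 removed, queue 2 added
        System.out.println("new pull requests for queues: " + added);
        System.out.println("queues in table: " + new TreeSet<>(consumer.processQueueTable.keySet()));
    }
}
```

Marking the removed queue as dropped before deleting it lets any task that still holds a reference to the old ProcessQueue notice that it should stop working on it.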

Finally, create a pull message request list, distribute the request to the message pull service, and enter the message pull stage.
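The pagination analogy used for step (3) can be made concrete with a short sketch. It is a simplified version written for this article, not a copy of the RocketMQ allocation strategy: queues are plain integer ids, and the names AverageAllocationSketch and allocate are hypothetical. Both lists are assumed to be sorted already, as step (3) requires.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified average allocation: sorted queues are "records", sorted consumers are "pages". */
class AverageAllocationSketch {

    /** Returns the queue ids assigned to currentCid. */
    static List<Integer> allocate(String currentCid, List<Integer> queueIds, List<String> consumerIds) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(currentCid);   // which "page" the current consumer is
        if (index < 0) {
            return result;                             // unknown consumer gets nothing
        }
        int base = queueIds.size() / consumerIds.size();
        int mod = queueIds.size() % consumerIds.size();
        // The first `mod` consumers each take one extra queue so that nothing is left over.
        int pageSize = index < mod ? base + 1 : base;
        int start = index < mod ? index * pageSize : index * base + mod;
        for (int i = 0; i < pageSize; i++) {
            result.add(queueIds.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = List.of(0, 1, 2, 3);    // the four queues of topic mytest
        List<String> consumers = List.of("C1", "C2");  // both clients belong to myconsumerGroup
        System.out.println("C1 -> " + allocate("C1", queues, consumers)); // C1 -> [0, 1]
        System.out.println("C2 -> " + allocate("C2", queues, consumers)); // C2 -> [2, 3]
    }
}
```

Because the consumer list is taken from the whole group, C2 only claims its own share of the queues, even though C1 never subscribed to the topic.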

Through the above introduction and derivation of the principle of load balancing, the reason is obvious.

The C1 consumer is assigned queue 0 and queue 1, but the C1 consumer itself does not subscribe to the topic mytest, so it cannot consume data from this topic.

Judging from this experiment, the C1 consumer cannot consume the message data of the topic TopicTest, and the C2 consumer can only partially consume the message data of the topic mytest.

However, because the consumer clients in the same consumer group keep overwriting each other's subscription information on the Broker side, this consumption state is unstable and occasionally flips: consumer C1 can partially consume messages from the topic TopicTest, while consumer C2 cannot consume messages from the topic mytest.

3 The subscription topics are the same but the tags are different

As shown in the figure, C1 consumer and C2 consumer subscribe to the topic TopicTest, but their tags are different.

After starting the consumers, the console output shows that load balancing works as expected.

The author printed the tracking log on the Broker side and found that the subscription information of the topic TopicTest is:

{
  "classFilterMode": false,
  "codeSet": [66],
  "expressionType": "TAG",
  "subString": "B",
  "subVersion": 1696901014319,
  "tagsSet": ["B"],
  "topic": "TopicTest"
}

So is consumption normal in this state? The author conducted a set of experiments, and consumption is still chaotic:

Consumer C1 cannot consume messages with TAG value A, and consumer C2 can only consume some messages with TAG value B.

To understand why, we need to sort out the message filtering mechanism.

First, the format of the ConsumeQueue file is as follows:

  1. After receiving the pull request, the Broker locates the ConsumeQueue file according to the request parameters and traverses the entries to be retrieved. For each entry, it checks whether the hashcode of the Tag stored in the entry matches the hashcode of the TAG in the subscription information. If not, it skips the entry and moves on to the next one. The messages that meet the condition are collected and returned to the consumer client.
  2. After receiving the filtered messages, the consumer applies the filter again, this time comparing the TAG string value rather than the hashcode.
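The two filtering stages can be sketched as follows. This is an illustrative model, not the RocketMQ implementation: TagFilterSketch, Message, brokerFilter, and clientFilter are hypothetical names, and treating an empty set as "accept everything" is an assumption based on the empty codeSet and tagsSet shown earlier for the "*" subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Simplified model of two-stage TAG filtering: hashcode on the Broker, string on the client. */
class TagFilterSketch {

    /** Hypothetical message model: just a tag and a body. */
    static final class Message {
        final String tag;
        final String body;

        Message(String tag, String body) {
            this.tag = tag;
            this.body = body;
        }
    }

    /** Broker side: keeps messages whose tag hashcode is in the codeSet stored for the group. */
    static List<Message> brokerFilter(List<Message> queue, Set<Integer> codeSet) {
        List<Message> pulled = new ArrayList<>();
        for (Message message : queue) {
            if (codeSet.isEmpty() || codeSet.contains(message.tag.hashCode())) {
                pulled.add(message);
            }
        }
        return pulled;
    }

    /** Client side: keeps messages whose tag string is in the client's own tagsSet. */
    static List<Message> clientFilter(List<Message> pulled, Set<String> tagsSet) {
        List<Message> consumed = new ArrayList<>();
        for (Message message : pulled) {
            if (tagsSet.isEmpty() || tagsSet.contains(message.tag)) {
                consumed.add(message);
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Message> queue0 = List.of(
            new Message("A", "m1"), new Message("B", "m2"), new Message("A", "m3"));
        // The Broker holds TAG B for the group, because the heartbeat of C2 overwrote that of C1.
        Set<Integer> brokerCodeSet = Set.of("B".hashCode());  // 66, as in the codeSet shown above
        List<Message> pulledByC1 = brokerFilter(queue0, brokerCodeSet);
        // C1 itself subscribes to TAG A, so the client filter rejects what the Broker returned.
        List<Message> consumedByC1 = clientFilter(pulledByC1, Set.of("A"));
        System.out.println("pulled by C1: " + pulledByC1.size());     // pulled by C1: 1
        System.out.println("consumed by C1: " + consumedByC1.size()); // consumed by C1: 0
    }
}
```

The second stage exists because different strings can share a hashcode. In Java, for example, "Aa" and "BB" both hash to 2112, so a hashcode match alone does not prove that the tag matches.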

Let’s simulate the message filtering process:

First, the producer sends messages with different TAGs to the Broker, and these messages are stored in different queues.

When consumer C1 pulls messages from queue 0 and queue 1, the TAG value in the subscription information held by the Broker for this topic is B, so after server-side filtering every message that C1 receives has TAG B. C1 then applies client-side filtering to the messages it received. Its own subscription is TAG A, and A is not equal to B, so these messages are discarded. As a result, consumer C1 cannot consume messages with TAG A.

Consumer C2 pulls messages from queue 2 and queue 3, and the whole logical chain works normally. However, because of load balancing, it cannot consume the messages in queue 0 and queue 1, so it consumes only part of the messages with TAG B.

4 Summary

What is a consumer group? It is a set of consumers that consume the same type of messages with the same consumption logic. The RocketMQ 4.X source code is implemented in line with this definition of a consumer group.

There are two ways to avoid the problem of inconsistent subscription relationships:

  • Properly define topics and tags

Once the topics and tags are defined, if a new tag is needed later, consider a different approach: use a new consumer group or create a new topic.

  • Strictly standardize the release process

Before a release, identify the dependent services, map out the release steps, carry out a thorough release review, and strictly follow the process.

Final Thoughts:

If we think from the infrastructure level, it should be possible to centralize the subscription relationship information, but the cost is high and it is not cost-effective for small and medium-sized enterprises.