A brief analysis of RocketMQ sql92: usage and principles, with source code

This is weihubeats. If you find this article useful, you can follow the official account Xiaozhuo Technology, where articles are published first. No marketing accounts, no clickbait.

RocketMQ version

  • 5.1.0

RocketMQ message filtering

There are currently two officially supported message filtering methods:

  • tag
  • sql92

We can confirm this by looking at the source code of ExpressionType.
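For reference, the class is very small. The following is a sketch paraphrased from memory of org.apache.rocketmq.common.filter.ExpressionType, deliberately renamed ExpressionTypeSketch because it is not the verbatim source; please check the 5.1.0 source for the exact code.

```java
// Sketch only: paraphrased, not copied from the RocketMQ source tree.
class ExpressionTypeSketch {

    // The two filter types a subscription can declare.
    static final String SQL92 = "SQL92";
    static final String TAG = "TAG";

    // A missing or empty type is treated as tag filtering, which is the default.
    static boolean isTagType(String type) {
        return type == null || type.isEmpty() || TAG.equals(type);
    }

    public static void main(String[] args) {
        System.out.println(isTagType(null));  // no type given: tag filtering
        System.out.println(isTagType(SQL92)); // explicit SQL92: not tag filtering
    }
}
```

Only two constants exist, which matches the two filtering methods listed above.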

The tag filtering method is currently the most commonly used filtering method, but a message can only contain one tag.

For relatively complex message filtering scenarios, the tag filtering method may not be enough, but in most business scenarios, the tag filtering method is sufficient.

The sql92 filtering method lets us implement more advanced features, such as isolating messages between multiple RocketMQ test environments.

We won’t discuss the specific usage scenarios of sql92 for now. Let’s learn how to use sql92 first.

sql92 syntax rules

Syntax | Explanation | Example
IS NULL | Checks that the attribute does not exist. | a IS NULL: attribute a does not exist.
IS NOT NULL | Checks that the attribute exists. | a IS NOT NULL: attribute a exists.
> >= < <= | Compares numbers. Cannot be used to compare strings, otherwise the consumer client reports an error on startup. Note: strings that can be converted to numbers are also treated as numbers. | a IS NOT NULL AND a > 100: attribute a exists and its value is greater than 100. a IS NOT NULL AND a > 'abc': incorrect example; abc is a string and cannot be compared by size.
BETWEEN xxx AND xxx | Compares numbers. Cannot be used to compare strings, otherwise the consumer client reports an error on startup. Equivalent to >= xxx AND <= xxx: the attribute value lies between the two numbers. | a IS NOT NULL AND (a BETWEEN 10 AND 100): attribute a exists and its value is greater than or equal to 10 and less than or equal to 100.
NOT BETWEEN xxx AND xxx | Compares numbers. Cannot be used to compare strings, otherwise the consumer client reports an error on startup. Equivalent to < xxx OR > xxx: the attribute value lies outside the range of the two numbers. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): attribute a exists and its value is less than 10 or greater than 100.
IN (xxx, xxx) | The attribute value is in the given set. The elements of the set can only be strings. | a IS NOT NULL AND (a IN ('abc', 'def')): attribute a exists and its value is abc or def.
= <> | Equal and not equal. Can be used to compare both numbers and strings. | a IS NOT NULL AND (a = 'abc' OR a <> 'def'): attribute a exists and its value is abc, or its value is not def.
AND OR | Logical AND and logical OR. Can combine any simple conditions; each condition needs to be placed in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL): attribute a exists and its value is greater than 100, or attribute b does not exist.

With SQL attribute filtering, the producer defines the message attributes and the consumer sets the SQL filter condition, so the result of evaluating the condition against a given message is not known in advance. The server handles the edge cases as follows:

  • Exception handling: If evaluating the filter expression throws an exception, the message is filtered out by default and is not delivered to the consumer. For example, comparing a numeric value with a non-numeric value.

  • Null handling: If the filter expression evaluates to null or to a non-Boolean value (anything other than true or false), the message is filtered out by default and is not delivered to the consumer. For example, if an attribute is not set when the message is sent but is used directly in the subscription's filter condition, the expression evaluates to null.

  • Incompatible numeric types: If the custom message attribute is a floating-point number but the filter condition compares it against an integer, the message is filtered out by default and is not delivered to the consumer.
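The first two rules above can be sketched in plain Java. This is a minimal illustration, not the broker's code: FilterResultSketch, isDelivered and greaterThan100 are hypothetical names, and a java.util.function.Function stands in for a compiled SQL92 expression. The third rule (floating point versus integer) is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class FilterResultSketch {

    // Decides whether a message is delivered, given a hypothetical stand-in
    // for a compiled filter expression and the message's user properties.
    static boolean isDelivered(Function<Map<String, String>, Object> expression,
                               Map<String, String> properties) {
        Object result;
        try {
            result = expression.apply(properties);
        } catch (RuntimeException e) {
            // Rule 1: an evaluation error filters the message out.
            return false;
        }
        // Rule 2: null or non-Boolean results filter the message out.
        if (!(result instanceof Boolean)) {
            return false;
        }
        return (Boolean) result;
    }

    // Hypothetical expression standing in for "a > 100". A missing attribute
    // evaluates to null; a non-numeric value throws NumberFormatException.
    static Object greaterThan100(Map<String, String> properties) {
        String a = properties.get("a");
        if (a == null) {
            return null;
        }
        return Double.parseDouble(a) > 100;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Attribute missing: expression is null, message filtered out.
        System.out.println(isDelivered(FilterResultSketch::greaterThan100, props));
        // Not a number: evaluation throws, message filtered out.
        props.put("a", "abc");
        System.out.println(isDelivered(FilterResultSketch::greaterThan100, props));
        // Numeric string greater than 100: message delivered.
        props.put("a", "150");
        System.out.println(isDelivered(FilterResultSketch::greaterThan100, props));
    }
}
```

The point of the sketch is that every failure mode collapses to "not delivered", so a badly written condition silently drops messages instead of raising an error on the consumer side.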

Using sql92

Source code

All source code has been uploaded to github

  • Address: https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/rocketmq-demo/src/main/java/com/weihubeats/rocketmq/demo/sql92

Message sending

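import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.stream.IntStream;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SQLProducer {

    public static int count = 10;

    public static String topic = "xiao-zou-topic";

    public static void main(String[] args) {
        // MQUtils is a helper class that is not shown in this article
        DefaultMQProducer producer = MQUtils.createLocalProducer();

        IntStream.range(0, count).forEach(i -> {
            Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));
            try {
                // Only even-numbered messages carry the gray=dev1 user property
                if (i % 2 == 0) {
                    message.putUserProperty("gray", "dev1");
                }
                SendResult sendResult = producer.send(message);
                DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        producer.shutdown();
    }
}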

Here we simulate messages coming from multiple test environments: every other message (those with an even index) carries the user property gray=dev1, and the rest carry no gray property.

What we want to achieve is that consumers in the dev1 environment only consume messages marked with gray=dev1; all other messages are filtered out and never delivered to them.

Message consumption

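import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class SQLConsumer {

    public static String GID = "xiao-zou-gid";

    public static void main(String[] args) throws Exception {
        // MQUtils is a helper class that is not shown in this article
        DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);
        // Only match messages whose gray user property exists and equals dev1
        String sql = "gray is not null and gray = 'dev1'";
        // MQUtils.TOPIC must be the same topic the producer sends to
        consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        /*
         * Launch the consumer instance.
         */
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}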

The only difference from ordinary message consumption is the way we subscribe.

In the ordinary approach, we call subscribe with the topic and a tag expression.

For example:

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

Here we use the sql92 approach instead: what we pass in is a MessageSelector, and the subscription rule is

String sql = "gray is not null and gray = 'dev1'";

Result

  • Message sending

Here we sent ten messages, only 5 of which carry the gray property.

  • Message consumption

The consumer receives only the 5 messages that carry the gray property, which is what we expected.
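The count of 5 can be checked without a broker. The sketch below is a plain-Java simulation, not RocketMQ's SQL evaluator: GrayFilterSimulation and its methods are hypothetical names, and matches() is a hand-written equivalent of the subscription rule "gray is not null and gray = 'dev1'".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GrayFilterSimulation {

    // Hand-written equivalent of "gray is not null and gray = 'dev1'".
    static boolean matches(Map<String, String> userProperties) {
        String gray = userProperties.get("gray");
        return gray != null && gray.equals("dev1");
    }

    // Rebuilds the user properties the producer sets (gray=dev1 only on even
    // indexes) and returns the indexes of the messages that pass the filter.
    static List<Integer> matchingIndexes(int count) {
        List<Integer> matched = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Map<String, String> userProperties = new HashMap<>();
            if (i % 2 == 0) {
                userProperties.put("gray", "dev1");
            }
            if (matches(userProperties)) {
                matched.add(i);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchingIndexes(10)); // prints [0, 2, 4, 6, 8]
    }
}
```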

Is sql92 filtered on the client side or on the server side?

sql92 and tag are both filtered on the server side. We can check the source code to find out.

However, with tag filtering the messages are filtered a second time on the client. To improve performance, the server compares only the hash code of the tag rather than the original tag string, so a message whose tag happens to have the same hash code can slip through. After the hash code check has eliminated the vast majority of messages on the server, the client performs the final exact match on the tag.

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult

If all filtering is performed on the client side, a large number of messages will be transmitted to the client, affecting performance.
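The two-stage tag filtering described above can be sketched as follows. This is an illustration of the idea only, not the RocketMQ implementation: TwoStageTagFilterSketch and its methods are hypothetical names, and the sketch uses String.hashCode() with the strings "Aa" and "BB", which are known to share a hash code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoStageTagFilterSketch {

    // Stage 1 (the server's role in this sketch): compare hash codes only.
    static List<String> filterByHash(List<String> messageTags, String subscribedTag) {
        int subscribedHash = subscribedTag.hashCode();
        List<String> passed = new ArrayList<>();
        for (String tag : messageTags) {
            if (tag.hashCode() == subscribedHash) {
                passed.add(tag);
            }
        }
        return passed;
    }

    // Stage 2 (the client's role in this sketch): compare the original strings.
    static List<String> filterByTag(List<String> messageTags, String subscribedTag) {
        List<String> passed = new ArrayList<>();
        for (String tag : messageTags) {
            if (tag.equals(subscribedTag)) {
                passed.add(tag);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are different strings with the same hashCode (2112).
        List<String> tags = Arrays.asList("Aa", "BB", "TagC");
        List<String> afterServer = filterByHash(tags, "Aa");
        List<String> afterClient = filterByTag(afterServer, "Aa");
        System.out.println(afterServer); // prints [Aa, BB]
        System.out.println(afterClient); // prints [Aa]
    }
}
```

The cheap integer comparison removes most messages before they cross the network, and the exact string comparison on the client removes the rare hash collision.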

Summary

In this article we briefly used RocketMQ sql92 to filter messages and did a small amount of source code analysis. We did not walk through the entire process, because source code analysis is not the focus of this article. SQL92 is relatively rarely used in real projects. If you occasionally need multi-environment isolation or grayscale release for RocketMQ messages, it can be one solution, but it is not the best one.

Reference

  • Official documentation