Message middleware – RocketMQ (comparison with Kafka and RabbitMQ)

Comparison of RocketMQ, Kafka, and RabbitMQ

  • 1. ActiveMQ: An older message middleware from Apache.

  • 2. Kafka: Designed for log messages and monitoring data. It is a high-throughput distributed publish-subscribe messaging system in which a single machine can handle throughput on the order of millions of messages, but it may lose data.

  • 3. RocketMQ: After adopting Kafka, Alibaba found that it is mainly oriented toward log transmission and may lose data. For some core Taobao functions, data loss is absolutely unacceptable, so RocketMQ was created, drawing on Kafka and positioned for reliable, non-log message transmission.

  • 4. RabbitMQ: An open-source implementation of AMQP (Advanced Message Queuing Protocol), written in Erlang. AMQP is an open application-layer standard designed for message-oriented middleware; clients and message middleware that follow the protocol can exchange messages regardless of product or development language.

What is a message queue?

A message queue is an important component of a distributed system. Messages are data transmitted between applications. A message can be very simple, such as a plain text string, or more complex, with embedded objects.

Messages are sent to queues, which are containers that hold messages in transit; the message queue manager acts as the middleman. The main purpose of the queue is to provide routing and guarantee delivery: if the consumer is unavailable when a message is sent, the queue keeps the message until it can be delivered successfully.

Message queuing is a way for applications to communicate. A send call can return immediately, and the messaging system ensures reliable delivery. Producers simply publish messages to the MQ without caring who receives them, and consumers simply fetch messages from the MQ without caring who published them, so producers and consumers do not know about each other.
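The decoupling described above can be sketched with a toy in-memory queue. This is only an illustration using the Java standard library, not RocketMQ itself; the class name InMemoryQueueDemo and its methods are made up for this example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy stand-in for a message queue (hypothetical, not a RocketMQ class).
class InMemoryQueueDemo {
    // Holds messages until some consumer is ready to take them.
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    // Producer side: hand the message over and return immediately.
    void publish(String message) {
        pending.offer(message);
    }

    // Consumer side: take the oldest pending message, or null if there is none.
    String consume() {
        return pending.poll();
    }

    public static void main(String[] args) {
        InMemoryQueueDemo queue = new InMemoryQueueDemo();
        queue.publish("order-created"); // no consumer is running yet
        queue.publish("order-paid");
        System.out.println(queue.consume()); // oldest message first
        System.out.println(queue.consume());
    }
}
```

The producer never references the consumer and vice versa; both only know the queue, which is the point of the pattern.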

Why use RocketMQ?

According to Alibaba's research, as the number of queues and virtual topics grew, the ActiveMQ IO module hit a bottleneck. Alibaba tried to solve the problem through throttling, circuit breaking, and degradation, but the results were unsatisfactory. Alibaba then tried the popular messaging solution Kafka, but because of its high latency and low reliability it could not meet the requirements.

So RocketMQ was born. It is widely used thanks to its simple architecture, rich business features, high availability, strong scalability, and eventual consistency.

Advantages of RocketMQ

Asynchronous decoupling (advertising flow update)

When systems have no need for real-time data exchange but still need each other's business information, a message queue can decouple them. As long as the publisher defines the message format, whatever the consumer does is independent of the publisher, which reduces unnecessary joint debugging and release conflicts.

Peak shaving and valley filling

In special scenarios such as flash sales (seckill) or Spring Festival Gala red envelopes, the message queue can keep the system from crashing under traffic spikes on the scale of trillions of requests.

With high-performance storage and processing, traffic beyond the system's processing capacity is buffered temporarily and released smoothly at a rate the system can handle, which achieves peak shaving.
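A minimal simulation of the peak-shaving idea, assuming a downstream system that can handle a fixed number of requests per time tick. The class PeakShavingDemo and its parameters are hypothetical; the numbers are only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates a buffer that absorbs a burst and drains at a fixed rate (illustrative only).
class PeakShavingDemo {
    // arrivals[i] requests arrive in tick i; the downstream system handles at most
    // `capacity` requests per tick. Returns the backlog left in the buffer after each tick.
    static List<Integer> backlogPerTick(int[] arrivals, int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        List<Integer> backlog = new ArrayList<>();
        int pending = 0;
        int tick = 0;
        // Keep ticking until all arrivals are in and the buffer is drained.
        while (tick < arrivals.length || pending > 0) {
            if (tick < arrivals.length) {
                pending += arrivals[tick];
            }
            pending -= Math.min(pending, capacity);
            backlog.add(pending);
            tick++;
        }
        return backlog;
    }

    public static void main(String[] args) {
        // A burst of 10 requests against a capacity of 4 per tick.
        System.out.println(backlogPerTick(new int[] {10, 0, 0}, 4));
    }
}
```

The downstream system never sees more than `capacity` requests in a tick; the burst is spread over later ticks instead of being rejected.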

Distributed cache synchronization

For example, after a payment completes, the payment result is sent to the message topic designated for SMS notifications, which makes non-core operations asynchronous and improves the efficiency and stability of the whole business flow.

Core concepts

Producer

Responsible for producing messages; usually the business system does this. A message producer sends messages generated in the business application to the Broker server. RocketMQ provides several sending methods: synchronous, asynchronous, sequential, and one-way.

Both synchronous and asynchronous sending require the Broker to return an acknowledgement; one-way sending does not.

topic: Indicates the topic of the message to be sent

tag: Indicates the tag of the message to be sent

Tips: Topic classifies messages of different businesses, and Tag further distinguishes message categories within a Topic. Both are attributes the producer attaches to a message when sending it, so Topic can be understood as a first-level classification and Tag as a second-level classification.

body: indicates the storage content of the message

properties: indicates message properties

keys: Each message can carry a business-level unique identifier in the keys field, which makes it easier to track down message-loss problems later. The Broker creates an index (a hash index) for each message, so the message content and its consumers can be queried by topic and key. Because it is a hash index, keys should be kept unique to avoid hash conflicts.

transactionId: will be used in transactions

MessageQueue: To support high concurrency and horizontal scaling, a Topic needs to be partitioned. In RocketMQ a partition is called a queue. A Topic may have multiple queues, which may be distributed across different Brokers.

Generally, unless a message is sent more than once, it exists in only one of the Topic's queues. Messages are stored in a queue in first-in-first-out order, and each message has its own position (offset). Each queue tracks the total number of messages it currently holds, called the maximum offset (MaxOffset); the position where the queue starts is called the minimum offset (MinOffset). Queues improve the concurrency of message sending and consumption.
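The queue and offset model can be sketched as follows. This is a simplified in-memory model, not how the Broker stores data: the class TopicQueuesDemo is hypothetical, messages are spread round-robin for simplicity, and MinOffset stays at 0 because nothing is ever cleaned up here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one Topic split into several FIFO queues (illustrative only).
class TopicQueuesDemo {
    private final List<List<String>> queues = new ArrayList<>();
    private int next = 0; // round-robin cursor

    TopicQueuesDemo(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
    }

    // Appends the message to exactly one queue and returns {queueId, offset}.
    long[] append(String body) {
        int queueId = next;
        next = (next + 1) % queues.size();
        List<String> queue = queues.get(queueId);
        queue.add(body);
        return new long[] {queueId, queue.size() - 1};
    }

    // Starting position of the queue; always 0 in this toy model.
    long minOffset(int queueId) {
        return 0L;
    }

    // Total number of messages currently in the queue.
    long maxOffset(int queueId) {
        return queues.get(queueId).size();
    }

    public static void main(String[] args) {
        TopicQueuesDemo topic = new TopicQueuesDemo(2);
        topic.append("a");
        topic.append("b");
        long[] position = topic.append("c");
        System.out.println("queue=" + position[0] + ", offset=" + position[1]);
        System.out.println("maxOffset(0)=" + topic.maxOffset(0));
    }
}
```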

Send synchronously:

The next message will not be sent until a synchronous response from the server is received;

Send asynchronously:

It is necessary to implement the asynchronous sending callback interface SendCallback to process the response result;

Send in one-way mode:

Call sendOneway; the sender neither waits for nor processes a result.
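The difference between the three sending modes can be illustrated without a real Broker. The sketch below uses a hypothetical stub, brokerAccept, that acknowledges every message asynchronously; it mirrors the shape of synchronous, asynchronous, and one-way sending but is not the RocketMQ client API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Models the three send modes against a fake broker (illustrative only).
class SendModesDemo {
    // Hypothetical broker stub: acknowledges each message on another thread.
    static CompletableFuture<String> brokerAccept(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body);
    }

    // Synchronous: block until the acknowledgement arrives, then return it.
    static String sendSync(String body) {
        return brokerAccept(body).join();
    }

    // Asynchronous: return at once; the callback runs when the acknowledgement arrives.
    static CompletableFuture<Void> sendAsync(String body, Consumer<String> onSuccess) {
        return brokerAccept(body).thenAccept(onSuccess);
    }

    // One-way: hand the message over and ignore whatever comes back.
    static void sendOneway(String body) {
        brokerAccept(body);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("m1"));
        sendAsync("m2", ack -> System.out.println("callback got " + ack)).join();
        sendOneway("m3");
    }
}
```

Synchronous sending is the simplest to reason about, asynchronous sending avoids blocking the caller, and one-way sending trades any delivery confirmation for the lowest overhead.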

Consumer

Responsible for consuming messages; usually a back-end system consumes them asynchronously. A message consumer pulls messages from the Broker server and hands them to the application. From the application's point of view, two forms of consumption are provided: pull consumption (Pull) and push consumption (Push).

If multiple consumers set the same Consumer Group, they are considered members of the same consumer group.

Push consumption

In push consumption, the server actively pushes messages to the client. The advantage is good timeliness, but if the client does not apply flow control, a large volume of pushed messages can pile up on the client or even crash it.

Process:

1. Create a consumer group

2. Set Name Server

3. Subscribe to the specified Topic, and add message filter conditions

4. Register message listener (callback interface) and write consumption logic to process messages received from Broker

Pull consumption (basically not used)

In pull consumption, the client actively fetches data from the server. The advantage is that the client can consume at its own pace, but the pull frequency must be controlled by the user: pulling too often puts pressure on both the server and the client, while a long pull interval easily leads to delayed consumption.

Cluster Mode

Within a consumer group, each message is delivered to only one consumer.

The message model set for a consumer group applies to every consumer in that group.

The consumption model is therefore configured at the group level:

consumer.setMessageModel(MessageModel.CLUSTERING);

Broadcast mode

Used when the same message needs to be processed differently by different consumers; each message is delivered to all consumers in the consumer group:

consumer.setMessageModel(MessageModel.BROADCASTING);
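The two delivery models can be contrasted with a small sketch. It is a simplification: the class MessageModelDemo is hypothetical, and in clustering mode it picks one consumer per message by round-robin, whereas RocketMQ actually balances load by assigning whole queues to the consumers of a group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Shows which consumers in a group receive a given message (illustrative only).
class MessageModelDemo {
    enum Model { CLUSTERING, BROADCASTING }

    static List<String> receivers(Model model, List<String> group, int messageIndex) {
        if (model == Model.BROADCASTING) {
            // Every consumer in the group gets its own copy.
            return new ArrayList<>(group);
        }
        // Clustering: exactly one consumer in the group gets the message.
        return Collections.singletonList(group.get(messageIndex % group.size()));
    }

    public static void main(String[] args) {
        List<String> group = new ArrayList<>();
        group.add("c1");
        group.add("c2");
        group.add("c3");
        System.out.println(receivers(Model.CLUSTERING, group, 4));
        System.out.println(receivers(Model.BROADCASTING, group, 4));
    }
}
```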

Concurrent consumption

When registering a message listener, pass in an implementation of MessageListenerConcurrently.

Sequential consumption

In concurrent consumption, multiple threads may consume messages from the same queue at the same time. So even if the sender guarantees FIFO order within a queue by sending sequential messages, there is no guarantee that the messages are actually consumed in order. RocketMQ therefore provides sequential consumption.

When registering a message listener, pass in an implementation of the MessageListenerOrderly interface.
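The core idea behind sequential consumption, that only one thread at a time works on a given queue, can be sketched with a single-threaded executor per queue. This is an analogy under that assumption, not RocketMQ's actual mechanism; the class OrderedConsumeDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Processes one queue's messages on a single dedicated thread (illustrative only).
class OrderedConsumeDemo {
    static List<String> consumeInOrder(List<String> queueMessages) {
        List<String> processed = new ArrayList<>();
        // One worker thread for this queue, so tasks run in submission order.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (String message : queueMessages) {
            worker.execute(() -> processed.add(message));
        }
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> steps = new ArrayList<>();
        steps.add("create");
        steps.add("pay");
        steps.add("ship");
        System.out.println(consumeInOrder(steps));
    }
}
```

With a thread pool larger than one, the same loop could process "pay" before "create", which is the problem sequential consumption is meant to avoid.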

Message filtering

When sending a message to a topic, the producer sets message attributes to classify it. When subscribing to the topic, the consumer specifies a filter condition on those attributes, and only messages that satisfy the condition are delivered to the consumer.

Tag filter:

If the Tag the consumer subscribes to matches the Tag set by the sender, the message is delivered to the consumer.

Scenario: Simple filtering. A message supports one Tag, so this works when the messages in a Topic need only one level of classification and filtering.
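Tag matching can be sketched as a simple set lookup. The sketch assumes the common subscription syntax in which "*" means all tags and several tags are joined with "||"; the class TagFilterDemo is hypothetical and only illustrates the matching rule, not the Broker's implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Minimal tag matcher for subscription expressions like "TagA || TagB" (illustrative only).
class TagFilterDemo {
    static boolean matches(String subscription, String messageTag) {
        // Assumption: a missing, empty, or "*" expression subscribes to everything.
        if (subscription == null || subscription.trim().isEmpty() || "*".equals(subscription.trim())) {
            return true;
        }
        Set<String> wanted = new HashSet<>();
        for (String tag : subscription.split("\\|\\|")) {
            wanted.add(tag.trim());
        }
        return messageTag != null && wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA || TagB", "TagB"));
        System.out.println(matches("TagA", "TagC"));
    }
}
```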

SQL92 filter:

The sender sets a Tag or message attributes, and the consumer subscribes with an SQL92 filter expression. Messages that satisfy the expression are delivered to the consumer.

Scenario: Complex filtering. A message supports multiple attributes, and different kinds of expressions can be combined using SQL syntax to classify messages at several levels and filter along multiple dimensions.

Message retry

After a consumer fails to consume a message, a retry mechanism is needed so the message can be consumed again. Consumption failures usually fall into two categories:

1. Problems with the message itself, such as a deserialization failure or data that cannot be processed (for example, a phone top-up where the mobile number in the message has been cancelled). This kind of error usually requires skipping the message and moving on to others. Retrying the failed message immediately will fail again 99% of the time, so it is better to retry on a timer, for example after 10 seconds.

2. An unavailable downstream dependency, such as a database connection that cannot be established or an unreachable external network. With this kind of error, skipping the failed message does not help, because consuming other messages will fail as well. In this case it is recommended to sleep for 30s before consuming the next message, which reduces the pressure that retried messages put on the Broker.

RocketMQ sets up a retry queue with the topic name “%RETRY% + consumerGroup” for each consumer group (note that this retry queue is per consumer group, not per Topic). It temporarily stores messages that the Consumer could not consume because of some exception. Since recovering from an exception takes time, the retry queue has multiple retry levels, each with its own redelivery delay; the more retries, the longer the delay. RocketMQ first saves a retry message to the delay queue with the topic name “SCHEDULE_TOPIC_XXXX”; a background scheduled task waits for the corresponding delay and then saves the message to the “%RETRY% + consumerGroup” retry queue.

Dead letter queue

The dead letter queue handles messages that cannot be consumed normally. When a message fails to be consumed for the first time, the message queue automatically retries it. If consumption still fails after the maximum number of retries, the consumer cannot consume the message correctly under normal circumstances. At that point the message queue does not discard the message immediately; it sends the message to a special queue that belongs to the consumer. RocketMQ calls messages that cannot be consumed under normal circumstances Dead-Letter Messages, and the special queues that store them Dead-Letter Queues. In RocketMQ, messages in the dead letter queue can be resent from the console so that the consumer instance can consume them again.
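The flow from retries to the dead letter queue can be sketched as a loop with a retry limit. The class RetryThenDeadLetterDemo and the handler are hypothetical, and delays between attempts are left out. The "%DLQ%" prefix for the dead-letter topic is not stated in the text above; it is included as an assumption about RocketMQ's naming convention.

```java
import java.util.function.Predicate;

// Models "retry up to a limit, then dead-letter" for one message (illustrative only).
class RetryThenDeadLetterDemo {
    // Per-group retry topic name, as described in the text.
    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    // Assumed naming convention for the per-group dead-letter topic.
    static String deadLetterTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }

    // The handler returns true when consumption succeeds. The message is tried once,
    // then redelivered up to maxRetries times; the result says where it ended up.
    static String deliver(String body, Predicate<String> handler, int maxRetries, String consumerGroup) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (handler.test(body)) {
                return "CONSUMED after " + attempt + " retries";
            }
        }
        return deadLetterTopic(consumerGroup);
    }

    public static void main(String[] args) {
        // A handler that always fails ends in the dead-letter topic.
        System.out.println(deliver("bad-message", body -> false, 3, "order_group"));
    }
}
```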

Timed message

A timed message (delay queue) is a message that is not consumed immediately after it reaches the broker; it is delivered to the real topic only after a specific delay. The broker has a configuration item, messageDelayLevel, whose default value is “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, which gives 18 levels. A custom messageDelayLevel can be configured. Note that messageDelayLevel is a broker property and does not belong to any particular topic. When sending a message, simply set the delay level: msg.setDelayTimeLevel(level). There are three cases for level:

level == 0: the message is not delayed

1 <= level <= maxLevel: the message is delayed for the corresponding time; for example, level == 1 delays it by 1s

level > maxLevel: level is treated as maxLevel; for example, level == 20 delays it by 2h

Scheduled messages are temporarily stored in a topic named SCHEDULE_TOPIC_XXXX, in a specific queue chosen by delayTimeLevel: queueId = delayTimeLevel - 1. Each queue therefore holds only messages with the same delay, which ensures that messages with the same delay are consumed in order. The broker consumes SCHEDULE_TOPIC_XXXX on a schedule and writes each message to its real topic.

Note that a timed message is counted twice, once when it is first written and once when it is written to the real topic on schedule, so the send count and TPS both increase.
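The level rules and the queueId mapping for timed messages can be sketched as a small lookup over the default messageDelayLevel string. The class DelayLevelDemo is hypothetical; treating negative levels like level 0 is an assumption of this sketch.

```java
// Maps a delay level to a delay and a queue id, using the default level string (illustrative only).
class DelayLevelDemo {
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1_000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    // Levels above the maximum are clamped to the maximum level.
    private static int effectiveLevel(int level) {
        return Math.min(level, DEFAULT_LEVELS.split(" ").length);
    }

    // level <= 0 means no delay (negative levels are treated like 0 here by assumption).
    static long delayMillis(int level) {
        if (level <= 0) {
            return 0L;
        }
        String spec = DEFAULT_LEVELS.split(" ")[effectiveLevel(level) - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        return amount * unitMillis(spec.charAt(spec.length() - 1));
    }

    // queueId = delayTimeLevel - 1, valid for level >= 1.
    static int queueId(int level) {
        if (level < 1) {
            throw new IllegalArgumentException("only delayed messages have a schedule queue");
        }
        return effectiveLevel(level) - 1;
    }

    public static void main(String[] args) {
        System.out.println(delayMillis(1));  // level 1 is "1s"
        System.out.println(delayMillis(20)); // clamped to the last level, "2h"
        System.out.println(queueId(3));
    }
}
```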

Name Service NameServer

The name service provides routing information, similar to a registry. Producers and consumers use it to look up the list of Broker IPs for each topic. Multiple NameServer instances form a cluster, but they are independent of each other and do not exchange information.
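The registry role can be pictured as a map from topic to Broker addresses. This is a toy model, not the NameServer's real data structures; the class NameServerDemo, its methods, and the addresses are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy routing registry: topic -> broker addresses (illustrative only).
class NameServerDemo {
    private final Map<String, List<String>> routes = new HashMap<>();

    // A broker reports that it hosts queues for the given topic.
    void register(String topic, String brokerAddress) {
        routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(brokerAddress);
    }

    // Producers and consumers ask where a topic lives; unknown topics yield an empty list.
    List<String> lookup(String topic) {
        return routes.getOrDefault(topic, Collections.emptyList());
    }

    public static void main(String[] args) {
        NameServerDemo nameServer = new NameServerDemo();
        nameServer.register("OrderTopic", "10.0.0.1:10911");
        nameServer.register("OrderTopic", "10.0.0.2:10911");
        System.out.println(nameServer.lookup("OrderTopic"));
    }
}
```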

Proxy server BrokerServer

The Broker is the message relay, responsible for storing and forwarding messages. In RocketMQ it receives and stores messages sent by producers and serves pull requests from consumers. It also stores message-related metadata, including consumer groups, consumption progress, topics, and queue information.

Message Content Message

The physical carrier of the information the messaging system transmits, and the smallest unit of produced and consumed data. Every message must belong to a topic. Each message in RocketMQ has a unique MessageID and can carry keys with a business identifier.

SpringBoot integrates RocketMQ

Here we take sending an asynchronous message as an example.

Import the dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

Producer side

1. Inject RocketMQTemplate

Create a message class that implements the SendCallback interface:

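import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;

// Callback invoked when an asynchronous send completes.
@Slf4j
public class MQMessage implements SendCallback {

    private final String tag;   // label written to the log
    private final Object data;  // payload this callback was created for

    public MQMessage(String tag, Object data) {
        this.tag = tag;
        this.data = data;
    }

    // Called when the Broker acknowledges the message.
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("[Create order] asynchronous message sent successfully, target tag: {}", tag);
    }

    // Called when sending fails.
    @Override
    public void onException(Throwable throwable) {
        log.error("[Create order] message sending failed", throwable);
    }
}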

Send an asynchronous message

// Create an order
OrderMessage message = new OrderMessage(time, seckillId, token, userInfo.getPhone());
MQMessage sendCallBack = new MQMessage("Create order", message);
rocketMQTemplate.asyncSend(MQConstant.ORDER_PEDDING, message, sendCallBack);

Consumer side

Note: Messages are handled in a listener, not in the Controller layer, so a separate listener class needs to be created.

1. Implement the RocketMQListener interface; the generic type is the type of message to be received, that is, OrderMessage.

2. Add the @RocketMQMessageListener annotation to specify the consumer group, topic, and tag:

@RocketMQMessageListener(
        consumerGroup = MQConstant.ORDER_PEDDING_CONSUMER_GROUP,
        topic = MQConstant.ORDER_PEDDING_TOPIC,
        selectorExpression = MQConstant.ORDER_PEDDING_TAG
)

Here I keep all of these constants in one class. Note that in a send destination, the topic and the tag are joined with a colon (topic:tag).

Finally, consume the message in the onMessage method (the consumer-side business logic):

@Override
public void onMessage(OrderMessage orderMessage) {
    log.info("[Create order] the corresponding seckill product id is: {}", orderMessage.getSeckillId());
    // Assume success up front; the catch block overwrites the result on failure.
    OrderMQResult message = new OrderMQResult(orderMessage.getTime(), orderMessage.getSeckillId(), null,
            orderMessage.getToken(), "order created successfully", Result.SUCCESS_CODE);
    MQMessage sendCallBack = new MQMessage("Create order", message);
    String topic = MQConstant.ORDER_RESULT_SUCCESS_DEST;
    try {
        String orderNo = orderInfoService.doSeckill(orderMessage.getUserPhone(), orderMessage.getSeckillId());
        // Order successfully created
        log.info("[Order created successfully]");
        message.setOrderNo(orderNo);

        // Send a delayed message: send timeout 1000 ms, delay level 9
        // (5m with the default messageDelayLevel).
        rocketMQTemplate.asyncSend(MQConstant.ORDER_PAY_TIMEOUT,
                MessageBuilder.withPayload(message).build(),
                new MQMessage("delay message", message.getOrderNo() + ""), 1000, 9);
    } catch (Exception e) {
        // Order creation failed: switch to the failure result and destination.
        log.error("[Order creation failed]", e);
        message.setMsg(SeckillCodeMsg.SECKILL_ERROR.getMsg());
        message.setCode(SeckillCodeMsg.SECKILL_ERROR.getCode());
        topic = MQConstant.ORDER_RESULT_FAIL_DEST;
    } finally {
        // Publish the result, whether the order succeeded or failed.
        rocketMQTemplate.asyncSend(topic, message, sendCallBack);
    }
}

After changing the configuration, start the broker with: mqbroker -c ../conf/broker.conf