RocketMQ Consumer Operating Principles: Cluster Consumption and Broadcast Consumption

  • Bilibili (video): https://www.bilibili.com/video/BV1zm4y1872b
  • Online learning documents: https://d9bp4nr5ye.feishu.cn/wiki/wikcnqTsEVMD74nV9W6IsmVHuxe

Last time we looked at the overall consumption flow of the RocketMQ Consumer (RocketMQ Consumer: analysis of consumer consumption principles). Today we focus on how consumers perform cluster consumption and broadcast consumption.

Conclusion first

  1. When a consumer registers, it tells the server its message model (cluster or broadcast).
  2. When a consumer pulls data, the server decides whether the messages can be pulled.

Now let’s look at the questions:

  1. How do consumers register?
  2. Where does the first pull request come from?
  3. How does RebalanceService balance the load?
  4. Do different consumers share one thread pool to execute their consumption tasks?

1. Start

If you haven’t read the previous article, please read it first; this article builds on it (RocketMQ Consumer: analysis of consumer consumption principles).

Every class annotated with @RocketMQMessageListener gets a DefaultRocketMQListenerContainer generated from its configuration, and consumption starts from that container's start() method.

2. How consumers register with the broker

Conclusion: consumers register through heartbeats, which are resent periodically, so the registration is kept up to date.

mQClientFactory.start() starts a series of scheduled tasks, one of which sends heartbeats to all brokers.

MQClientInstance.java

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                //...
                
                // Start scheduled tasks
                this.startScheduledTask();
                
                //...
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

There are many scheduled tasks here, such as fetching the nameserver address, updating topic route information, persisting consumer offsets, and so on.

private void startScheduledTask() {

    //...

    // Send heartbeat regularly to update consumer information
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    //...
}
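The heartbeat task is an ordinary scheduleAtFixedRate job. Here is a minimal plain-Java sketch of the same pattern; HeartbeatSchedulerSketch and sendHeartbeat are hypothetical names, and the interval is a parameter (the real one comes from clientConfig.getHeartbeatBrokerInterval(), which I believe defaults to 30 seconds, but check your client version).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a periodic heartbeat task; not RocketMQ code.
final class HeartbeatSchedulerSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger sent = new AtomicInteger();

    // Stand-in for cleanOfflineBroker() + sendHeartbeatToAllBrokerWithLock().
    private void sendHeartbeat() {
        sent.incrementAndGet();
    }

    void start(long initialDelayMs, long intervalMs) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                sendHeartbeat();
            } catch (Exception e) {
                // Catch everything: an exception escaping a scheduleAtFixedRate task
                // would suppress all of its later runs.
                System.err.println("heartbeat failed: " + e);
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    int sentCount() {
        return sent.get();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The try/catch mirrors the original code: a periodic task that throws is silently cancelled, so a single failed heartbeat must not stop the client from re-registering on the next tick.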

Simplified version of the heartbeat-sending logic:

public void sendHeartbeatToAllBrokerWithLock() {
    this.sendHeartbeatToAllBroker();
}

private void sendHeartbeatToAllBroker() {
    // Collect the data to send in the heartbeat
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();

    if (!this.brokerAddrTable.isEmpty()) {
        //... iterate over brokerAddrTable; for each broker address addr:
        // send the heartbeat
        int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
        //...
    }
}


private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();

    // clientID
    heartbeatData.setClientID(this.clientId);

    // Each consumer is registered in consumerTable, which is a ConcurrentMap
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            // Get consumer information
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());

            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }

    // Producers
    for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(entry.getKey());

            heartbeatData.getProducerDataSet().add(producerData);
        }
    }

    return heartbeatData;
}
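This is where the first conclusion comes from: each heartbeat carries, for every registered consumer group, that group's messageModel. Below is a stripped-down model of the bookkeeping; the class names and group names are hypothetical, and only the group name and message model are kept from ConsumerData.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of prepareHeartbeatData(): each registered
// consumer group contributes one entry that carries its message model.
final class HeartbeatDataSketch {
    enum MessageModel { CLUSTERING, BROADCASTING }

    static final class ConsumerData {
        final String groupName;
        final MessageModel messageModel;

        ConsumerData(String groupName, MessageModel messageModel) {
            this.groupName = groupName;
            this.messageModel = messageModel;
        }

        @Override
        public String toString() {
            return groupName + "/" + messageModel;
        }
    }

    // Plays the role of consumerTable: group name -> message model.
    private final Map<String, MessageModel> consumerTable = new LinkedHashMap<>();

    void register(String group, MessageModel model) {
        consumerTable.putIfAbsent(group, model);
    }

    // Builds one ConsumerData per registered group, as the real method does.
    List<ConsumerData> prepareHeartbeatData() {
        List<ConsumerData> result = new ArrayList<>();
        for (Map.Entry<String, MessageModel> e : consumerTable.entrySet()) {
            result.add(new ConsumerData(e.getKey(), e.getValue()));
        }
        return result;
    }
}
```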

DefaultMQPushConsumerImpl.java

Where consumerTable gets its entries (the consumer registers itself in start()):

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:

            //...
            // register consumer
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

            //...
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}

MQClientInstance.java

private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }

    // putIfAbsent returns null if the key was absent (and the consumer is stored),
    // or the already-registered value if the key exists (and nothing is stored)
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }

    return true;
}
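The duplicate-group check hinges on ConcurrentMap.putIfAbsent. The sketch below isolates that behavior; RegisterConsumerSketch is a hypothetical name and Object stands in for MQConsumerInner. It shows that a second consumer with the same group name in the same client instance is rejected rather than overwriting the first.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal sketch of the registerConsumer() duplicate check using putIfAbsent.
final class RegisterConsumerSketch {
    private final ConcurrentMap<String, Object> consumerTable = new ConcurrentHashMap<>();

    boolean registerConsumer(String group, Object consumer) {
        if (group == null || consumer == null) {
            return false; // ConcurrentHashMap rejects null keys/values anyway
        }
        // null: the key was absent and our consumer was stored.
        // non-null: the consumer already registered for this group; ours is discarded.
        Object prev = consumerTable.putIfAbsent(group, consumer);
        return prev == null;
    }

    int size() {
        return consumerTable.size();
    }
}
```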

3. Where does the first request come from?

In the previous article, we saw that each consumer starts an infinite loop that keeps taking pull requests from a queue and processing them, and that every finished request is put back into the queue, forming a cycle. This raises a question: where does the first request come from? The answer is load balancing (rebalancing).

Here is what load balancing does: a topic is only a logical category, while queues are what actually store the data, and load balancing decides which queues each consumer pulls data from. (Distributing queues among consumers only matters for cluster consumption.)

MQClientInstance.java

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                //...
                
                // enable load balancing
                this.rebalanceService.start();
                
                //...
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

RebalanceService.java run()

This loop keeps running until the service thread is stopped. Each iteration waits up to waitInterval and then triggers a rebalance for every registered consumer.

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
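waitForRunning(waitInterval) means the rebalance runs periodically but can also be triggered early by a wakeup. The sketch below is a hypothetical, simplified stand-in: RocketMQ's ServiceThread implements waitForRunning with its own resettable latch, while this version uses a plain monitor. To my knowledge waitInterval defaults to 20 seconds in the 4.x client, but treat that as an assumption.

```java
// Hypothetical sketch of a RebalanceService-style loop: wait up to waitIntervalMs,
// or less if someone calls wakeup(), then run one rebalance pass.
final class RebalanceLoopSketch implements Runnable {
    private final Object lock = new Object();
    private final long waitIntervalMs;
    private final Runnable rebalance;  // stands in for mqClientFactory.doRebalance()
    private boolean notified = false;  // guarded by lock; remembers a wakeup that arrives early
    private volatile boolean stopped = false;

    RebalanceLoopSketch(long waitIntervalMs, Runnable rebalance) {
        this.waitIntervalMs = waitIntervalMs;
        this.rebalance = rebalance;
    }

    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    void shutdown() {
        stopped = true;
        wakeup();
    }

    private void waitForRunning(long intervalMs) throws InterruptedException {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + intervalMs;
            long remaining = intervalMs;
            // Loop guards against spurious wakeups; remaining > 0 avoids wait(0) = forever.
            while (!notified && remaining > 0) {
                lock.wait(remaining);
                remaining = deadline - System.currentTimeMillis();
            }
            notified = false; // consume the wakeup (or the timeout)
        }
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                waitForRunning(waitIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!stopped) {
                rebalance.run();
            }
        }
    }
}
```

The notified flag is what makes an early wakeup safe: if wakeup() is called before the thread starts waiting, the next wait returns immediately instead of sleeping for the full interval.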

MQClientInstance.java

As shown above, each annotated class is registered in consumerTable as a DefaultMQPushConsumerImpl, which implements MQConsumerInner.

public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {<!-- -->
                log.error("doRebalance exception", e);
            }
        }
    }
}

DefaultMQPushConsumerImpl.java

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}

RebalanceImpl.java

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}

We can see that for both cluster consumption and broadcast consumption, the client obtains the queues of the current topic and then turns them into pull requests. (The allocation strategy of cluster consumption is more involved, so we skip it here and explain it later.)

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            //...

            boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
            if (changed) {
                log.info(
                    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                    allocateResultSet.size(), allocateResultSet);
                this.messageQueueChanged(topic, mqSet, allocateResultSet);
            }
          
            break;
        }
        default:
            break;
    }
}

This method wraps each newly assigned queue (MessageQueue) into a pull request (PullRequest):

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    //...

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();

            long nextOffset = -1L;
            try {
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                continue;
            }

            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

    return changed;
}
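The essential logic here is a diff: only queues that are not already in processQueueTable produce a PullRequest, so repeated rebalances with an unchanged assignment dispatch nothing. Below is a hypothetical sketch of just that part, with strings in place of MessageQueue and a caller-supplied function in place of computePullFromWhereWithException; whatever is hidden behind //... in the original is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.ToLongFunction;

// Hypothetical sketch of the "new queue -> new PullRequest" part of
// updateProcessQueueTableInRebalance(); message queues are plain strings.
final class RebalanceDiffSketch {
    static final class PullRequest {
        final String messageQueue;
        final long nextOffset;

        PullRequest(String messageQueue, long nextOffset) {
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
        }
    }

    // Plays the role of processQueueTable: queue -> processing state.
    private final ConcurrentMap<String, Object> processQueueTable = new ConcurrentHashMap<>();
    // Stands in for computePullFromWhereWithException(mq); a negative result means failure.
    private final ToLongFunction<String> computePullFromWhere;

    RebalanceDiffSketch(ToLongFunction<String> computePullFromWhere) {
        this.computePullFromWhere = computePullFromWhere;
    }

    // Returns pull requests only for queues this consumer was not already consuming.
    List<PullRequest> update(Set<String> assignedQueues) {
        List<PullRequest> pullRequests = new ArrayList<>();
        for (String mq : assignedQueues) {
            if (processQueueTable.containsKey(mq)) {
                continue; // already being consumed: no new request
            }
            long nextOffset = computePullFromWhere.applyAsLong(mq);
            if (nextOffset < 0) {
                continue; // mirrors the "add new mq failed" branch
            }
            if (processQueueTable.putIfAbsent(mq, new Object()) == null) {
                pullRequests.add(new PullRequest(mq, nextOffset));
            }
        }
        return pullRequests;
    }
}
```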

This method adds each pull request (PullRequest) to the pull queue, where the infinite loop described earlier picks it up for processing:

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        // As seen in the previous article, this method puts the request into the pull queue
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}
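Putting the last two steps together: rebalance seeds one PullRequest per queue, and from then on each request keeps itself alive by being re-queued after every pull. Here is a hypothetical single-threaded sketch of that cycle; PullLoopSketch is not a RocketMQ class, and as far as I know the real queue lives in PullMessageService as a LinkedBlockingQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the circular pull loop: rebalance seeds the queue with the
// first PullRequest for each message queue, and every processed request is put back.
final class PullLoopSketch {
    static final class PullRequest {
        final String messageQueue;
        long nextOffset;

        PullRequest(String messageQueue, long nextOffset) {
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
        }
    }

    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();

    // Corresponds to dispatchPullRequest(): the only place requests enter from outside.
    void dispatch(List<PullRequest> requests) {
        pullRequestQueue.addAll(requests);
    }

    // One loop iteration. Non-blocking poll() keeps the sketch deterministic;
    // a real pull thread would block on take(). Assumes each pull returns batchSize messages.
    boolean runOnce(int batchSize) {
        PullRequest request = pullRequestQueue.poll();
        if (request == null) {
            return false; // nothing dispatched: without rebalance there is no first request
        }
        request.nextOffset += batchSize; // pretend a batch was pulled and consumed
        pullRequestQueue.offer(request); // put it back: this is what makes the loop circular
        return true;
    }

    List<PullRequest> snapshot() {
        return new ArrayList<>(pullRequestQueue);
    }
}
```

Before dispatch is called, runOnce finds nothing to do, which is exactly why rebalancing has to supply the first request.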

4. How does RebalanceService balance the load?

First, the messages of a topic are spread across multiple message queues (here we assume three queues: A, B, and C). Load balancing means distributing these N queues among the consumers according to some strategy, so that each consumer pulls from the queues assigned to it.

In the previous section we already found the entry point of load balancing, so let’s go straight to its core method,
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            // ... broadcast load balancing
            break;
        }
        case CLUSTERING: {
            // ... cluster load balancing
            break;
        }
        default:
            break;
    }
}

Broadcast consumption

case BROADCASTING: {
    // Get the queue information of the current topic
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
            this.messageQueueChanged(topic, mqSet, mqSet);
            log.info("messageQueueChanged {} {} {} {}",
                consumerGroup,
                topic,
                mqSet,
                mqSet);
        }
    } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    break;
}

For broadcast consumption, every consumer must consume every message in the topic, so there is no load balancing: each consumer simply takes all of the topic's queues.
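To contrast the two models, here is a hypothetical sketch that returns the queues a given consumer would take. The broadcast branch matches the code above; the cluster branch uses a simplified round-robin split for illustration (similar in spirit to the AllocateMessageQueueAveragelyByCircle strategy that, as far as I know, also ships with the client), not the default AllocateMessageQueueAveragely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical contrast of the two message models for one topic.
final class MessageModelSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    static List<String> queuesFor(MessageModel model, List<String> mqAll,
                                  List<String> cidAll, String currentCid) {
        switch (model) {
            case BROADCASTING:
                // No allocation: every consumer takes every queue.
                return new ArrayList<>(mqAll);
            case CLUSTERING: {
                // Simplified round-robin: queue i goes to the consumer at index i % N.
                List<String> result = new ArrayList<>();
                int index = cidAll.indexOf(currentCid);
                if (index < 0) {
                    return result; // not a member of the group
                }
                for (int i = index; i < mqAll.size(); i += cidAll.size()) {
                    result.add(mqAll.get(i));
                }
                return result;
            }
            default:
                throw new IllegalArgumentException("unknown model " + model);
        }
    }
}
```

With queues A, B, C and consumers c1, c2, broadcasting gives both consumers all three queues, while this clustering split gives c1 the queues A and C and gives c2 only B.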

updateProcessQueueTableInRebalance

The updateProcessQueueTableInRebalance method processes the result of load balancing and is shared by both message models. Because broadcast consumption does no allocation, the full queue set is passed to it directly.

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;
    //...

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        // Only handle queues that are not yet in processQueueTable (not already being consumed)
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            // Remove any stale (dirty) offset cached for this queue
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();

            long nextOffset = -1L;
            try {
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                continue;
            }

            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    // Build the pull request. As mentioned earlier, an infinite loop keeps taking requests from the queue
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // This method is to put these request information into the queue mentioned earlier
    this.dispatchPullRequest(pullRequestList);

    return changed;
}

Cluster consumption

Cluster consumption requires load balancing because each message should be consumed by only one consumer in the consumer group.

Load balancing for cluster consumption fetches all queues of the topic and all consumers in the current consumerGroup, then distributes the queues among the consumers according to a strategy.

case CLUSTERING: {
    // Get all queues under the current topic
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

    // Get all consumers under the current topic and consumerGroup (sends a request to the broker)
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

    // ... omit parameter validation

    if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // Sort both lists so every consumer in the group sees the same order
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // Obtain the load balancing strategy. The default is AllocateMessageQueueAveragely (average allocation)
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
        List<MessageQueue> allocateResult = null;
        try {
            allocateResult = strategy.allocate(
                this.consumerGroup,
                this.mQClientFactory.getClientId(),
                mqAll,
                cidAll);
        } catch (Throwable e) {
            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                e);
            return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
            allocateResultSet.addAll(allocateResult);
        }

        // Process the allocation result (the same method described above)
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
            log.info(
                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                allocateResultSet.size(), allocateResultSet);
            this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
    }
    break;
}
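The allocate call is where the actual split happens. The sketch below follows, to the best of my knowledge, the arithmetic of AllocateMessageQueueAveragely#allocate, with strings in place of MessageQueue and client IDs and without the parameter validation; treat it as an illustration rather than the library source. Each consumer gets a contiguous block of queues, and when the division is uneven the first consumers take one extra.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of average allocation over sorted queue and consumer lists.
final class AverageAllocationSketch {
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // not a member of the group, or nothing to allocate
        }
        int mod = mqAll.size() % cidAll.size();
        // At most one queue each when consumers >= queues; otherwise the first
        // `mod` consumers take one queue more than the rest.
        int averageSize = mqAll.size() <= cidAll.size()
            ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range is <= 0 for surplus consumers, so they receive nothing.
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
```

Because the caller sorts mqAll and cidAll first, every consumer computes the same split independently, with no coordination. With three queues and two consumers, c1 gets A and B and c2 gets C; with three queues and five consumers, the last two consumers get nothing, so adding consumers beyond the number of queues does not add parallelism.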

AllocateMessageQueueAveragely

You may be as curious as I was about where this default strategy comes from.

The strategy is read from the current object, RebalanceImpl: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

Going one level up, the RebalanceImpl object is held by DefaultMQPushConsumerImpl:

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}

So we need to find where the DefaultMQPushConsumerImpl object is created. That happens when @RocketMQMessageListener is parsed (see the previous article for details); here we only show the end result:

private void initRocketMQPushConsumer() throws MQClientException {
    //...

    // Whichever of the following constructors is used, the default load balancing strategy is AllocateMessageQueueAveragely
    if (Objects.nonNull(rpcHook)) {
        consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
            enableMsgTrace, this.applicationContext.getEnvironment().
            resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        consumer.setVipChannelEnabled(false);
    } else {
        log.debug("Access-key or secret-key not configure in " + this + ".");
        consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
            this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
    }
    
    //...
}

5. Is there one thread pool for different consumers to execute tasks?

The reason for asking: if every consumer created its own thread pool, wouldn't there be a great many thread pools? (Then again, without plenty of threads, how could it achieve high performance?)

DefaultMQPushConsumerImpl#start
The thread pool is created starting from the start method; the relevant snippet is:

public synchronized void start() throws MQClientException {

    //...

    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
        this.consumeOrderly = true;
        this.consumeMessageService =
            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
        this.consumeOrderly = false;
        this.consumeMessageService =
            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
    }
    this.consumeMessageService.start();
    
    //...
}

For concurrent consumption, the service is ConsumeMessageConcurrentlyService:

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    //...
    // A new thread pool is created for every service instance
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl(consumeThreadPrefix));

    //...
}

So each consumer gets its own thread pool created with new. To double-check, set a breakpoint and compare the consumeExecutor objects of different consumers: they are different instances.
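Here is a hypothetical sketch of the same construction pattern: each service instance builds its own ThreadPoolExecutor in its constructor, so two consumers never share an executor. One side effect worth knowing: ThreadPoolExecutor only grows past its core size when the work queue is full, so with an unbounded LinkedBlockingQueue (which I believe is what consumeRequestQueue is in the 4.x client, but verify for your version) the pool stays at consumeThreadMin threads.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: each consumer service owns an executor created with new,
// mirroring how ConsumeMessageConcurrentlyService builds consumeExecutor.
final class ConsumeServiceSketch {
    final ThreadPoolExecutor consumeExecutor;

    ConsumeServiceSketch(int threadMin, int threadMax) {
        this.consumeExecutor = new ThreadPoolExecutor(
            threadMin,
            threadMax,
            1000 * 60, TimeUnit.MILLISECONDS,
            // Unbounded queue: tasks queue up instead of spawning threads beyond threadMin.
            new LinkedBlockingQueue<>());
    }

    void shutdown() {
        consumeExecutor.shutdown();
    }
}
```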
