How to handle RocketMQ message accumulation

Message accumulation is common in the following situations:

(1) A newly released consumer has a bug, so messages cannot be consumed.

(2) The consumer instance is down, or network problems temporarily prevent it from connecting to the Broker.

(3) Producers push a large number of messages to the Broker in a short period of time, and consumers do not have enough consumption capacity to keep up.

(4) The producer is unaware of the accumulation on the Broker and keeps pushing messages to it.

Processing methods:

1. Scale out consumers:
Increasing the number of consumer instances raises overall consumption speed and reduces the backlog. Add instances according to the actual load so that consumers can process messages in time; note that in clustering mode the number of instances that can consume in parallel is bounded by the number of queues in the topic. A minimal sketch is shown below.
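A minimal sketch of a push consumer using the open-source Apache RocketMQ Java client; the Group ID GID_demo, topic TopicTest, and NameServer address are assumptions for illustration. Starting additional instances of this same program with the same Group ID lets RocketMQ rebalance the topic's queues across them:

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    public class ScaledOutConsumer {
        public static void main(String[] args) throws Exception {
            // Every instance started with the same Group ID shares the topic's queues.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_demo");
            consumer.setNamesrvAddr("localhost:9876");   // assumed NameServer address
            consumer.subscribe("TopicTest", "*");        // assumed topic name
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                // Business processing goes here.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        }
    }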

2. Adjust consumer configuration:
Check the consumer's configuration parameters, such as the number of consumption threads and the consumption batch size, to make sure system resources are fully used and messages are consumed efficiently. An example with the Java client follows.
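Continuing the consumer object from the sketch under item 1, the Apache RocketMQ Java client exposes these parameters as setters on DefaultMQPushConsumer; the values below are purely illustrative, not recommendations:

    // Illustrative values only; tune them against the actual workload.
    consumer.setConsumeThreadMin(20);            // minimum size of the consumption thread pool
    consumer.setConsumeThreadMax(64);            // maximum size of the consumption thread pool
    consumer.setConsumeMessageBatchMaxSize(16);  // messages handed to the listener per call
    consumer.setPullBatchSize(32);               // messages pulled from the Broker per request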

3. Adjust the number of message queues:
Depending on how much is accumulated, consider increasing the number of message queues for the topic. More queues allow more consumers to process messages in parallel, which speeds up consumption. A sketch using the admin API is shown below.
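For a self-managed open-source cluster, the queue counts of a topic can be raised through the admin tooling; this is only a sketch, and the NameServer address, broker address, topic name, and queue counts below are assumptions:

    import org.apache.rocketmq.common.TopicConfig;
    import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

    public class ExpandQueues {
        public static void main(String[] args) throws Exception {
            DefaultMQAdminExt admin = new DefaultMQAdminExt();
            admin.setNamesrvAddr("localhost:9876");    // assumed NameServer address
            admin.start();
            try {
                TopicConfig config = new TopicConfig("TopicTest");  // assumed topic
                config.setReadQueueNums(16);                        // illustrative queue counts
                config.setWriteQueueNums(16);
                // Apply the new queue counts on the target broker (address assumed).
                admin.createAndUpdateTopicConfig("localhost:10911", config);
            } finally {
                admin.shutdown();
            }
        }
    }

If you use the Cloud Message Queue RocketMQ version rather than a self-managed cluster, manage topics through the console instead.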

4. Increase the concurrency of message consumption:
If the consumer's processing logic allows messages to be handled in parallel, raise the consumption concurrency (for example, via the thread-pool settings shown in item 2). Higher concurrency increases processing speed and reduces accumulation.

5. Optimize consumer code:
Simplify the processing logic to reduce the per-message cost, or stage the message in Redis or a database first and process it asynchronously later, as sketched below.
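A minimal sketch of the "stage first, process later" idea, assuming a Jedis client for Redis; the Redis address and the list key pending:orders are made up, and a separate worker would pop and process the staged payloads:

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import redis.clients.jedis.JedisPooled;

    public class StageToRedisListener implements MessageListenerConcurrently {
        private final JedisPooled redis = new JedisPooled("localhost", 6379); // assumed Redis address

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                // Park the raw payload; a separate worker drains and processes the list later.
                redis.rpush("pending:orders", new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            // Acknowledge quickly so the backlog on the Broker and in the client buffer drains.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

The trade-off is that the delivery guarantee now ends at Redis: retries, ordering, and durability of the staged data become your responsibility.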

6. Expand the RocketMQ cluster:
If the above methods do not resolve the accumulation, consider expanding the RocketMQ cluster to increase message processing capability and storage capacity.

In the Cloud Message Queue RocketMQ version, after a message is sent to a Broker node, a client configured with the Group ID pulls a batch of messages from the Broker based on its current consumption offset and consumes them locally. Under normal circumstances, pulling messages from the Broker does not cause accumulation. Accumulation mainly arises during local consumption on the client: if consumption takes too long or consumption concurrency is too low, the client's consumption capacity is insufficient and messages pile up.

Solution

If messages accumulate, you can refer to the following measures to locate and handle them.

  1. Determine whether messages are accumulated on the Cloud Message Queue RocketMQ version server or client.

    Check the client’s local log file ons.log and search for the following information:

    the cached message count exceeds the threshold

    • If this log entry appears, the client’s local buffer queue is full and messages are accumulated on the client.

    • If no such log appears, the accumulation is not on the client. In this unusual case, contact Alibaba Cloud technical support directly.

  2. Confirm whether the message consumption time is reasonable.

    • If consumption takes a long time, check the client stack information (step 3 below) to locate the specific business logic.

    • If the consumption time is normal, insufficient consumption concurrency is probably causing the accumulation; resolve it by gradually increasing the number of consumption threads or scaling out consumer nodes.

    The consumption time of messages can be viewed in the following ways:

    • Log in to the Cloud Message Queue RocketMQ version console and view the message consumption track. In the Consumer section, you can see how long consumption takes. For specific operations, see Querying Message Tracks.

      Consumption time

    • Log in to the Cloud Message Queue RocketMQ version console to view the consumer status, view the business processing time in the client connection information, and obtain the average consumption time. For specific operations, see Viewing Consumer Status.

      Consumption status

    • Use Alibaba Cloud ARMS or another monitoring product to instrument the business code and collect the consumption time from it (a minimal timing sketch is shown below).
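    If you instrument the business code yourself, a simple sketch is to time the listener body and log slow messages; handleBusinessLogic, log, and the 1-second threshold below are stand-ins, not part of any SDK:

      // Inside the message listener, around the business logic for one message.
      long start = System.currentTimeMillis();
      try {
          handleBusinessLogic(msg);                 // hypothetical business method
      } finally {
          long costMs = System.currentTimeMillis() - start;
          if (costMs > 1000) {                      // arbitrary threshold for "slow"
              log.warn("slow consumption, msgId={}, cost={}ms", msg.getMsgId(), costMs);
          }
      }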

  3. View client stack information. You only need to pay attention to threads named ConsumeMessageThread; these run the business logic that consumes messages. Refer to the official Java documentation to determine the thread state and adjust the business logic according to the specific problem.

    Client stack information can be obtained in the following ways:

    • Log in to the Cloud Message Queue RocketMQ console to view the consumer status, and view Java client stack information in the client connection information. For specific operations, see Viewing Consumer Status.

    • Use the jstack tool to print stack information.

      1. See Viewing Consumer Status to obtain the host IP address corresponding to the consumer instance where messages are accumulated, and log in to the host.

      2. Execute any of the following commands to view and record the PID of the Java process.

        ps -ef | grep java
        jps -lm
      3. Execute the following command to view stack information.

        jstack -l pid > /tmp/pid.jstack
      4. Execute the following command to view the information of ConsumeMessageThread.

        cat /tmp/pid.jstack | grep ConsumeMessageThread -A 10 --color

    Common exception stack information is as follows:

    • Example 1: Idle stack with no accumulation.

      When consumption is idle, the consumption thread will be in the WAITING state, waiting to obtain messages from the consumption task queue.

      Stack example one

    • Example 2: The consumption logic involves lock contention, sleep waits, or similar blocking operations.

      The consuming thread is blocked in an internal sleep wait, resulting in slow consumption.

      Stack example two

    • Example 3: The consumption logic is stuck on a database or other external storage operation.

      The consuming thread is blocked on external HTTP calls, resulting in slow consumption.

      Stack example three

  4. In some special business scenarios, if the accumulation has already affected business operations and the accumulated messages can safely be skipped rather than consumed, you can skip the backlog by resetting the consumption point and start consuming from the latest point to quickly restore the business. For specific operations, see Resetting consumption points. A hedged admin-API sketch for a self-managed cluster is shown below.
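    For a self-managed open-source cluster, a rough equivalent to resetting the consumption point in the console is to reset the group's offset by timestamp through the admin API; this is a sketch only, the topic, group, and NameServer address are assumptions, and the exact admin method can vary by RocketMQ version:

      import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

      public class SkipBacklog {
          public static void main(String[] args) throws Exception {
              DefaultMQAdminExt admin = new DefaultMQAdminExt();
              admin.setNamesrvAddr("localhost:9876");  // assumed NameServer address
              admin.start();
              try {
                  // Move the group's offsets for the topic to "now", skipping the accumulated backlog.
                  admin.resetOffsetByTimestamp("TopicTest", "GID_demo", System.currentTimeMillis(), true);
              } finally {
                  admin.shutdown();
              }
          }
      }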