RocketMQ Repeated Message Consumption (Causes and Solutions)

Table of Contents

1. Reasons for repeated consumption

2. Solutions

2.1 Database insertion method

2.2 Using a Bloom filter

2.2.1 Add the Hutool dependency

2.2.2 Test Producer

2.2.3 Test Consumer


1. Reasons for repeated consumption

  1. In BROADCASTING mode, every registered consumer consumes the message. These consumers are usually microservice instances deployed in a cluster, so multiple machines consume the same message. This is by design and is chosen according to business needs (see the snippet after this list for how the mode is set).
  2. In CLUSTERING (load-balancing) mode, if a topic is subscribed to by multiple consumerGroups, each group consumes every message, so the message is also consumed repeatedly.
  3. Even in CLUSTERING mode within the same consumerGroup, where a queue is assigned to only one consumer, there is a special case: when a consumer comes online (and likewise when one goes offline), all consumers in the group must rebalance (reBalance). The consumer newly assigned to a queue has to fetch the previous consumer's consumption offset (the point up to which messages have been consumed). At that moment the previous consumer may have already consumed a message without committing its offset to the broker, so the new consumer consumes that message again. The orderly mode, in which the previous consumer releases its lock before the next consumer locks the queue and consumes, is stricter than concurrent mode, but the thread that holds the lock and the thread that commits the offset are not the same, so repeated consumption can still occur in extreme cases.
  4. A batch message is processed as a single unit. If one business operation in the batch succeeds while the others fail, the whole batch will be consumed again.
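
For reference, the message model is selected on the consumer. A minimal sketch with the native DefaultMQPushConsumer API (the group name is a placeholder):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
// CLUSTERING (the default): messages are load-balanced, so each message is
// delivered to only one consumer instance in the group
// BROADCASTING: every consumer instance receives every message
consumer.setMessageModel(MessageModel.CLUSTERING);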

Simply put:

  1. After a consumer consumes a message, the offset is not synchronized to the Broker in real time. Instead it is saved in a local map and persisted by a scheduled task. If the consumer goes down after consuming a message but before its offset is committed, the messages whose offsets were not committed will be consumed again next time.
  2. Even if the offset has been committed to the Broker, the Broker may crash before persisting it. On restart, the Broker reads the offsets saved in consumerOffset.json, so the messages whose offsets were not persisted will be consumed again.

So what should we do if we are in CLUSTERING (load-balancing) mode, within the same consumer group, and do not want a message to be consumed repeatedly? The natural idea is deduplication: find the unique identifier of the message, which can be the msgId or a custom unique key, and deduplicate on it.

2. Solutions

We need to make our consumers idempotent, that is, the processing result of a message does not change no matter how many times it is consumed.

Idempotence: the effect of performing an operation multiple times is the same as the effect of performing it once.

For example, consider the idempotence of CRUD operations:

a. Insert: an ordinary insert is not idempotent, while an insert guarded by a unique index is idempotent.

b. Update: update goods set stock = 10 where id = 1 (idempotent)

update goods set stock = stock - 1 where id = 1 (not idempotent)

c. Query: idempotent

d. Delete: idempotent

So how do we make the business idempotent? That depends on the specific business. You can write to Redis, whose keys naturally deduplicate (writing the same key twice has the same effect as writing it once). You can also use the database insertion method, relying on a unique database key to ensure duplicate data is not inserted more than once.
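
A minimal sketch of the Redis approach, assuming Spring Data Redis with a StringRedisTemplate bean; the "dedup:" key prefix and the 24-hour expiry are illustrative assumptions:

@Autowired
private StringRedisTemplate redisTemplate;

public boolean tryConsume(String msgKey) {
    // setIfAbsent maps to Redis SET ... NX EX: it succeeds only for the first caller,
    // so a duplicate message fails the check and can be acknowledged without processing
    Boolean first = redisTemplate.opsForValue()
            .setIfAbsent("dedup:" + msgKey, "1", Duration.ofHours(24));
    return Boolean.TRUE.equals(first);
}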

2.1 Database insertion method

The producer must attach a unique key to each message (based on a business identifier, such as the order number).

To simulate the business, we use an order operation log table (the deduplication table) in the database.

Add a unique index on the order number column (the order number is stored as the message key).
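
A minimal sketch of what the deduplication table might look like (table and column names are assumptions inferred from the consumer code below; the unique index on the order number is the essential part):

CREATE TABLE order_oper_log (
    id       BIGINT AUTO_INCREMENT PRIMARY KEY,
    type     TINYINT,
    order_sn VARCHAR(64) NOT NULL,
    user_id  VARCHAR(32),
    UNIQUE KEY uk_order_sn (order_sn)
);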

To simulate the business, the producer sends the same message twice:

@Test
public void repeatTest() throws Exception {
    String key = UUID.randomUUID().toString();
    Message<String> msg = MessageBuilder.withPayload("Deduct inventory -1")
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();
    rocketMQTemplate.syncSend("repeatTopic", msg);
    rocketMQTemplate.syncSend("repeatTopic", msg);
}

Consumer:

@Component
@RocketMQMessageListener(topic = "repeatTopic", consumerGroup = "repeat-consumer-group")
public class RepeatListener implements RocketMQListener<MessageExt> {
    @Autowired
    private LogMapper logMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        // Get the message key first
        String keys = messageExt.getKeys();
        // Insert into the database; the key column has a unique index
        OrderOperLog orderOperLog = new OrderOperLog();
        orderOperLog.setType(1L);
        orderOperLog.setOrderSn(keys);
        orderOperLog.setUserId("1003");
        int insert = logMapper.insert(orderOperLog);
        System.out.println(keys);
        System.out.println(new String(messageExt.getBody()));
    }
}

When the second message is consumed, the duplicate unique index causes an SQLIntegrityConstraintViolationException to be thrown, so the database ends up with only one record.

Optimization: when the caught exception is an SQLIntegrityConstraintViolationException, acknowledge the message directly and skip the business processing, because the same message has already been consumed. This solves the repeated consumption problem.
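
A minimal sketch of that optimization. It assumes a Spring environment where the driver's SQLIntegrityConstraintViolationException is translated into Spring's DuplicateKeyException; with raw JDBC you would catch the SQL exception itself:

@Override
public void onMessage(MessageExt messageExt) {
    OrderOperLog orderOperLog = new OrderOperLog();
    orderOperLog.setOrderSn(messageExt.getKeys());
    try {
        // The unique index rejects a second insert with the same key
        logMapper.insert(orderOperLog);
    } catch (DuplicateKeyException e) {
        // Already consumed: return normally so the message is acknowledged
        // without running the business logic again
        return;
    }
    // ... business processing runs only after a successful insert
}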

2.2 Using a Bloom filter

  • Use a deduplication scheme: store the unique identifier of every message, and before each consumption check whether the identifier already exists. If it exists, skip the message; if not, consume it and save the identifier afterwards.
  • The idea is sound, but the message volume can be huge, reaching tens of millions or even hundreds of millions in production. What container can hold the identifiers of all messages and still answer membership queries quickly?

We can use a Bloom filter (BloomFilter).

Introduction:

The Bloom filter was proposed by Burton Howard Bloom in 1970. It is essentially a long binary vector plus a series of random mapping (hash) functions, and it can be used to test whether an element is in a set. Its advantage is that its space efficiency and query time far exceed those of ordinary algorithms; its disadvantages are a certain false positive rate and the difficulty of deleting elements.

The principle of a Bloom filter: when an element is added to the set, K hash functions map it to K positions in a bit array, and those positions are set to 1. To query, we just check whether all K positions are 1: if any of them is 0, the element is definitely not in the set; if all of them are 1, the element is most likely in the set. This is the basic idea of the Bloom filter.
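
For reference, with a bit array of m bits, k hash functions, and n inserted elements, the false positive rate is approximately (1 - e^(-kn/m))^k, so the filter must be sized according to the expected message volume.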

2.2.1 Add the Hutool dependency

<dependency>
  <groupId>cn.hutool</groupId>
  <artifactId>hutool-all</artifactId>
  <version>5.7.11</version>
</dependency>

2.2.2 Test Producer

@Test
public void testRepeatProducer() throws Exception {
    // Create a default producer
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // Set the NameServer address
    producer.setNamesrvAddr("localhost:9876");
    // Start the instance
    producer.start();
    // Use a custom key as the unique identifier
    String keyId = UUID.randomUUID().toString();
    System.out.println(keyId);
    Message msg = new Message("repeatTestTopic", "tagA", keyId, "I am a test message".getBytes());
    // Send the same message twice to simulate a duplicate
    System.out.println(producer.send(msg));
    System.out.println(producer.send(msg));
    // Shut down the instance
    producer.shutdown();
}

Two identical messages were sent:

55d397c9-814f-4931-b0fd-7e142c04759b
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]

2.2.3 Test Consumer

/**
 * In a Spring Boot project, this could be registered as a singleton @Bean in the container
 */
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m: size of the underlying bit array

@Test
public void testRepeatConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
    consumer.subscribe("repeatTestTopic", "*");
    // Register a listener; MessageListenerConcurrently consumes concurrently
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // Get the key of the message (the batch size is 1 by default)
            MessageExt messageExt = msgs.get(0);
            String keys = messageExt.getKeys();
            // Check whether the key is already in the Bloom filter
            if (bloomFilter.contains(keys)) {
                // Already consumed: acknowledge without processing the business again
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            // Process the business here, then put the key into the filter
            // do sth...
            bloomFilter.add(keys);
            System.out.println("keys:" + keys);
            System.out.println(new String(messageExt.getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

Only one message's business logic was processed:

keys:55d397c9-814f-4931-b0fd-7e142c04759b
Inventory-1

The duplicate message is acknowledged directly without being processed again, which solves the repeated consumption problem.