Table of Contents
1. Reasons for repeated consumption
2. Solution
2.1 Database insertion method
2.2 Using Bloom filters
2.2.1 Add hutool dependencies
2.2.2 Test producer
2.2.3 Test consumer
1. Reasons for repeated consumption
- In BROADCASTING mode, every registered consumer consumes each message. These consumers are usually instances of a microservice deployed across a cluster, so multiple machines consume the same message. Of course, this mode is chosen deliberately when the business requires it.
- In CLUSTERING (load-balancing) mode, if a topic is subscribed to by multiple consumer groups, each group consumes the message once, which also results in repeated consumption.
- Even in CLUSTERING mode, within the same consumer group a queue is assigned to only one consumer, so at first glance repeated consumption should not happen. There is, however, a special case: when a consumer comes online (and likewise when one goes offline), all consumers in the group must rebalance (reBalance). The new consumer taking over a queue needs to fetch the offset of the previous consumer (the offset marks the point up to which messages have been consumed). At that moment the previous consumer may already have consumed a message without committing its offset to the broker, so the new consumer consumes it again. Orderly consumption is stricter than concurrent consumption: the previous consumer unlocks the queue before the next one locks it and consumes. But the thread that holds the lock and the thread that commits the offset are not the same thread, so in extreme cases repeated consumption still occurs.
- A batch message is processed as a single message. So if one business operation in the batch succeeds and the others fail, the whole batch is consumed again.
Simply put:
- After a consumer consumes a message, the offset is not synchronized to the broker in real time. Instead it is saved in a local map and persisted by a scheduled task. If the consumer goes down after consuming a message but before the offset is committed, the messages whose offsets were not committed are consumed again next time.
- Even if the offset has been committed to the broker, the broker may crash before persisting it. On restart, the broker reads the offset information saved in consumerOffset.json, so the messages whose offsets were never persisted are consumed again.
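The local-offset mechanism described in these two points can be sketched as follows. This is a simplified, hypothetical illustration, not RocketMQ's actual offset-store code; the class and method names are invented for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a consumer-side offset store: offsets are updated
// in memory on every consumed message and only persisted periodically,
// so a crash between flushes causes the unflushed messages to be redelivered.
public class LocalOffsetStore {
    private final Map<Integer, Long> offsetTable = new ConcurrentHashMap<>(); // queueId -> offset
    private final Map<Integer, Long> persisted   = new ConcurrentHashMap<>(); // stands in for broker/disk

    public void updateOffset(int queueId, long offset) {
        offsetTable.merge(queueId, offset, Math::max);   // in-memory only
    }

    public void persistAll() {                           // normally run by a scheduled task
        persisted.putAll(offsetTable);
    }

    // On restart, consumption resumes from the last PERSISTED offset,
    // so messages consumed after the last flush are delivered again.
    public long resumeFrom(int queueId) {
        return persisted.getOrDefault(queueId, 0L);
    }
}
```

If the consumer "crashes" after `updateOffset` but before the next `persistAll`, `resumeFrom` returns the older persisted offset, and everything after it is replayed.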
So what should we do if we are in CLUSTERING (load-balancing) mode, within the same consumer group, and do not want a message to be consumed repeatedly? We can deduplicate: find the unique identifier of the message, which can be the msgId or a custom unique key, and use it to filter out duplicates.
2. Solution
We need to make our consumers idempotent, that is, the result of processing the same message must not change no matter how many times it is processed.
Idempotence: the effect of performing an operation multiple times is the same as the effect of performing it once.
For example, consider the idempotence of CRUD operations:
a. Insert: a plain insert is non-idempotent; an insert protected by a unique index is idempotent.
b. Update: `update goods set stock = 10 where id = 1` is idempotent;
   `update goods set stock = stock - 1 where id = 1` is non-idempotent.
c. Select: idempotent.
d. Delete: idempotent.
So how do we make a business operation idempotent? That depends on the specific business. One option is to write to Redis, because setting the same key to the same value in Redis is naturally idempotent. Another is the database insertion method: rely on a unique key in the database to ensure that duplicate data cannot be inserted twice.
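The Redis idea boils down to an atomic "set if absent" on the message's unique key: only the first writer wins, every replay is skipped. Here is a minimal in-process sketch using `ConcurrentHashMap.putIfAbsent` as a stand-in for Redis (with a real Redis you would use `SETNX`, e.g. Spring's `opsForValue().setIfAbsent`); the class name is invented for the example:

```java
import java.util.concurrent.ConcurrentHashMap;

// Idempotent consumer guard: the first putIfAbsent for a key wins,
// every replay of the same key is rejected.
public class DedupGuard {
    private final ConcurrentHashMap<String, Boolean> seen = new ConcurrentHashMap<>();

    /** @return true if the message should be processed, false if it is a duplicate */
    public boolean firstTime(String msgKey) {
        // putIfAbsent returns null only for the first writer of this key
        return seen.putIfAbsent(msgKey, Boolean.TRUE) == null;
    }
}
```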
2.1 Database insertion method
The sender attaches a unique key to each message (based on a business identifier).
Simulate the business with an order operation log table (a deduplication table), and add a unique index on the order-number column; the message key is stored as the order number.
To simulate the business, the producer sends the same message twice:
```java
@Test
public void repeatTest() throws Exception {
    String key = UUID.randomUUID().toString();
    Message<String> msg = MessageBuilder.withPayload("Deduct inventory -1")
            .setHeader(RocketMQHeaders.KEYS, key)
            .build();
    rocketMQTemplate.syncSend("repeatTopic", msg);
    rocketMQTemplate.syncSend("repeatTopic", msg);
}
```
The consumer:

```java
@Component
@RocketMQMessageListener(topic = "repeatTopic", consumerGroup = "repeat-consumer-group")
public class RepeatListener implements RocketMQListener<MessageExt> {

    @Autowired
    private LogMapper logMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        // Get the key first
        String keys = messageExt.getKeys();
        // Insert into the database; the key column has a unique index
        OrderOperLog orderOperLog = new OrderOperLog();
        orderOperLog.setType(1L);
        orderOperLog.setOrderSn(keys);
        orderOperLog.setUserId("1003");
        int insert = logMapper.insert(orderOperLog);
        System.out.println(keys);
        System.out.println(new String(messageExt.getBody()));
    }
}
```
When the second message is consumed, the duplicate unique index causes a SQLIntegrityConstraintViolationException to be thrown, and the database ends up with only one such record.
Optimization: when the caught exception is a SQLIntegrityConstraintViolationException, acknowledge the message directly and skip the business processing, because the same message has already been consumed; this solves the repeated-consumption problem.
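A sketch of this optimization, with the database insert simulated by an in-memory set so the snippet is self-contained (in the real listener, the `logMapper.insert` call would sit inside the `try` block):

```java
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.HashSet;
import java.util.Set;

// Sketch: treat a unique-index violation as "already consumed" and ack,
// instead of letting the exception propagate and trigger a retry/redelivery.
public class UniqueKeyConsumer {
    private final Set<String> table = new HashSet<>(); // stands in for the DB table

    // stands in for logMapper.insert(...) hitting the unique index
    private void insert(String orderSn) throws SQLIntegrityConstraintViolationException {
        if (!table.add(orderSn)) {
            throw new SQLIntegrityConstraintViolationException("Duplicate entry '" + orderSn + "'");
        }
    }

    /** @return true if the message was acked (first delivery or recognized duplicate) */
    public boolean consume(String key) {
        try {
            insert(key);
            // ... real business processing would go here ...
            return true;                       // ack: processed
        } catch (SQLIntegrityConstraintViolationException dup) {
            return true;                       // ack: duplicate, skip the business logic
        }
    }
}
```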
2.2 Using Bloom filters
- One deduplication approach: store the unique identifier of every message, and before each consumption check whether the identifier already exists. If it exists, skip the message; if not, consume it and save the identifier afterwards.
- The idea is sound, but the message volume can be huge, reaching tens of millions or even hundreds of millions in a production environment. So which container should we choose to hold all the identifiers and still answer membership queries quickly?
We can choose Bloom Filter (BloomFilter)
Introduction:
The Bloom filter was proposed by Burton Howard Bloom in 1970. It is essentially a long binary vector plus a series of random mapping (hash) functions. A Bloom filter can be used to test whether an element is in a set. Its advantage is that its space efficiency and query time far exceed those of ordinary algorithms; its disadvantages are a certain false-positive rate and the difficulty of deleting elements.
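To get a feel for that space advantage: the standard sizing formula m = -n·ln(p) / (ln 2)² says that even 100 million keys fit in roughly 115 MB of bits at a 1% false-positive rate, versus several GB for storing 36-byte UUID strings directly. A back-of-the-envelope calculation (the figures are illustrative, not RocketMQ benchmarks):

```java
// Back-of-the-envelope sizing: optimal Bloom filter bit count
// m = -n * ln(p) / (ln 2)^2 for n elements at false-positive rate p.
public class BloomSizing {
    public static long bitsNeeded(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 100_000_000L;                       // 100 million message keys
        double p = 0.01;                             // 1% false-positive rate
        long bits = bitsNeeded(n, p);
        System.out.printf("Bloom filter: ~%d MB%n", bits / 8 / 1024 / 1024);
        System.out.printf("Raw 36-byte UUID strings: ~%d MB%n", n * 36 / 1024 / 1024);
    }
}
```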
The principle of the Bloom filter: when an element is added to the set, K hash functions map it to K positions in a bit array, and those positions are set to 1. When querying, we only need to check whether all K positions are 1 to know (approximately) whether the element is in the set: if any of the positions is 0, the element is definitely not in the set; if all are 1, the element is most likely in the set. This is the basic idea of the Bloom filter.
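This principle can be shown with a tiny hand-rolled filter. It is purely illustrative (the hutool BitMapBloomFilter used in the sections below is a ready-made equivalent); deriving k indexes from two base hashes is the common Kirsch-Mitzenmacher trick:

```java
import java.util.BitSet;

// Minimal Bloom filter: k hash functions map each element to k bit
// positions; membership = all k bits set. Illustrative, not production code.
public class TinyBloomFilter {
    private final BitSet bits;
    private final int m;   // bit-array length
    private final int k;   // number of hash functions

    public TinyBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // derive the i-th index from two base hashes (Kirsch-Mitzenmacher)
    private int index(String s, int i) {
        int h1 = s.hashCode();
        int h2 = (h1 >>> 16) | (h1 << 16);     // second hash: rotated h1
        return Math.floorMod(h1 + i * h2, m);
    }

    public void add(String s) {
        for (int i = 0; i < k; i++) bits.set(index(s, i));
    }

    public boolean contains(String s) {        // may return false positives,
        for (int i = 0; i < k; i++) {          // never false negatives
            if (!bits.get(index(s, i))) return false;
        }
        return true;
    }
}
```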
2.2.1 Add hutool dependencies
```xml
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.11</version>
</dependency>
```
2.2.2 Test Producer
```java
public void testRepeatProducer() throws Exception {
    // Create a default producer
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // Set the nameServer address
    producer.setNamesrvAddr("localhost:9876");
    // Start the instance
    producer.start();
    // Use a custom key as the unique identifier
    String keyId = UUID.randomUUID().toString();
    System.out.println(keyId);
    Message msg = new Message("repeatTestTopic", "tagA", keyId, "I am a test message".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // Close the instance
    producer.shutdown();
}
```
Sending the same message twice produces:

```
55d397c9-814f-4931-b0fd-7e142c04759b
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]
```
2.2.3 Test Consumer
```java
/**
 * In a Spring Boot project, you can register this as a singleton
 * in the container with @Bean instead of using a static field.
 */
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m: bit-array length

@Test
public void testRepeatConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
    consumer.subscribe("repeatTestTopic", "*");
    // Register a listener; MessageListenerConcurrently consumes concurrently
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // Get the key of the message
            MessageExt messageExt = msgs.get(0);
            String keys = messageExt.getKeys();
            // Check whether the key is already in the Bloom filter
            if (bloomFilter.contains(keys)) {
                // Already consumed: ack directly without running the business again
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            // Handle the business here, then put the key into the filter
            // do sth...
            bloomFilter.add(keys);
            System.out.println("keys:" + keys);
            System.out.println(new String(messageExt.getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}
```
Only one business operation was processed:

```
keys:55d397c9-814f-4931-b0fd-7e142c04759b
Inventory-1
```
After a short delay, the duplicate message is acknowledged directly, which solves the repeated-consumption problem.