Message queues: how to ensure messages are not lost, how to ensure message ordering, and how to deal with message accumulation

1. Basic concepts of message queues

There are two message queue models: the queue model and the publish/subscribe model.

Introduction to the queue model: producers send messages to a queue. A queue can store messages sent by multiple producers, and a queue can also have multiple consumers. Consumers compete with each other: each message can be consumed by only one consumer.
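A minimal in-process sketch of the queue model follows. It assumes java.util.concurrent.BlockingQueue as a stand-in for a broker queue (a real MQ adds persistence and networking). Several consumer threads compete for the same queue, and because poll() removes the message it returns, each message is handled by exactly one consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

class CompetingConsumersDemo {
    /** Starts consumerCount threads on one shared queue; returns message -> consumer(s) that handled it. */
    static Map<Integer, String> run(int messageCount, int consumerCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(messageCount);
        for (int i = 0; i < messageCount; i++) {
            queue.add(i); // producer side: enqueue every message once
        }
        Map<Integer, String> handledBy = new ConcurrentHashMap<>();
        List<Thread> consumers = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            String name = "consumer-" + c;
            Thread t = new Thread(() -> {
                Integer msg;
                // poll() atomically removes the head, so consumers compete and no message is taken twice
                while ((msg = queue.poll()) != null) {
                    // merge() would concatenate names if the same message were ever handled twice
                    handledBy.merge(msg, name, (a, b) -> a + "," + b);
                }
            });
            consumers.add(t);
            t.start();
        }
        for (Thread t : consumers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handledBy;
    }
}
```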

2. Publish/subscribe model

Introduction: in this model, producers send messages to a Topic, and every subscriber of that Topic can consume each message.

To put it simply: in a conference room, if one person speaks, all participants can hear the speech.
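To make the contrast with the queue model concrete, here is a minimal in-memory sketch of publish/subscribe. The class and method names are hypothetical; the point is only that publish() hands the same message to every subscriber of the topic instead of to one competing consumer.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class SimpleTopicBus {
    // topic name -> handlers of all subscribers to that topic
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    /** Delivers the message to every subscriber of the topic; returns how many received it. */
    int publish(String topic, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        for (Consumer<String> handler : handlers) {
            handler.accept(message); // each subscriber gets its own copy
        }
        return handlers.size();
    }
}
```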

Queue model: Each message can only be consumed by one consumer.

Publish/subscribe model: allows a message to be consumed by multiple consumers.

The queue model can also let one message be consumed by multiple consumers by storing a copy of it in multiple queues, but this introduces data redundancy.

3. How to ensure that messages are not lost

A message passes through three stages: production, storage, and consumption (see the schematic diagram of message production and consumption). We therefore analyze the problem from the producer, the Broker, and the consumer in turn:

On the producer side:

When the producer sends a message to the Broker, it must handle the Broker's response. Whether the message is sent synchronously or asynchronously, wrap both the synchronous call and the asynchronous callback in try-catch and process the response properly. If the Broker returns an error such as a write failure, the producer should retry sending. If sending fails repeatedly, raise an alarm, write a log record, and so on.

This ensures that messages will not be lost during the message production phase.
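The retry-then-alarm logic can be sketched as below. BrokerClient is a hypothetical interface standing in for whatever client library is used (it returns normally on an acknowledgement and throws on an error response), and the "alarm" is just a SEVERE log line; a real system would also add backoff between attempts and hook into its monitoring.

```java
import java.util.logging.Logger;

class ReliableSender {
    /** Hypothetical transport: returns normally when the broker acknowledges, throws otherwise. */
    interface BrokerClient {
        void send(String topic, String payload) throws Exception;
    }

    private static final Logger LOG = Logger.getLogger(ReliableSender.class.getName());
    private final BrokerClient client;
    private final int maxAttempts;

    ReliableSender(BrokerClient client, int maxAttempts) {
        this.client = client;
        this.maxAttempts = maxAttempts;
    }

    /** Returns true once the broker acknowledges; false after all attempts fail. */
    boolean send(String topic, String payload) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                client.send(topic, payload);
                return true; // broker confirmed the write
            } catch (Exception e) {
                // error response or timeout: log it and try again
                LOG.warning("send attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        // All attempts exhausted: raise an alarm instead of silently dropping the message
        LOG.severe("ALARM: giving up on message for topic " + topic);
        return false;
    }
}
```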

On the Broker side (message storage):

1: In the message storage stage, the Broker should respond to the producer only after the message has been flushed to disk. Otherwise, if the Broker responds early and the machine then goes down, the message is lost.

2: Cluster deployment:

Use a multi-replica mechanism: a message must be written not only to the current Broker but also to replica machines. Configure the cluster so that a message is written to at least two machines before the producer receives a response.
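The two Broker-side rules (flush before acknowledging, and acknowledge only after enough copies exist) are sketched below as a simulation with a hypothetical StorageNode interface. In real brokers these are configuration choices rather than code, for example RocketMQ's flushDiskType=SYNC_FLUSH and brokerRole=SYNC_MASTER, or Kafka's acks=all together with min.insync.replicas; check your broker version's documentation for the exact semantics.

```java
import java.util.List;

class ReplicatedBrokerSketch {
    /** Hypothetical storage node: returns true only once the message has been flushed to its disk. */
    interface StorageNode {
        boolean write(String message);
    }

    private final StorageNode local;
    private final List<StorageNode> replicas;
    private final int minCopies; // e.g. 2 = "written to at least two machines"

    ReplicatedBrokerSketch(StorageNode local, List<StorageNode> replicas, int minCopies) {
        this.local = local;
        this.replicas = replicas;
        this.minCopies = minCopies;
    }

    /** Returns true (= respond success to the producer) only when enough durable copies exist. */
    boolean store(String message) {
        if (!local.write(message)) {
            return false; // not flushed locally: never acknowledge
        }
        int copies = 1;
        for (StorageNode replica : replicas) {
            if (replica.write(message)) {
                copies++;
            }
        }
        return copies >= minCopies;
    }
}
```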

On the consumer side:

The consumer should report successful consumption to the Broker only after it has actually finished executing the business logic. This prevents messages from being lost if the machine goes down during consumption.
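A small simulation of "acknowledge only after the business logic succeeds" follows. The partition list and offset field are hypothetical stand-ins for a broker queue and its committed consumer offset; the business-logic callback returns false to simulate a crash. Because the offset advances only after success, a crashed message is delivered again instead of being lost.

```java
import java.util.List;
import java.util.function.Predicate;

class AckAfterProcessConsumer {
    private final List<String> partition; // messages stored in one queue
    private int committedOffset = 0;       // index of the next message the broker will deliver

    AckAfterProcessConsumer(List<String> partition) {
        this.partition = partition;
    }

    /**
     * Processes messages in order. businessLogic returns false to simulate a crash mid-processing;
     * in that case the offset is not committed, so the same message is redelivered next time.
     */
    void poll(Predicate<String> businessLogic) {
        while (committedOffset < partition.size()) {
            String msg = partition.get(committedOffset);
            if (!businessLogic.test(msg)) {
                return; // "crash": nothing acknowledged for this message
            }
            committedOffset++; // acknowledge only after the business logic succeeded
        }
    }

    int committedOffset() {
        return committedOffset;
    }
}
```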

4. How to deal with duplicate messages

Causes

1: Producer resends: the producer has sent a message to the Broker, and the Broker may already have written it. While the producer is waiting for the Broker's response, network fluctuations or similar problems may prevent that response from arriving. The producer then sends the message again, and the message is duplicated.

2: Retry mechanism: RocketMQ provides a message retry mechanism. If an exception occurs while a message is being consumed, the consumer may pull the same message again and retry it. If the consumer's processing logic is not idempotent (that is, processing the same message multiple times does not always produce the same result), the message may be consumed repeatedly.

3: Repeated consumption in cluster mode: in RocketMQ's cluster mode, if multiple consumer groups subscribe to the same topic and each consumer group consumes messages independently, the same message is consumed once by each group, so it may be processed repeatedly by different consumer groups.

Summary of causes: repeated sending by producers, the retry mechanism, consumer-side crashes, and repeated consumption of messages in cluster mode.

For example, after the consumer gets a message, it executes the business logic and commits the transaction, after which the consumer offset must be updated. If the consumer crashes at that moment, before the offset is updated, another consumer receives the same message again and executes the business logic a second time.

Message duplication cannot be avoided entirely, so the problem must be solved at the business level.
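Cause 1 above (a lost response triggering a producer resend) can be reproduced in a few lines. This is a hypothetical simulation, not client code: the "broker" always writes the message, but the first acknowledgement is dropped, so the retrying producer ends up storing the message twice.

```java
import java.util.ArrayList;
import java.util.List;

class LostAckDemo {
    final List<String> brokerLog = new ArrayList<>(); // what the broker has durably written
    private boolean dropNextAck;

    LostAckDemo(boolean dropFirstAck) {
        this.dropNextAck = dropFirstAck;
    }

    /** Broker: the write always succeeds, but the response may be lost on the network. */
    private boolean writeAndAck(String msg) {
        brokerLog.add(msg);
        if (dropNextAck) {
            dropNextAck = false;
            return false; // producer sees a timeout although the message was stored
        }
        return true;
    }

    /** Producer: resends until it receives an acknowledgement or runs out of attempts. */
    void produce(String msg, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (writeAndAck(msg)) {
                return;
            }
        }
    }
}
```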

Solution

The key point is idempotence

Idempotent: Calling the same interface multiple times with the same parameters produces the same result as calling it once.
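A tiny illustration of the definition, using a hypothetical stock counter: setting a value is idempotent because repeating the call leaves the same state, whereas decrementing is not, because every repeated message changes the result again.

```java
class IdempotenceDemo {
    private int stock = 100;

    /** Idempotent: calling it twice with the same argument leaves the same state as calling it once. */
    void setStock(int value) {
        stock = value;
    }

    /** Not idempotent: every repeated call changes the state again. */
    void decrementStock(int n) {
        stock -= n;
    }

    int stock() {
        return stock;
    }
}
```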

Specific measures (the sample code below uses RabbitMQ):

The first type: achieve message idempotence on the producer side

The sender should ensure that each message is unique, using a unique business-level message identifier so that the same message is not sent repeatedly. Generate a unique ID for each message when sending it.

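// MqConst.java
public class MqConst {

    /**
     * Product listing and delisting (putting products on and taking them off the shelves)
     */
    public static final String EXCHANGE_GOODS_DIRECT = "goods.direct";
    public static final String ROUTING_GOODS_UPPER = "goods.upper";
    public static final String ROUTING_GOODS_LOWER = "goods.lower";
    public static final String MESSAGE_ID_PREFIX = "MESSAGE_ID_";
}

// RabbitService.java: first create an interface
public interface RabbitService {

    boolean sendMessage(String exchange, String routeKey, Object msg);

    // Call this overload so that every message carries a unique ID
    boolean sendMessage(String exchange, String routeKey, Object msg, String msgId);
}

// RabbitServiceImpl.java
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitServiceImpl implements RabbitService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public boolean sendMessage(String exchange, String routeKey, Object msg) {
        rabbitTemplate.convertAndSend(exchange, routeKey, msg);
        return true;
    }

    /**
     * Sends a message tagged with a unique ID.
     * @param exchange exchange to publish to
     * @param routeKey routing key
     * @param msg      message payload
     * @param msgId    unique message ID
     */
    @Override
    public boolean sendMessage(String exchange, String routeKey, Object msg, String msgId) {
        rabbitTemplate.convertAndSend(exchange, routeKey, msg,
                message -> {
                    // Store the ID in the message properties so the consumer can read it;
                    // CorrelationData is mainly used to correlate publisher confirms
                    message.getMessageProperties().setMessageId(msgId);
                    return message;
                },
                new CorrelationData(msgId));
        return true;
    }
}

// SkuInfoServiceImpl.java: publish() belongs to the SKU service, not to RabbitServiceImpl.
// Assumes MyBatis-Plus (ServiceImpl provides baseMapper); SkuInfoMapper and SkuInfoService
// are hypothetical names for the project's mapper and service interface.
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoMapper, SkuInfo> implements SkuInfoService {
    @Autowired
    private RabbitService rabbitService;

    // Product listing (status == 1) and delisting
    @Override
    public void publish(Long skuId, Integer status) {
        // The producer generates a unique ID for each message
        String msgId = MqConst.MESSAGE_ID_PREFIX + System.currentTimeMillis() + UUID.randomUUID();
        // Deduplication happens on the consumer side, which must record processed IDs in shared
        // storage (e.g. Redis or a database table); a local collection here would be lost on return.

        // Update the publish status in the database
        SkuInfo skuInfo = baseMapper.selectById(skuId);
        skuInfo.setPublishStatus(status);
        baseMapper.updateById(skuInfo);

        // status == 1: put on the shelves; otherwise: remove from the shelves
        String routingKey = status == 1 ? MqConst.ROUTING_GOODS_UPPER : MqConst.ROUTING_GOODS_LOWER;
        rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT, routingKey, skuId, msgId);
    }
}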

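The consumer bound to the MqConst.EXCHANGE_GOODS_DIRECT exchange reads the message ID and compares it with the IDs it has already processed; if the ID has been seen before, the message is a duplicate and is skipped.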
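The consumer-side check can be sketched as follows. With Spring AMQP the ID would typically be read via message.getMessageProperties().getMessageId(), provided the producer set it. The in-memory set is an assumption for illustration only: in production the processed IDs must live in shared, durable storage such as a Redis key set with SET NX or a table with a unique index on the message ID, otherwise a restart or a second consumer instance loses the history.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class DeduplicatingConsumer<T> {
    // Assumption: in-memory only; replace with shared storage (Redis, DB unique key) in production
    private final Set<String> processedMessageIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> businessLogic;

    DeduplicatingConsumer(Consumer<T> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the message was processed, false if it was a duplicate and skipped. */
    boolean onMessage(String messageId, T payload) {
        // add() is atomic and returns false if the ID was already recorded
        if (!processedMessageIds.add(messageId)) {
            return false;
        }
        try {
            businessLogic.accept(payload);
        } catch (RuntimeException e) {
            processedMessageIds.remove(messageId); // allow a redelivery to retry the failed message
            throw e;
        }
        return true;
    }
}
```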

Other ways:

For example, the SQL statement update table1 set count = 100 where id = 1 and count = 50; is idempotent: no matter how many times it is executed, count ends up as 100, because after the first successful execution the condition count = 50 no longer matches. Therefore, the business processing logic should be modified so that repeated messages do not affect the final result, for example by adding a precondition such as count = 50 to the SQL, as above, so that the row is modified only when the precondition holds.
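An in-memory analogue of that conditional SQL update is shown below. The map stands in for table1 (an assumption for illustration); ConcurrentHashMap.replace(key, oldValue, newValue) is an atomic compare-and-set, which plays the role of "where id = ? and count = 50".

```java
import java.util.concurrent.ConcurrentHashMap;

class ConditionalUpdateDemo {
    // Stand-in for table1: id -> count
    final ConcurrentHashMap<Integer, Integer> countById = new ConcurrentHashMap<>();

    /**
     * Analogue of: UPDATE table1 SET count = newValue WHERE id = ? AND count = expected
     * Returns the number of "rows affected" (1 or 0).
     */
    int update(int id, int expected, int newValue) {
        // Atomic compare-and-set: succeeds only if the current value equals expected
        return countById.replace(id, expected, newValue) ? 1 : 0;
    }
}
```

Replaying the same message a second time affects zero rows, which is exactly why the statement is safe to repeat.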

A more general approach is version control: compare the version number carried in the message with the version number in the database, and apply the change only if the message is newer. Another option is to rely on database constraints such as unique keys, for example insert into … on duplicate key update ….

Which approach fits depends on the business and must be analyzed case by case.
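The version-number check can be sketched like this. The map of rows is a hypothetical stand-in for a database table with a version column, and the "apply only if newer" rule is one common choice, roughly equivalent to adding "and version < ?" to the UPDATE. Repeated or stale messages are ignored.

```java
import java.util.concurrent.ConcurrentHashMap;

class VersionedStore {
    /** A "row" with a version column. */
    static final class Row {
        final String value;
        final long version;

        Row(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    final ConcurrentHashMap<String, Row> rows = new ConcurrentHashMap<>();

    /** Applies the message only if its version is newer; returns false for repeated or stale messages. */
    boolean apply(String id, String newValue, long messageVersion) {
        boolean[] applied = {false};
        // compute() runs atomically per key, so concurrent duplicates cannot both win
        rows.compute(id, (key, current) -> {
            if (current == null || messageVersion > current.version) {
                applied[0] = true;
                return new Row(newValue, messageVersion);
            }
            return current; // keep the existing row unchanged
        });
        return applied[0];
    }
}
```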

5. How to ensure message ordering

Ordering comes in two forms: global ordering and partial ordering.

Global ordering: to guarantee global ordering of messages, only one producer may send messages to the Topic, and the Topic may contain only one queue (partition). The consumer must also consume this queue with a single thread. Only then are the messages globally ordered.

Partial ordering: most ordering requirements only need partial ordering. We can split the Topic into as many queues as we need, send messages to fixed queues according to a specific strategy, and have each queue processed by a single-threaded consumer. This satisfies the partial ordering requirement, while the number of queues provides concurrency that improves message processing throughput.

As shown in the figure, it is enough to send related messages (for example, all messages belonging to the same order) to the same queue.
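A minimal sketch of the "same key, same queue" strategy follows. The router and its in-memory queues are hypothetical; real clients offer this directly (for example RocketMQ's MessageQueueSelector passed to DefaultMQProducer.send, or Kafka hashing the record key to pick a partition). Math.floorMod keeps the index non-negative even when hashCode() is negative.

```java
import java.util.ArrayList;
import java.util.List;

class OrderedRouter {
    private final List<List<String>> queues = new ArrayList<>();

    OrderedRouter(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
    }

    /** Same key always maps to the same queue, so messages for one key keep their send order. */
    int queueFor(String key) {
        return Math.floorMod(key.hashCode(), queues.size());
    }

    void send(String key, String message) {
        queues.get(queueFor(key)).add(message);
    }

    List<String> queue(int index) {
        return queues.get(index);
    }
}
```

Each queue would then be drained by one single-threaded consumer, which preserves per-key order while different keys proceed in parallel.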

6. How to deal with message accumulation

Messages accumulate because the producers' production rate exceeds the consumers' consumption rate. This may be caused by repeated retries after failed consumption, or by consumers whose processing capacity is too low, so that a backlog gradually builds up.

Therefore, first locate the cause of slow consumption; if it is a bug in the code, fix the bug.

If consumption capacity itself is weak, optimize the consumption logic. For example, instead of consuming and processing messages one by one, process them in batches: for database inserts, batch insertion is usually far more efficient than inserting rows one at a time.

If the logic has been optimized and consumption is still too slow, scale horizontally by increasing the number of queues in the Topic and the number of consumers.

Note that the number of queues must be increased as well; otherwise the newly added consumers will have nothing to consume, because within a Topic each queue is assigned to only one consumer (of a given consumer group).
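Batch consumption can be sketched with BlockingQueue.drainTo, which moves up to a given number of waiting messages into a list in one call. The batch handler is a hypothetical callback, for example one that issues a single multi-row INSERT per batch instead of one INSERT per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class BatchConsumer {
    /**
     * Drains up to batchSize messages at a time and passes each batch to one handler call.
     * Returns the number of handler calls made.
     */
    static <T> int drainInBatches(BlockingQueue<T> queue, int batchSize, Consumer<List<T>> batchHandler) {
        int calls = 0;
        List<T> batch = new ArrayList<>(batchSize);
        // drainTo returns how many elements were moved; 0 means the queue is empty
        while (queue.drainTo(batch, batchSize) > 0) {
            batchHandler.accept(new ArrayList<>(batch)); // hand over a copy of this batch
            batch.clear();
            calls++;
        }
        return calls;
    }
}
```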
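The effect of "one queue, one consumer" can be seen with a simplified round-robin assignment. This is an assumption standing in for the broker's actual rebalancing strategy: with 4 queues and 6 consumers, two consumers receive no queue at all and sit idle.

```java
import java.util.ArrayList;
import java.util.List;

class QueueAssignment {
    /** Assigns each queue to exactly one consumer; returns, per consumer, its queue indices. */
    static List<List<Integer>> assign(int queueCount, int consumerCount) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            result.add(new ArrayList<>());
        }
        for (int q = 0; q < queueCount; q++) {
            result.get(q % consumerCount).add(q); // each queue goes to one consumer only
        }
        return result;
    }
}
```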
