nodejs operates rabbitMQ amqplib library message persistence

config.js

const { MQ_HOST, HOST, MQ_PORT } = process.env;
const mqHost = MQ_HOST || HOST || "127.0.0.1";
const mqPort = MQ_PORT || 5672;
const mqUsername = "root";
const mqPassword = "password";
const mqProtocol = "amqp";

const exchangeName = 'exchange_direct_saas'; //switch
const queueName = 'queue_direct_saas';
const routingKey = 'saasIsolution';//routing key

const config = { mqHost, mqPort, mqUsername, mqPassword, mqProtocol, exchangeName, queueName, routingKey };
module.exports = config;

Producer side:

const amqp = require('amqplib');
const { getInitParams } = require('../../lib')
const params = getInitParams();
const { mqHost, mqPort, mqUsername, mqPassword, mqProtocol, exchangeName, queueName, routingKey } = require("./config");

async function product(msg) {
    const connection = await amqp.connect({protocol: mqProtocol, hostname: mqHost, port: mqPort, username: mqUsername, password: mqPassword});
    connection.on('error', console.error)
    const channel = await connection.createConfirmChannel();
    channel.on('error', console.error)
    //Associate the switch, set the type of the switch, and persist the message {durable: true} The parameter is to persist the message
    await channel.assertExchange(exchangeName, 'direct', { durable: true });
    //Set up fair scheduling. Configure the value of the prefetch count item to 1, which will instruct RabbitMQ not to send more than one message to each consumer at the same time. In other words, no message is sent to the consumer until the message is processed and acknowledged. Instead, it will send the message to the next idle consumer or worker process.
    await channel.prefetch(1, false); //true sets the limit on the channel, false sets the limit on the consumer end, the default is false
    await channel.publish(exchangeName, routingKey, Buffer.from(msg));
    // console.log("Producer message sent");
    channel.waitForConfirms().then(async results => { //Wait for confirmation to complete
        // console.log(results);
        const errors = results.filter(Boolean); //Remove false values
        if (errors.length) console.error('Errors', errors);
        else console.log(new Date().toISOString(), `Broker confirmed ${results.length} messages`);
    }).catch(e => {
        console.error(e, "error");
    }).finally( async()=>{
        //Close channel
         channel.close();
        //Close the connection
         connection.close();
    } )
}

product(JSON.stringify(params));

Consumer side

const amqp = require('amqplib');
const { getDoIsolation } = require("../../lib");
const { mqHost, mqPort, mqUsername, mqPassword, mqProtocol, exchangeName, queueName, routingKey } = require("./config");

async function consumer() {
    const connection = await amqp.connect({protocol: mqProtocol, hostname: mqHost, port: mqPort, username: mqUsername, password: mqPassword});
    connection.on('error', console.error)
    const channel = await connection.createConfirmChannel();
    channel.on('error', console.error)
    //Associate the switch, set the switch type, and persist the message {durable: true} The parameter is to persist the message
    await channel.assertExchange(exchangeName, 'direct', { durable: true });
    //Associate the message queue autoDelete:true to set the queue to be automatically deleted when it is empty. Queue persistence
    await channel.assertQueue(queueName,{autoDelete:false, durable: true});
    //Binding relationship (queue, switch, routing key)
    await channel.bindQueue(queueName, exchangeName, routingKey);
    //Set fair scheduling. Configure the value of the prefetch count item to 1, which will instruct RabbitMQ not to send more than one message to each consumer at the same time. In other words, no message is sent to the consumer until the message is processed and acknowledged. Instead, it will send the message to the next idle consumer or worker process.
    await channel.prefetch(1, false);//true sets the limit on the channel, false sets the limit on the consumer end, the default is false
    //Consume queue messages
    await channel.consume(queueName, async (msg) => {
        try{
            const params = JSON.parse(msg.content.toString());
            const doIsolation = getDoIsolation(params);
            console.log(params, doIsolation);
            await doIsolation();
            channel.ack(msg);
        }catch(e){
            console.error(e);
        }
    }, { noAck: false });//Manual ack response
    console.log("Consumer started successfully")
}
for(i=0; i<=2; i + + ){
    consumer();
}

Message persistence Avoid loss strong>

1. Avoid message loss maintained by broker message middleware

The persistence of the switch is actually equivalent to saving the attributes of the switch within the server. When the MQ server encounters an accident or is shut down, you do not need to manually or execute code to establish the switch again when restarting RabbitMQ. The switch will be established automatically, which is equivalent to always existing. When declaring the exchanger, set the durable attribute to true.

Queue persistence is also when declaring the queue and setting the durable parameter to true. If the queue does not set persistence, the queue will be deleted after the RabbitMQ service is restarted. Since the queue no longer exists, the messages in the queue will also be lost.

Message persistenceTo ensure that the message is not lost, the message needs to be set to persistence. Information persistence is to store information on disk. When publishing a message, the producer can set the deliveryMode attribute of options to 2. Mark message as persistent message

All messages can be set to be persistent, but this will seriously affect the performance of RabbitMQ. Writing to disk is more than a little slower than writing to memory. For messages whose reliability is not that high, persistence processing may not be used to improve the overall throughput. When choosing whether to persist messages, there is a trade-off between reliability and throughput.

General systems do not need to persist messages. However, persistence of switches and queues still needs to be supported.

2.Avoid producer producer -> MQMessage lost during the process

The message sent by the producer is not sent to RabbitMq due to network and other reasons

solution:

2-1. Enable RabbitMq transaction mechanism

The producer opens RabbitMQ before sending data Transaction channel.txSelect, and then sends the message if the message is not successfully processed by RabbitMQ If received, the producer will receive an exception error. At this time, the transaction channel.txRollback can be rolled back and the message is retried. If the message is received, the transaction channel.txCommit can be submitted, similar to our database transaction mechanism.

2-2, turn on confirm mode< strong>(Recommended)

After setting the confirm mode on the producer side, each message you write will be assigned a unique ID, and then if it is written to RabbitMQ, RabbitMQ will give you a reply Send an ack message to tell you that the message has been received. If RabbitMQ fails to process the message, it will call back one of your nack interfaces to tell you that the message reception failed and you can try again. Moreover, you can combine this mechanism to maintain the status of each message ID in your own business. If a callback for this message is not received for a certain period of time, the business can proactively resend it.

Advantages and disadvantages of transaction mechanism and confirm mechanism:

The transaction mechanism is synchronous. After submitting a transaction, it will block, throughput will decrease, and performance will be consumed.

The confirm mechanism is asynchronous, the process will not block, the throughput is high, and the performance is good.

3.AvoidconsumerUnprocessed messages are lost

Reason: When the consumer automatically ack is configured (no_ack=true), the message is considered to have been successfully transmitted immediately after it is sent, and the business code exception or other fault messages are not processed. It will also automatically ack. RabbitMq messages will be discarded after ack, which results in the loss of messages under abnormal circumstances.

solution:

Turn off RabbitMq automatic ack (no_ack=false). If the business code successfully consumes the message, manually call Mq ack and let Mq discard the message; if the business code is abnormal, directly nack and let Mq Resend the message for processing. Of course, under relatively high requirements, abnormal data can also be entered into the dead letter queue to ensure data integrity.

The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge. Cloud native entry-level skills treeHomepageOverview 16,968 people are learning the system