Using RabbitMQ from Node.js with the Rascal library (a wrapper around amqplib)

Rascal is a rich pub/sub wrapper around amqplib. One of the best things about amqplib is that it makes no assumptions about how you use it. Another is that it doesn’t attempt to abstract away AMQP concepts. As a result, the library offers a great deal of control and flexibility, but it is your responsibility to adopt appropriate patterns and configuration. In particular, you need to be aware that:

  • Messages are not persistent by default and will be lost if your broker restarts
  • Messages that crash your app will be retried infinitely
  • Without prefetch, a sudden flood of messages can overwhelm your event loop
  • Dropped connections and broken channels are not automatically recovered
  • Any connection or channel errors are emitted as “error” events. Unless you handle them or use domains, they will cause your application to crash
  • If a message is published using a confirm channel and the broker fails to acknowledge it, the flow of execution may block indefinitely

Rascal tries to solve these problems, make them easier to deal with, or bring them to your attention by adding the following to amqplib:

  • Configuration-driven virtual hosts, exchanges, queues, bindings, producers and consumers
  • Cluster connection support
  • Transparent content parsing
  • Transparent encryption/decryption
  • Automatic reconnection and resubscription
  • Advanced error handling, including delayed, limited retries
  • Remote procedure call (RPC) support
  • Redelivery protection
  • Channel pooling
  • Flow control
  • Publication timeouts
  • Safe defaults
  • Promise and callback support
  • TDD (test-driven development) support

Note:

1. When a connection or channel encounters a problem, amqplib emits an “error” event. With the default configuration, Rascal listens for these events and attempts to recover automatically (by reconnecting, etc.). However, these events can also indicate a bug in your code, so it is important that they are brought to your attention. Rascal does this by re-emitting the error event, which means that if you don’t handle it, it will bubble up to the uncaught error handler and crash your application. There are four places where you should handle these events:

1. Immediately after obtaining the broker instance: broker.on('error', console.error);

2. After subscribing, on the subscription returned by await broker.subscribe('s1'): subscription.on('error', console.error);

3. After publishing a message, on the publication returned by await broker.publish('p1', 'some text'): publication.on('error', console.error);

4. After forwarding a message, on the publication returned by await broker.forward('p1', message): publication.on('error', console.error);
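The reason an unhandled “error” event is fatal comes from Node's own EventEmitter behaviour rather than from anything specific to RabbitMQ. The sketch below uses only the built-in events module; the two emitters are stand-ins for a broker, subscription or publication and are not Rascal objects.

```javascript
const { EventEmitter } = require('node:events');

// Emit an 'error' event and report what happened.
// `emitter` is a hypothetical stand-in for a broker, subscription or publication.
function emitConnectionError(emitter) {
  try {
    emitter.emit('error', new Error('connection lost'));
    return 'handled';
  } catch (err) {
    // With no 'error' listener registered, emit() throws the error itself.
    return `thrown: ${err.message}`;
  }
}

const unguarded = new EventEmitter(); // no 'error' listener
const guarded = new EventEmitter();   // has an 'error' listener
const seen = [];
guarded.on('error', (err) => seen.push(err.message));

const unguardedResult = emitConnectionError(unguarded);
const guardedResult = emitConnectionError(guarded);

console.log(unguardedResult); // thrown: connection lost
console.log(guardedResult);   // handled
```

In a real application nothing wraps the emit call in a try/catch, so the thrown error reaches the process-level uncaught exception handler.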

2. Avoid potential message loss

There are three situations in which Rascal will nack (reject) a message without requeuing it, which can lead to data loss:

1. When the message content cannot be parsed and the subscriber has no “invalid_content” listener

2. When the subscriber’s (optional) redelivery limit has been exceeded and the subscriber has neither a “redeliveries_error” nor a “redeliveries_exceeded” listener

3. When Rascal attempts to recover a message by republishing or forwarding it, and the recovery operation fails

Rascal nacks the message because the alternatives are to leave it unacknowledged indefinitely, or to roll back and retry it in an infinite tight loop. That can DDoS your application and cause problems for your infrastructure. Provided you have correctly configured dead letter queues and/or listen to the “invalid_content” and “redeliveries_exceeded” subscriber events, your messages should be safe.
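A dead letter setup for the queue used in this article might look like the fragment below. It is a sketch under assumptions: the names dead_letters, dead_letter_queue and dl1 are made up for illustration, and the fragment is meant to be merged into the "/" vhost of the config shown later. x-dead-letter-exchange and x-dead-letter-routing-key are standard RabbitMQ queue arguments.

```javascript
// Illustrative fragment only; 'dead_letters', 'dead_letter_queue' and 'dl1' are hypothetical names.
const deadLetterConfig = {
  exchanges: {
    // Exchange that receives rejected (nacked, not requeued) messages
    dead_letters: { type: 'direct', options: { durable: true } },
  },
  queues: {
    queue_direct_saas: {
      options: {
        durable: true,
        arguments: {
          'x-dead-letter-exchange': 'dead_letters',
          'x-dead-letter-routing-key': 'queue_direct_saas',
        },
      },
    },
    // Queue where dead-lettered messages are parked for inspection
    dead_letter_queue: { options: { durable: true } },
  },
  bindings: {
    dl1: {
      source: 'dead_letters',
      destination: 'dead_letter_queue',
      destinationType: 'queue',
      bindingKey: 'queue_direct_saas',
    },
  },
};

console.log(JSON.stringify(deadLetterConfig, null, 2));
```

RabbitMQ refuses to redeclare an existing queue with different arguments, so an already-created queue_direct_saas would have to be deleted first, or have its dead letter exchange applied through a policy instead.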

config.js

const { MQ_HOST, HOST, MQ_PORT } = process.env;
const mqHost = MQ_HOST || HOST || "127.0.0.1";
const mqPort = MQ_PORT || 5672;
const mqUsername = "root";
const mqPassword = "paasword";

const exchangeName = 'exchange_direct_saas'; // exchange name
const queueName = 'queue_direct_saas'; // queue name
const routingKey = 'saasIsolution'; // routing key

const config = {
  "vhosts": {
    "/": {
      // Rascal publishes through pooled channels. It creates two pools per vhost:
      // one for confirm channels and one for regular channels. By default neither
      // pool is created until first use (autostart: false); autostart: true below
      // creates them up front. Idle channels are automatically evicted from the pool.
      "publicationChannelPools": {
        "regularPool": {
          "max": 10,
          "min": 5,
          "evictionRunIntervalMillis": 10000,
          "idleTimeoutMillis": 60000,
          "autostart": true
        },
        "confirmPool": {
          "max": 10,
          "min": 5,
          "evictionRunIntervalMillis": 10000,
          "idleTimeoutMillis": 60000,
          "autostart": true
        }
      },
      "connectionStrategy": "random",
      "connection": {
        "slashes": true,
        "protocol": "amqp",
        "hostname": mqHost,
        "user": mqUsername,
        "password": mqPassword,
        "port": mqPort,
        "vhost": "/",
        "options": {
          "heartbeat": 10, // Heartbeat interval in seconds. Increase it if your tasks run for a long time. The RabbitMQ server's default heartbeat is 60
          "connection_timeout": 10000,
          "channelMax": 100
        },
        "socketOptions": {
          "timeout": 10000
        },
        "management": {
          "options": {
            "timeout": 1000
          }
        },
        "retry": {
          "min": 1000,
          "max": 60000,
          "factor": 2,
          // exponential: Rascal retries the connection at exponentially increasing
          // intervals (up to one minute). The intervals are randomly adjusted so that
          // multiple services do not all reconnect at the same time.
          // linear: Rascal retries the connection at linearly increasing intervals
          // (between one and five seconds).
          "strategy": "exponential"
        }
      },
      "exchanges": { // Define the exchanges
        [exchangeName]: {
          "type": "direct",
          "options": {
            "durable": true
          }
        }
      },
      "queues": { // Define the queues
        [queueName]: {
          "options": {
            "autoDelete": false,
            "durable": true
          }
        }
      },
      "bindings": { // Define the bindings
        "b1": {
          "source": exchangeName,
          "destination": queueName,
          "destinationType": "queue",
          "bindingKey": routingKey
        }
      }
    }
  },
  "subscriptions": { // Subscriptions (message consumers)
    "s1": {
      "queue": queueName,
      "vhost": "/",
      "prefetch": 1,
      "retry": {
        "delay": 1000
      }
    }
  },
  "publications": { // Publications (message producers)
    "p1": {
      "vhost": "/",
      "exchange": exchangeName,
      "routingKey": routingKey,
      "confirm": true,
      "options": {
        "persistent": true
      }
    }
  }
}
module.exports = config;
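To get a feel for what the retry settings min: 1000, max: 60000 and factor: 2 imply, the following sketch computes a plain exponential backoff schedule. It is an illustration only and not Rascal's internal code: the helper backoffDelays is hypothetical, and the random adjustment that Rascal applies to each interval is left out so the numbers are reproducible.

```javascript
// Hypothetical helper: plain exponential backoff without random jitter.
// delay(attempt) = min * factor^attempt, capped at max (all in milliseconds).
function backoffDelays({ min, max, factor }, attempts) {
  const delays = [];
  for (let attempt = 0; attempt < attempts; attempt++) {
    delays.push(Math.min(max, min * Math.pow(factor, attempt)));
  }
  return delays;
}

const delays = backoffDelays({ min: 1000, max: 60000, factor: 2 }, 8);
console.log(delays);
// [ 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000 ]
```

With these values the cap of one minute is reached on the seventh attempt, after which every further attempt waits the maximum interval.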

Producer side

const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
const definitions = require('./config.js');
const { getInitParams } = require('../lib')
const params = getInitParams();

async function product(msg) {
    let broker;
    try {
        // withDefaultConfig applies sensible defaults suitable for production and test environments (optimized for reliability rather than speed)
        broker = await Broker.create(withDefaultConfig(definitions));
        broker.on('error', console.error);

        // Publish a message
        const publication = await broker.publish('p1', msg);
        // p1 uses a confirm channel, so wait for the broker's confirmation before shutting down
        await new Promise((resolve, reject) => {
            publication.on('success', resolve);
            publication.on('error', reject);
        });
        console.log("Producer message sent");
    } catch (err) {
        console.error(err);
    } finally {
        await broker?.shutdown();
    }
}

product(JSON.stringify(params));

Consumer side

const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
const definitions = require('./config.js');
const { getDoIsolation, getDoClear } = require("../lib");

async function consumer(i) {
    try {
        // withDefaultConfig applies sensible defaults suitable for production and test environments (optimized for reliability rather than speed)
        const broker = await Broker.create(withDefaultConfig(definitions));
        broker.on('error', error => { console.error("broker Error", error); });

        // Consume messages. An exception is thrown if subscription 's1' is not defined in the config
        const subscription = await broker.subscribe('s1');
        subscription
            .on('message', async (message, content, ackOrNack) => {
                try {
                    const params = JSON.parse(content);
                    const doIsolation = getDoIsolation(params);
                    console.log(`Consumer ${i}`, params, doIsolation);
                    await doIsolation();
                    ackOrNack();
                } catch (err) {
                    // Without this catch, a failure would leave the message unacknowledged
                    console.error(`Consumer ${i} failed to process the message`, err);
                    ackOrNack(err); // Default recovery strategy: nack without requeue
                }
            })
            .on('error', error => { console.error("subscribe Error", error); })
            // Emitted when the content cannot be parsed (for example, the content type
            // is "application/json" but the content is not valid JSON)
            .on('invalid_content', (err, message, ackOrNack) => {
                console.error('Invalid content', err);
                ackOrNack(err); // Default nack strategy
            })
            // Emitted when the number of redeliveries exceeds the subscriber's limit,
            // so that your application can handle the message
            .on('redeliveries_exceeded', (err, message, ackOrNack) => {
                console.error('Redeliveries exceeded', err);
                // Republish the message back to the queue it came from. When specifying a
                // number of attempts, always chain a fallback strategy; otherwise the message
                // is neither acknowledged nor rejected once the attempts are exhausted
                ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]);
            });
        console.log(`Consumer ${i} started successfully`);
    } catch (err) {
        console.error("Other Error", err);
    }
}

// Start three consumers (0, 1 and 2)
for (let i = 0; i <= 2; i++) {
    consumer(i);
}
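
The advice to always chain a fallback strategy after one with an attempts limit can be pictured with a toy model. The helper pickStrategy below is hypothetical and does not reflect how Rascal is implemented; it only assumes that a strategy with an attempts limit is skipped once that many attempts have been used.

```javascript
// Toy model (not Rascal's code): return the first strategy whose attempts
// budget, if it has one, has not been used up yet.
function pickStrategy(strategies, attemptsSoFar) {
  return strategies.find(
    (s) => s.attempts === undefined || attemptsSoFar < s.attempts
  );
}

const chain = [
  { strategy: 'republish', defer: 1000, attempts: 10 },
  { strategy: 'nack' }, // fallback without a limit
];

const first = pickStrategy(chain, 0);          // still within the republish budget
const exhausted = pickStrategy(chain, 10);     // budget used up, fallback applies
const noFallback = pickStrategy([chain[0]], 10); // nothing left to apply

console.log(first.strategy);     // republish
console.log(exhausted.strategy); // nack
console.log(noFallback);         // undefined
```

The last case is the one the comment in the consumer warns about: with no fallback in the chain, nothing decides the fate of the message once the attempts run out.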
