A literature to understand RabbitMQ middleware

1. MQ Overview

  • MQ is a message queue, a middleware for storing messages, and a container for storing small data
  • There are two ways of distributed system communication: direct remote call and indirect communication with the help of a third party
  • The sender is called the producer and the receiver is called the consumer

Advantages of MQ:

  • Application decoupling: improving system fault tolerance and maintainability
  • Asynchronous Speed Up: Improve user experience and system throughput
  • Shaving Peaks and Filling Valleys: Improving System Stability

Disadvantages of MQ:

  • Reduced system availability: The more external dependencies the system introduces, the worse the system stability will be. Once MQ goes down, it will affect the business.
  • Increased system complexity
  • Consistency problem: A finishes processing the business, and sends message data to B, C, and D through MQ. If systems B and C process successfully, but system D fails, how to ensure the consistency of message processing.

2. What are the conditions for using MQ?

1) The producer does not need to get feedback from the consumer. Direct call before introducing message queues.

2) Tolerate transient data inconsistencies.

3) It is indeed effective, but it is necessary to ensure that the benefits in this area exceed the costs of joining MQ and managing MQ.

3. Basic overview of RabbitMQ

  1. RabbitMQ is a message queuing product developed based on the AMQP protocol and using the Erlang language.
  2. RabbitMQ provides 6 working modes
  3. AMQP is a protocol, analogous to HTTP
  4. JMS is an API specification interface, analogous to JDBC

4. Install RabbitMQ in Linux system

1) Step 1: Use xftp to import the compressed package into the rabbitmq folder

2) The second step: install the language

# install
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

3) The third step: installation environment

# install
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

4) Step 4: Install rabbitmq

# install
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

5) Step 5: Start rabbitmq

systemctl start rabbitmq-server

6) Step 6: Install the interface management tool

# Open the management interface
rabbitmq-plugins enable rabbitmq_management

7) Step 7: Modify the configuration file

# modify the default configuration information
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# Such as modifying passwords, configurations, etc., for example: <<"guest">> in loopback_users, only keep guest

8) Step 8: Restart the service

systemctl restart rabbitmq-server

9) Step 9: Test login interface management

http://192.168.192.131:15672/
#Account is: guest
#Password is: guest

4. RabbitMQ Quick Start

1) Step 1: Create a producer to send messages to RabbitMQ

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("192.168.192.131");//The default value of ip is localhost
        factory.setPort(5672);//The default value of the port number is 5672
        factory.setVirtualHost("/itcast");//The default value of the virtual machine is/
        factory.setUsername("long");//The default user name is guest
        factory.setPassword("123456");//The default password is guest
        //3. Create a connection Connection
        Connection connection = factory. newConnection();
        //4. Create Channel
        Channel channel = connection. createChannel();
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive,
        boolean autoDelete, Map<String, Object> arguments)
        parameter:
            1.queue: queue name
            2.durable: Whether it is persistent or not, when mq restarts, it is still
            3. exclusive:
                * Whether it is exclusive, only one consumer can listen to this queue
                * Whether to delete the queue when the Connection is closed
            4.autoDelete: Whether to delete automatically, when there is no Consumer, automatically delete
            5.arguments: parameters
         */
        //If there is no queue named hello_world, the queue will be created, if there is, it will not be created
        //5. Create a queue Queue
        channel.queueDeclare("hello_world",true,false,false,null);
        //6. Specify the message to be sent
        String body = "hello rabbitmq~~~";
        /*
        basicPublish(String exchange, String routingKey, AMQP. BasicProperties props, byte[] body)
        parameter:
            1. exchange: exchange name. In simple mode the switch will use the default ""
            2.routingKey: routing name
            3. props: configuration information
            4.body: send message data
        */
        //7. Send message
        channel.basicPublish("","hello_world",null,body.getBytes());
        //8. Release resources
        channel. close();
        connection. close();

    }
}

2) The second step: Create a consumer subscription message

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("192.168.192.131");//The default value of ip is localhost
        factory.setPort(5672);//The default value of the port number is 5672
        factory.setVirtualHost("/itcast");//The default value of the virtual machine is/
        factory.setUsername("long");//The default user name is guest
        factory.setPassword("123456");//The default password is guest
        //3. Create a connection Connection
        Connection connection = factory. newConnection();
        //4. Create Channel
        Channel channel = connection. createChannel();
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive,
        boolean autoDelete, Map<String, Object> arguments)
        parameter:
            1.queue: queue name
            2.durable: Whether it is persistent or not, when mq restarts, it is still
            3. exclusive:
                * Whether it is exclusive, only one consumer can listen to this queue
                * Whether to delete the queue when the Connection is closed
            4.autoDelete: Whether to delete automatically, when there is no Consumer, automatically delete
            5.arguments: parameters
         */
        //If there is no queue named hello_world, the queue will be created, if there is, it will not be created
        //5. Create a queue Queue
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        parameter:
            1.queue: queue name
            2.autoAck: Whether to confirm automatically
            3. callback: callback object
         */
        //receive message
        Consumer consumer = new DefaultConsumer(channel){
            /*
            Callback method, when the message is received, the method will be executed automatically
            parameter:
                1.consumerTag: identification
                2.envelope: Get some information, switch, routing key...
                3.properties: configuration information
                4.body: data
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume("hello_world", true, consumer);

    }
}

5, Work Queues work queue mode

1) If there are multiple consumers in a queue, the relationship between consumers for the same message is a competitive relationship.

2) Work Queues The use of work queues can improve the speed of task processing for heavy tasks or many tasks. For example, if there are multiple SMS service deployments, only one node needs to send them successfully.

6. Message response

In order to ensure that the message is not lost during the sending process, rabbitmq introduces a message response mechanism. The message response is: after the consumer receives the message and processes the message, it tells rabbitmq that it has been processed, and rabbitmq can delete the message.

7. Message re-entry

If the consumer loses connection for some reason and the message does not send an ACK, RabbitMQ will understand that the message was not fully processed and will re-queue it. If other consumers can handle it at this point, it will quickly redistribute it to another consumer. This way, even if a consumer dies occasionally, you can be sure that no messages will be lost.

8. Single confirmation release

This is a simple confirmation method, it is a synchronous confirmation release method, that is, after a message is published, only after it is confirmed and published, subsequent messages can continue to be published. The disadvantage is: the release speed is particularly slow, if because A message that is not confirmed to be published will block the publication of all subsequent messages, which provides a maximum throughput of no more than a few hundred published messages per second.

9. Batch confirmation release

Compared with a single message waiting for confirmation, publishing a batch of messages first and then confirming them together can greatly improve throughput. Of course, the disadvantage of this method is that when a failure causes a problem in publishing, we do not know which message has a problem. We The entire batch must be kept in memory to record important information and then republish the message. Of course, this solution is still synchronous, and it also blocks the release of messages.

10. Asynchronous confirmation release

Although the logic of asynchronous confirmation release is more complicated than the previous two, it is the most cost-effective, regardless of reliability or efficiency. It uses callback functions to achieve message reliability. This middleware is also implemented through function callbacks. Ensure delivery is successful.

11. Switch

The core idea of the RabbitMQ messaging model is that messages produced by producers are never sent directly to queues. In fact, usually the producer doesn’t even know which queues these messages are delivered to.

Instead, the producer can only send messages to the exchange, what the exchange does is very simple, on the one hand it receives messages from the producer, on the other hand it pushes them into a queue, the exchange has to know exactly what to do with the received messages, yes Whether these messages should be placed on a specific queue or discarded depends on the type of switch.

fanout broadcast mode:

Consumers 01 and 02 (similarly)

/**
 * Consumer 01: Test switch type fanout, publish and subscribe
 */
public class ReceiveLogs01 {
    // switch name
    public static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws Exception{
        // get channel
        Channel channel = RabbitMQUtils. getChannel();
        //Specify the switch name
        channel. exchangeDeclare(EXCHANGE_NAME,"fanout");
        //Get a temporary queue
        String queueName = channel. queueDeclare(). getQueue();
        //bind switch
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        System.out.println("C1 is ready to receive messages:");
        //success! Callback
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("The received message is: " + new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("Failed to receive message" + consumerTag);
        };
        //receive message
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
    }
}

producer

/**
 * Producer: test fanout publishing and subscription
 */
public class EmitLog {
    // switch name
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        //create channel
        Channel channel = RabbitMQUtils. getChannel();
        //Send a message
        Scanner scanner = new Scanner(System.in);
        while (scanner. hasNext()){
            String message = scanner. next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
            System.out.println("Message sent by producer: " + message);
        }
    }

}

Direct Mode:

Consumers: 01 and 02 (similarly)

/**
 * Consumer 01: Test Direct mode
 */
public class ReceiveLogsDirect01 {
    //Define the switch name
    public static final String EXCHANGE_NAME = "direct_logs";
    //Define the queue name
    public static final String QUEUE_NAME = "console";

    public static void main(String[] args) throws Exception{
        // get channel
        Channel channel = RabbitMQUtils. getChannel();
        // Declare the switch
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //declaration queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //bind switch
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

        System.out.println("ReceiveLogsDirect01 is ready to receive messages...");
        //success! Callback
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("The received message is: " + new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("Failed to receive message" + consumerTag);
        };
        //receive message
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

Producer:

/**
 * Producer: Test Direct mode
 */
public class DirectLogs {
    //Define the switch name
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils. getChannel();
        Scanner scanner = new Scanner(System.in);
        while (scanner. hasNext()){
            String message = scanner. next();
            channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes());
            System.out.println("The message sent by production is: " + message);
        }
    }
}

Topic Mode:

Consumers: 01 and 02 (similarly)

/**
 * Consumer 01: Test topic mode
 */
public class ReceiveLogsTopic01 {
    //Define the switch name
    public static final String EXCHANGE_NAME = "topic_logs";
    //Define the queue name
    public static final String QUEUE_NAME = "Q1";

    public static void main(String[] args) throws Exception{
        // get channel
        Channel channel = RabbitMQUtils. getChannel();
        // Declare the switch
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //declaration queue
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //Bind switch and queue
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
        //receive message
        System.out.println("C1 is ready to receive messages...");
        //success! Callback
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("The received message is: " + new String(message.getBody(),"UTF-8"));
            System.out.println("Queue name: " + QUEUE_NAME + "binding key: " + message.getEnvelope().getRoutingKey());
        };
        //fail! Callback
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Failed to receive message!" + consumerTag);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

Producer:

/**
 * Producer: test topic mode
 */
public class EmitLogTopic {
    //Define the switch name
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        // get channel
        Channel channel = RabbitMQUtils. getChannel();
        // store the data in the map collection
        Map<String,String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "received by queue Q1, Q2");
        bindingKeyMap.put("lazy.orange.elephant", "received by queue Q1, Q2");
        bindingKeyMap.put("quick.orange.fox","received by queue Q1");
        bindingKeyMap.put("lazy.pink.rabbit", "Although it satisfies two bindings, it is only received once by queue Q2");
        bindingKeyMap.put("lazy.brown.fox","received by queue Q2");
        bindingKeyMap.put("quick.brown.fox", "any binding that does not match will not be received by any queue and will be discarded");
        bindingKeyMap.put("quick.orange.male.rabbit", "any binding that does not match four words will be discarded");
        bindingKeyMap.put("lazy.orange.male.rabbit", "is four words but matches Q2");
        //Send a message
        Set<Map.Entry<String, String>> entries = bindingKeyMap.entrySet();
        for (Map.Entry<String, String> entry : entries) {
            String key = entry. getKey();
            String message = entry. getValue();
            channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8"));
            System.out.println("The producer sent a message: " + message);
        }

    }
}