Distributed message queue: RabbitMQ (2)

Table of Contents

1: Exchange

1: Direct exchange

1.1: Producer code:

1.2: Consumer code:

2: Topic exchange

2.1: Producer code:

2.2: Consumer code:

2: Core features

2.1: Message expiration mechanism

2.1.1: Specify expiration time for all messages in the queue

2.1.2: Specify expiration time for a single message

2.2: Dead letter queue

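1: Exchange

1: Direct exchange

Binding: Associates an exchange with a queue and specifies which messages the exchange forwards to that queue.

Routing key (routingKey): A string attached to each published message; the exchange uses it to decide which queue or queues receive the message.

Features: A direct exchange forwards a message to every queue whose binding key exactly equals the message's routing key.

Scenario: Specific messages are delivered to specific queues, for example assigning each task to a particular worker.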
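
The exact-match rule can be illustrated without a broker. The following is a minimal in-memory sketch, not the RabbitMQ client API: the class name DirectRoutingSketch and the queue names queue_a and queue_b are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing (illustration only, not the RabbitMQ API). */
final class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Records that a queue is bound to this exchange with the given binding key. */
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queue_a", "a"); // hypothetical queue names and keys
        exchange.bind("queue_b", "b");
        System.out.println("routing key a -> " + exchange.route("a"));
        System.out.println("routing key c -> " + exchange.route("c"));
    }
}
```

A routing key with no matching binding yields an empty list, which mirrors how a direct exchange does not deliver a message to any queue when no binding key matches.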

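1.1: Producer code:

The producer reads a message and a routing key from the console (one line, separated by a space). The routing key determines which queue, and therefore which consumer, receives the task.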

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class DirectProducer {

    private static final String EXCHANGE_NAME = "2";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare a direct exchange with the given name
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                // Expected input: "<message> <routingKey>"
                String[] s = scanner.nextLine().trim().split("\\s+");
                if (s.length < 2) {
                    // Skip lines that do not contain a routing key
                    continue;
                }
                String message = s[0];
                String routingKey = s[1];
                // Publish the message
                // First parameter: the exchange to publish to
                // Second parameter: the routing key
                channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                        message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");
            }
        }
    }
}
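
The input handling in the producer loop can be isolated and checked on its own. The sketch below is one possible way to do it, assuming each line has the form "message routingKey"; the class name InputParseSketch and the method parse are hypothetical. Because String.split always returns at least one element, the guard has to require two parts before the second element is read.

```java
/** Parses one console line of the form "<message> <routingKey>" (illustration only). */
final class InputParseSketch {

    /** Returns {message, routingKey}, or null when the line has fewer than two words. */
    static String[] parse(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length < 2) {
            // A single word (or an empty line) carries no routing key
            return null;
        }
        // Extra words after the routing key are ignored in this sketch
        return new String[] { parts[0], parts[1] };
    }

    public static void main(String[] args) {
        String[] parsed = parse("hello a");
        System.out.println("message=" + parsed[0] + ", routingKey=" + parsed[1]);
        System.out.println("malformed line rejected: " + (parse("hello") == null));
    }
}
```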

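1.2: Consumer code:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class DirectConsumer {

    private static final String EXCHANGE_NAME = "2";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // The connection stays open so the consumer keeps receiving messages
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the same direct exchange as the producer
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // NOTE: the queue names and binding keys below are illustrative assumptions
        String queueName = "direct_queue_a";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "a");
        String queueName2 = "direct_queue_b";
        channel.queueDeclare(queueName2, true, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "b");
        System.out.println(" [*] Waiting for messages. To exit press CTRL + C");

        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [a] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [b] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
    }
}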


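2: Topic exchange

Features: Messages are forwarded to the queues whose binding pattern matches the routing key (fuzzy matching rather than exact equality).

Scenario: A particular category of message is handled only by a particular category of system (program).

Binding relationship: The binding key is a pattern of words separated by dots (.) that is matched against the routing key using two wildcards:

*: matches exactly one word
#: matches zero or more words

For example, the binding key #.Frontend.# matches the routing keys Frontend and a.Frontend.b.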
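
To make the wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It is an illustration of the rules stated above, not RabbitMQ's actual implementation; the class name TopicPatternSketch and the method matches are hypothetical.

```java
/** Matches a topic binding pattern against a routing key (illustration only). */
final class TopicPatternSketch {

    /** Returns true if the dot-separated pattern matches the dot-separated routing key. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' matches zero words, or consumes one word and stays active
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            // A literal word or '*' needs one more word in the key
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#.Frontend.#", "a.Frontend.b"));
        System.out.println(matches("*.Frontend", "Frontend"));
    }
}
```

The recursive form keeps the two wildcard rules visible side by side; it favors readability over speed.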

2.1: Producer code:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class TopicProducer {
    private static final String EXCHANGE_NAME = "3";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare a topic exchange with the given name
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                // Expected input: "<message> <routingKey>"
                String[] s = scanner.nextLine().trim().split("\\s+");
                if (s.length < 2) {
                    // Skip lines that do not contain a routing key
                    continue;
                }
                String message = s[0];
                String routingKey = s[1];
                // Publish the message
                // First parameter: the exchange to publish to
                // Second parameter: the routing key
                channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                        message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");
            }
        }
    }
}

2.2: Consumer code:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;



public class TopicConsumer {
    private static final String EXCHANGE_NAME = "3";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Declare three durable queues and bind each one with a topic pattern
        String queueName = "frontend_queue";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "#.Frontend.#");
        String queueName2 = "backend_queue";
        channel.queueDeclare(queueName2, true, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "#.Backend.#");
        String queueName3 = "product_queue";
        channel.queueDeclare(queueName3, true, false, false, null);
        channel.queueBind(queueName3, EXCHANGE_NAME, "#.product.#");
        System.out.println(" [*] Waiting for messages. To exit press CTRL + C");

        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [front end] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Backend] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Product] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        channel.basicConsume(queueName3, true, deliverCallback3, consumerTag -> { });
    }

}


2: Core Features

2.1: Message expiration mechanism

Features: A time to live (TTL) can be set for messages. A message that is not consumed within that time expires and is discarded (or dead-lettered, see 2.2).

2.1.1: Specify expiration time for all messages in the queue

When the queue is declared (here, in the consumer), the x-message-ttl argument sets an expiration time in milliseconds for every message in the queue. If no consumer retrieves a message within that time, the message expires. A message that has already been delivered to a consumer but not yet acknowledged does not expire.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TTLConsumer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Open a connection and a channel for communicating with the broker
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Set the TTL (in milliseconds) for all messages in the queue
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // The last parameter passes the queue arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, args);
        System.out.println(" [*] Waiting for messages. To exit press CTRL + C");
        // Callback that handles each delivered message
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // autoAck = false: delivered messages remain unacknowledged
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

2.1.2: Specify expiration time for a single message

// Set the expiration time on the sender's side
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class TTLProducer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             // The channel acts as the client (similar to a JDBC or Redis client) that
             // communicates with the message queue server; messages are sent through it.
             Channel channel = connection.createChannel()) {
            // The queue is not declared here; TTLConsumer declares it with x-message-ttl,
            // so the consumer should be started first so that the queue exists.
            String message = "Hello World!";
            // Set the expiration time for this message (a string, in milliseconds)
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("1000")
                    .build();

            // Publish through the default exchange (""), using the queue name as the routing key
            channel.basicPublish("", QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
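
The two listings above set both a queue-level TTL (5000 ms) and a per-message expiration (1000 ms) on the same queue. When both are present, RabbitMQ applies the lower of the two values. The following broker-free sketch illustrates that rule; the class name TtlSketch and its method names are hypothetical, and the expiry check is a simplification of what the broker does.

```java
/** Models how queue-level and per-message TTLs combine (illustration only). */
final class TtlSketch {

    /**
     * Returns the effective TTL in milliseconds, or null when neither TTL is set.
     * queueTtl corresponds to x-message-ttl; messageExpiration is the string
     * passed to BasicProperties.Builder.expiration(...).
     */
    static Long effectiveTtlMillis(Long queueTtl, String messageExpiration) {
        Long messageTtl = messageExpiration == null ? null : Long.valueOf(messageExpiration);
        if (queueTtl == null) {
            return messageTtl;
        }
        if (messageTtl == null) {
            return queueTtl;
        }
        // Both set: the lower value wins
        return Math.min(queueTtl, messageTtl);
    }

    /** Simplified check: a message is expired once it has waited longer than its TTL. */
    static boolean isExpired(long enqueuedAtMillis, long nowMillis, Long ttlMillis) {
        return ttlMillis != null && nowMillis - enqueuedAtMillis > ttlMillis;
    }

    public static void main(String[] args) {
        Long ttl = effectiveTtlMillis(5000L, "1000");
        System.out.println("effective TTL (ms): " + ttl);
        System.out.println("expired after 2 s: " + isExpired(0L, 2000L, ttl));
    }
}
```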

2.2: Dead letter queue

To keep messages reliable (for example, to make sure every message is eventually handled), a fault-tolerance mechanism is needed for messages that cannot be consumed normally. Such messages are called dead letters.

Dead letter: A message that has expired, was rejected by a consumer without being requeued (which covers processing failures), or was dropped because the queue reached its length limit.

Dead letter queue: A queue that receives and processes dead letters.

Dead letter exchange: An ordinary exchange that forwards dead letters to the dead letter queue; like any exchange, it is connected to queues through bindings and routing keys.

a: Create a dead letter exchange and dead letter queues

            // Declare the dead letter exchange
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");
            // Declare the dead letter queues and bind them to the dead letter exchange
            String queueName = "boss_queue";
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "boss");
            String queueName2 = "waibao_queue";
            channel.queueDeclare(queueName2, false, false, false, null);
            channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");

b: Attach the dead letter exchange to each work queue that needs fault tolerance

        // Declare the work exchange
        channel.exchangeDeclare(WORK_NAME, "direct");
        Map<String, Object> map = new HashMap<>();
        // Dead letter exchange that receives this queue's dead letters
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // Routing key used for dead letters; on a direct exchange it must equal
        // the binding key of the target dead letter queue ("waibao")
        map.put("x-dead-letter-routing-key", "waibao");
        // Create the work queue with the dead letter arguments
        String queueName = "xiaodog_queue";
        channel.queueDeclare(queueName, true, false, false, map);
        channel.queueBind(queueName, WORK_NAME, "xiaodog");
        Map<String, Object> map2 = new HashMap<>();
        // Dead letters from the second work queue go to the "boss" dead letter queue
        map2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        map2.put("x-dead-letter-routing-key", "boss");
        String queueName2 = "xiaocat_queue";
        channel.queueDeclare(queueName2, true, false, false, map2);
        channel.queueBind(queueName2, WORK_NAME, "xiaocat");
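
How the two queue arguments decide where a dead letter goes can be sketched without a broker. The model below is a simplified illustration, not the RabbitMQ client API: the class name DeadLetterSketch and the exchange name "dead_exchange" are hypothetical. It relies on one documented behavior: if x-dead-letter-routing-key is not set, the dead letter keeps its original routing key.

```java
import java.util.HashMap;
import java.util.Map;

/** Models where a dead letter is republished, based on the queue arguments (illustration only). */
final class DeadLetterSketch {

    /**
     * Returns {exchange, routingKey} for a dead letter, or null when the queue
     * has no dead letter exchange configured (the message is then simply dropped).
     */
    static String[] deadLetterTarget(Map<String, Object> queueArgs, String originalRoutingKey) {
        Object dlx = queueArgs.get("x-dead-letter-exchange");
        if (dlx == null) {
            return null;
        }
        Object override = queueArgs.get("x-dead-letter-routing-key");
        // Without an override, the original routing key is reused
        String key = override != null ? override.toString() : originalRoutingKey;
        return new String[] { dlx.toString(), key };
    }

    public static void main(String[] args) {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", "dead_exchange"); // hypothetical name
        queueArgs.put("x-dead-letter-routing-key", "waibao");
        String[] target = deadLetterTarget(queueArgs, "xiaodog");
        System.out.println("exchange=" + target[0] + ", routingKey=" + target[1]);
    }
}
```

On a direct dead letter exchange, the resulting routing key has to equal the binding key of a dead letter queue, otherwise the dead letter is not routed to any queue.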
