Table of Contents
One: Exchanges
1: Direct exchange
1.1: Producer code
1.2: Consumer code
2: Topic exchange
2.1: Producer code
2.2: Consumer code
Two: Core features
2.1: Message expiration mechanism
2.1.1: Specify expiration time for all messages in the queue
2.1.2: Specify expiration time for a single message
2.2: Dead letter queue
One: Exchanges
1: Direct exchange
Binding: associates an exchange with a queue and determines which messages the exchange sends to that queue.
Routing key (routingKey): controls which queue a message is sent to.
Features: a message is forwarded to the queue whose binding key exactly matches the message's routing key.
Scenario: specific messages are assigned to specific queues.
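To make the exact-match rule concrete, here is a minimal in-memory sketch of how a direct exchange picks queues. It is a model written for illustration, not RabbitMQ client code; the class name DirectRoutingSketch and the queue names and keys (worker1_queue, worker1 and so on) are assumptions, not part of the original example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing; not RabbitMQ client code. */
final class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Plays the role of queueBind(queue, exchange, bindingKey) for one exchange. */
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with routingKey. */
    List<String> route(String routingKey) {
        // A direct exchange needs an exact match; without one, no queue is chosen.
        return bindings.getOrDefault(routingKey, List.of());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("worker1_queue", "worker1"); // hypothetical queue and key
        exchange.bind("worker2_queue", "worker2"); // hypothetical queue and key
        System.out.println(exchange.route("worker1"));
        System.out.println(exchange.route("unknown"));
    }
}
```

A message whose routing key matches no binding reaches no queue in this model, which is why the producer and consumer have to agree on the keys.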
1.1: Producer code:
The producer reads a message and a routing key from the console, separated by a space. The routing key decides which queue, and therefore which worker, receives the task.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class DirectProducer {

    private static final String EXCHANGE_NAME = "2";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange by name and type
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                // Expected input: "<message> <routingKey>"
                String userInput = scanner.nextLine();
                String[] s = userInput.split(" ");
                // Skip lines that do not contain both a message and a routing key
                if (s.length < 2) {
                    continue;
                }
                String message = s[0];
                String routingKey = s[1];
                // Publish the message.
                // First parameter: the exchange to publish to.
                // Second parameter: the routing key.
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");
            }
        }
    }
}
1.2: Consumer code:
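import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class DirectConsumer {

    private static final String EXCHANGE_NAME = "2";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the same direct exchange that the producer publishes to
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Create the queues and bind each one with its own routing key.
        // The queue names and keys below are illustrative assumptions.
        String queueName = "worker1_queue";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "worker1");

        String queueName2 = "worker2_queue";
        channel.queueDeclare(queueName2, true, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "worker2");

        System.out.println(" [*] Waiting for messages. To exit press CTRL + C");

        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [worker1] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [worker2] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
    }
}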
2: Topic exchange
Features: messages are forwarded to queues by fuzzy (wildcard) matching on the routing key.
Scenario: a specific type of message is handed only to a specific type of system (program) for processing.
Binding relationship: queues are bound with wildcard patterns that are matched against the routing key, whose words are separated by dots.
*: matches exactly one word
#: matches zero or more words
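The two wildcards are easier to reason about with a small matcher. The following is a sketch of the matching rule only, written in plain Java under the assumption that words are separated by dots; the class name TopicMatchSketch is made up for this example and nothing here calls the RabbitMQ client.

```java
/** Sketch of topic-pattern matching: '*' is one word, '#' is zero or more words. */
final class TopicMatchSketch {

    /** True if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word.
            return matches(p, i + 1, k, j)
                    || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' accepts any single word; otherwise the words must be identical.
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("#.Frontend.#", "a.Frontend.b"));
        System.out.println(matches("*.orange", "orange"));
    }
}
```

Note that the comparison in this sketch is case-sensitive, so a key containing "Product" would not match a pattern written with "product".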
2.1: Producer code:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class TopicProducer {

    private static final String EXCHANGE_NAME = "3";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the topic exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                // Expected input: "<message> <routingKey>"
                String userInput = scanner.nextLine();
                String[] s = userInput.split(" ");
                // Skip lines that do not contain both a message and a routing key
                if (s.length < 2) {
                    continue;
                }
                String message = s[0];
                String routingKey = s[1];
                // Publish the message.
                // First parameter: the exchange to publish to.
                // Second parameter: the routing key.
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");
            }
        }
    }
}
2.2: Consumer code:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class TopicConsumer {

    private static final String EXCHANGE_NAME = "3";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // Create the queues and bind each one with a wildcard pattern
        String queueName = "frontend_queue";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "#.Frontend.#");

        String queueName2 = "backend_queue";
        channel.queueDeclare(queueName2, true, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "#.Backend.#");

        String queueName3 = "product_queue";
        channel.queueDeclare(queueName3, true, false, false, null);
        channel.queueBind(queueName3, EXCHANGE_NAME, "#.product.#");

        System.out.println(" [*] Waiting for messages. To exit press CTRL + C");

        // One callback per queue so the output shows which queue received the message
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Frontend] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Backend] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Product] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        channel.basicConsume(queueName3, true, deliverCallback3, consumerTag -> { });
    }
}
Two: Core features
2.1: Message expiration mechanism
Features: a message can be given a validity period (time to live, TTL). If it is not consumed within that period, it expires.
2.1.1: Specify expiration time for all messages in the queue
In the consumer, declare the queue with the x-message-ttl argument (in milliseconds) to set an expiration time for every message in the queue. If no consumer retrieves a message within that time, the message expires. A message that has been delivered but not yet acknowledged does not expire.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

public class TTLConsumer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Create the connection and the channel used for communication
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Set a TTL of 5000 ms for every message in the queue
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // The last parameter passes the queue arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, args);

        System.out.println(" [*] Waiting for messages. To exit press CTRL + C");
        // How to handle a delivered message
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // autoAck is false and no ack is sent, so delivered messages stay unacknowledged
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
2.1.2: Specify expiration time for a single message
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

// Set the expiration time on the sender's side
public class TTLProducer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // The channel plays the role of a client (like a JDBC or Redis client):
        // the program sends messages to the message queue server through it
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // The queue is not declared here. For reference, the queueDeclare parameters are:
            // second (durable): whether the queue is persisted;
            // third (exclusive): whether only the connection that created the queue may use it;
            // fourth (autoDelete): whether to delete the queue once no one is using it
            String message = "Hello World!";
            // Set a 1000 ms expiration time on this message
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("1000")
                    .build();
            // Publish through the default exchange, using the queue name as the routing key
            channel.basicPublish("", QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
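When a queue-level TTL (x-message-ttl) and a per-message expiration are both set, RabbitMQ applies the lower of the two. The sketch below models that rule and a simple expiry check in plain Java. The class name TtlSketch and its method names are assumptions made for illustration, and the expiry check is a simplification of what the broker does.

```java
import java.util.OptionalLong;

/** Model of how queue-level and per-message TTL combine; not RabbitMQ client code. */
final class TtlSketch {

    /**
     * Effective TTL in milliseconds: the lower of the queue TTL and the
     * per-message expiration when both are set, empty when neither is set.
     * The per-message value is a string, as in BasicProperties.expiration.
     */
    static OptionalLong effectiveTtl(Long queueTtlMs, String messageExpiration) {
        Long messageTtlMs = messageExpiration == null ? null : Long.valueOf(messageExpiration);
        if (queueTtlMs == null && messageTtlMs == null) {
            return OptionalLong.empty();
        }
        if (queueTtlMs == null) {
            return OptionalLong.of(messageTtlMs);
        }
        if (messageTtlMs == null) {
            return OptionalLong.of(queueTtlMs);
        }
        return OptionalLong.of(Math.min(queueTtlMs, messageTtlMs));
    }

    /** Simplified check: true once the message has waited at least its TTL. */
    static boolean isExpired(long enqueuedAtMs, long nowMs, OptionalLong ttlMs) {
        return ttlMs.isPresent() && nowMs - enqueuedAtMs >= ttlMs.getAsLong();
    }

    public static void main(String[] args) {
        // Queue TTL 5000 ms and message expiration "1000", as in the two examples above
        OptionalLong ttl = effectiveTtl(5000L, "1000");
        System.out.println(ttl);
        System.out.println(isExpired(0L, 1500L, ttl));
    }
}
```

With the values used in this article, the message published by TTLProducer would be governed by its own 1000 ms expiration rather than the queue's 5000 ms.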
2.2: Dead letter queue
To make message handling reliable (for example, to make sure every message is eventually consumed successfully), a fault-tolerance mechanism is needed for messages that fail. Such failed messages are called dead letters.
Dead letter: a message that expired, was rejected, failed to be processed, or was dropped because the queue was full.
Dead letter queue: the queue that holds dead letters for processing.
Dead letter exchange: the exchange that routes dead letters to the dead letter queue. Like any other exchange, it uses routing-key bindings.
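The ways a message can become a dead letter can be sketched with a tiny in-memory queue. This is a model for illustration only, not RabbitMQ client code. The class name DeadLetterSketch is made up, the reason labels are chosen for this example, and it assumes that on overflow the oldest message is the one pushed out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory model of three dead-letter triggers; not RabbitMQ client code. */
final class DeadLetterSketch {
    enum Reason { REJECTED, EXPIRED, MAXLEN }

    private final int maxLength; // assumed queue length limit
    private final Deque<String> queue = new ArrayDeque<>();
    // message body -> why it was dead-lettered, in the order it happened
    final Map<String, Reason> deadLetters = new LinkedHashMap<>();

    DeadLetterSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Enqueues a message; if the queue overflows, the oldest one is dead-lettered. */
    void publish(String body) {
        queue.addLast(body);
        if (queue.size() > maxLength) {
            deadLetters.put(queue.removeFirst(), Reason.MAXLEN);
        }
    }

    /** The consumer rejects the head message without requeueing it. */
    void rejectHead() {
        deadLetters.put(queue.removeFirst(), Reason.REJECTED);
    }

    /** The head message outlives its TTL before anyone consumes it. */
    void expireHead() {
        deadLetters.put(queue.removeFirst(), Reason.EXPIRED);
    }

    public static void main(String[] args) {
        DeadLetterSketch q = new DeadLetterSketch(2);
        q.publish("a");
        q.publish("b");
        q.publish("c"); // the limit is 2, so "a" overflows
        q.rejectHead(); // "b" is rejected
        q.expireHead(); // "c" expires
        System.out.println(q.deadLetters);
    }
}
```

In a real setup the dead letters would not sit in a map. They are republished to the dead letter exchange, which routes them to a dead letter queue.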
a: Create a dead letter exchange and the dead letter queues
// Declare the dead letter exchange
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");

// Declare the dead letter queues and bind them to the dead letter exchange
String queueName = "boss_queue";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "boss");

String queueName2 = "waibao_queue";
channel.queueDeclare(queueName2, false, false, false, null);
channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");
b: Attach the dead letter exchange to each queue that needs fault tolerance when processing fails
// Declare the work exchange
channel.exchangeDeclare(WORK_NAME, "direct");

Map<String, Object> map = new HashMap<>();
// Dead letter exchange to attach to the queue
map.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// Routing key used when a message is dead-lettered;
// it must match a binding key on the dead letter exchange
map.put("x-dead-letter-routing-key", "waibao");
// Create the work queue with the dead letter arguments
String queueName = "xiaodog_queue";
channel.queueDeclare(queueName, true, false, false, map);
channel.queueBind(queueName, WORK_NAME, "xiaodog");

Map<String, Object> map2 = new HashMap<>();
// Dead letter exchange to attach to the queue
map2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
map2.put("x-dead-letter-routing-key", "boss");
String queueName2 = "xiaocat_queue";
channel.queueDeclare(queueName2, true, false, false, map2);
channel.queueBind(queueName2, WORK_NAME, "xiaocat");
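A detail that is easy to get wrong in this wiring is that x-dead-letter-routing-key has to be a binding key on the dead letter exchange (boss or waibao here), not a queue name. The sketch below checks this in plain Java, assuming the dead letter exchange is a direct exchange with those two bindings. The class name DeadLetterWiringSketch and the exchange name "dlx-exchange" are hypothetical, and no broker is contacted.

```java
import java.util.HashMap;
import java.util.Map;

/** Checks dead-letter wiring in memory; not RabbitMQ client code. */
final class DeadLetterWiringSketch {
    // Bindings on the dead letter exchange: binding key -> dead letter queue
    static final Map<String, String> DLX_BINDINGS = Map.of(
            "boss", "boss_queue",
            "waibao", "waibao_queue");

    /** Builds the queue arguments that attach a dead letter exchange to a queue. */
    static Map<String, Object> deadLetterArgs(String exchange, String routingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", exchange);
        args.put("x-dead-letter-routing-key", routingKey);
        return args;
    }

    /** Returns the queue a dead letter would reach, or null if no binding matches. */
    static String resolveDeadLetterQueue(Map<String, Object> queueArgs) {
        Object key = queueArgs.get("x-dead-letter-routing-key");
        if (key == null) {
            return null; // Map.of rejects null keys, so handle the missing case first
        }
        return DLX_BINDINGS.get(key);
    }

    public static void main(String[] args) {
        // "dlx-exchange" is a placeholder for the value of DEAD_EXCHANGE_NAME
        System.out.println(resolveDeadLetterQueue(deadLetterArgs("dlx-exchange", "waibao")));
        System.out.println(resolveDeadLetterQueue(deadLetterArgs("dlx-exchange", "waibao_queue")));
    }
}
```

Using the queue name as the routing key resolves to no queue in this model, which mirrors a dead letter that the exchange cannot route anywhere.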