Contents
- Use fanout type Exchange to implement the Pub-Sub message model
- Code demo:
  - Producer: Publisher
  - Consumer: Consumer01
  - Consumer: Consumer02
- Test results
- Complete code
  - ConnectionUtil
  - Publisher
  - Consumer01
  - Consumer02
  - pom.xml
Use fanout type Exchange to implement the Pub-Sub message model
To implement Pub-Sub, just declare a fanout type Exchange to distribute the messages; consumers then consume from the queues bound to it.
The fanout type is the broadcast mode.
A fanout type Exchange does not examine the routing key of a message; it distributes the message directly to every queue bound to the Exchange.
After a producer sends a message to a fanout type Exchange, every queue bound to that Exchange receives its own copy of the message.
Consumers can therefore read messages from different queues without interfering with each other.
▲ A fanout type Exchange models the JMS Pub-Sub message model very well.
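The routing rule described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the RabbitMQ client API: the class `FanoutExchangeSketch` and its methods `bind`, `publish`, `consume`, and `depth` are hypothetical names made up for this illustration, and no broker is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model of fanout routing. This is NOT the RabbitMQ client API;
// the class and method names are hypothetical and exist only for illustration.
final class FanoutExchangeSketch {

    // Queues bound to this exchange, keyed by queue name.
    private final Map<String, Deque<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange, creating it if it does not exist yet.
    void bind(String queueName) {
        boundQueues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    // Publish a message: the routing key is accepted but never inspected,
    // and every bound queue receives its own copy.
    void publish(String routingKey, String message) {
        for (Deque<String> queue : boundQueues.values()) {
            queue.addLast(message);
        }
    }

    // Take the next message from one queue; other queues are unaffected.
    String consume(String queueName) {
        Deque<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    // Number of messages still waiting in a queue.
    int depth(String queueName) {
        Deque<String> queue = boundQueues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("firstQueue");
        exchange.bind("secondQueue");

        exchange.publish("", "hello");
        exchange.publish("any.key", "world"); // the routing key makes no difference

        // Draining firstQueue leaves secondQueue untouched.
        exchange.consume("firstQueue");
        exchange.consume("firstQueue");

        System.out.println("firstQueue depth: " + exchange.depth("firstQueue"));
        System.out.println("secondQueue depth: " + exchange.depth("secondQueue"));
    }
}
```

In this model, as with a real fanout Exchange, a queue only receives messages published after it has been bound.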
Code demo:
All of the code below is adapted from the code in the previous article.
Requirements: Use a fanout type Exchange to implement publish-subscribe. In practice, this means creating one producer and two consumers so that messages are distributed in broadcast mode.
Producer: Publisher
The producer declares the Exchange, then declares two message queues,
and finally binds both queues to that Exchange.
Consumer: Consumer01
The two consumers are identical apart from the queue each one reads.
Both pass autoAck = true to basicConsume(), so every delivered message is acknowledged automatically.
Each consumer reads only from its own designated message queue.
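One consequence of automatic acknowledgement is that a message counts as consumed the moment it is delivered, whether or not the handler finishes successfully. The following sketch models that behaviour in memory. It is not the RabbitMQ client API: `AutoAckSketch`, `enqueue`, `deliverAutoAck`, and `depth` are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// In-memory model of automatic acknowledgement. This is NOT the RabbitMQ
// client API; every name here is hypothetical.
final class AutoAckSketch {

    private final Deque<String> queue = new ArrayDeque<>();

    // Add a message to the tail of the queue.
    void enqueue(String message) {
        queue.addLast(message);
    }

    // Number of messages still waiting in the queue.
    int depth() {
        return queue.size();
    }

    // Deliver the next message as if autoAck were true: the message leaves
    // the queue before the handler runs. Returns true if the handler
    // completed normally, false if the queue was empty or the handler threw.
    boolean deliverAutoAck(Consumer<String> handler) {
        String message = queue.pollFirst(); // removed ("acknowledged") up front
        if (message == null) {
            return false;
        }
        try {
            handler.accept(message);
            return true;
        } catch (RuntimeException e) {
            // The message is already gone; a failing handler cannot get it back.
            return false;
        }
    }

    public static void main(String[] args) {
        AutoAckSketch firstQueue = new AutoAckSketch();
        firstQueue.enqueue("message 1");
        firstQueue.enqueue("message 2");

        firstQueue.deliverAutoAck(m -> System.out.println("handled " + m));
        firstQueue.deliverAutoAck(m -> {
            throw new IllegalStateException("handler failed on " + m);
        });

        // Both messages have left the queue, including the one whose handler failed.
        System.out.println("remaining: " + firstQueue.depth());
    }
}
```

For a demo like this one, automatic acknowledgement keeps the consumers short; when losing a message on a handler failure is not acceptable, the usual alternative is to pass autoAck = false and acknowledge explicitly.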
Consumer: Consumer02
Test results
The message producer sends 10 messages to the fanout type Exchange, that is, in broadcast mode.
Each of the two consumers receives all 10 messages, which is the expected result.
Complete code
ConnectionUtil
package cn.ljh.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Connection utility
public class ConnectionUtil {

    // Method to get a connection
    public static Connection getConnection() throws IOException, TimeoutException {
        // Create a connection factory. ConnectionFactory has a public no-argument constructor, so just instantiate it.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set the connection information
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ljh");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/"); // Virtual host to connect to
        // Get the connection from the connection factory
        Connection connection = connectionFactory.newConnection();
        // Return the connection
        return connection;
    }
}
Publisher
package cn.ljh.rabbitmq.producer;

import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Message producer: uses a fanout type Exchange, i.e. broadcast mode
public class Publisher {

    // Constant: the Exchange name
    public static final String EXCHANGE_NAME = "myex01.fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection
        Connection conn = ConnectionUtil.getConnection();
        // 2. Get a Channel from the Connection
        Channel channel = conn.createChannel();
        // 3. Call exchangeDeclare() to declare the Exchange, call queueDeclare() to declare
        //    the queues, and bind each queue to the Exchange
        channel.exchangeDeclare(
                EXCHANGE_NAME,              /* Exchange name */
                BuiltinExchangeType.FANOUT, /* Exchange type */
                true,                       /* Whether the Exchange is durable */
                false,                      /* Whether to delete it automatically */
                false,                      /* Whether it is an internal Exchange */
                null                        /* Additional Exchange arguments */
        );

        // Declare the first message queue
        channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);
        // Bind the first message queue to the Exchange
        channel.queueBind(Consumer01.QUEUE01, EXCHANGE_NAME,
                "" /* A fanout Exchange ignores the routing key, so none is needed */,
                null /* Additional binding arguments */);

        // Declare the second message queue
        channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);
        // Bind the second message queue to the Exchange
        channel.queueBind(Consumer02.QUEUE02, EXCHANGE_NAME,
                "" /* A fanout Exchange ignores the routing key, so none is needed */,
                null /* Additional binding arguments */);

        // The producer sends 10 messages
        for (int i = 1; i <= 10; i++) {
            String message = "The content of the [ " + i + " ] message sent by the producer";
            // 4. Call basicPublish() on the Channel to send the message
            channel.basicPublish(
                    EXCHANGE_NAME /* Send the message to this fanout type Exchange */,
                    "" /* A fanout Exchange ignores the routing key */,
                    null /* Additional message properties */,
                    message.getBytes(StandardCharsets.UTF_8) /* The message body must be a byte[] */
            );
            System.out.println("The producer has completed sending [ " + i + " ] messages");
        }

        // 5. Close resources
        // Close the channel
        channel.close();
        // Close the connection
        conn.close();
    }
}
Consumer01
package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Message consumer 1
public class Consumer01 {

    // The general steps for developing a message consumer with the RabbitMQ Java Client are:
    // (1) Create a ConnectionFactory, set the connection information, and obtain a Connection from it.
    // (2) Get a Channel from the Connection.
    // (3) As needed, call queueDeclare() on the Channel to declare the queue.
    //     If the declared queue already exists, this method uses the existing queue; otherwise it creates a new one.
    // (4) Call basicConsume() on the Channel to start processing messages. This method takes a Consumer
    //     argument, which is equivalent to a message listener in JMS.

    // Constant: the queue name
    public final static String QUEUE01 = "firstQueue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Obtain the connection (the factory and connection information are set up in ConnectionUtil)
        Connection conn = ConnectionUtil.getConnection();
        // 2. Get a Channel from the Connection
        Channel channel = conn.createChannel();
        // 3. Call queueDeclare() on the Channel to declare the queue.
        //    If the queue already exists it is reused; otherwise a new queue is created.
        channel.queueDeclare(
                QUEUE01, /* Name of the queue to declare */
                true,    /* Whether the queue is durable */
                false,   /* Whether the queue is exclusive to this connection */
                false,   /* Whether to delete the queue automatically */
                null     /* Additional queue arguments */);
        // 4. Call basicConsume() on the Channel to start consuming messages
        channel.basicConsume(
                QUEUE01 /* Consume messages from this queue */,
                true /* autoAck: acknowledge each message automatically on delivery */,
                new DefaultConsumer(channel) {
                    // Called each time a message from the queue is delivered to this consumer
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /* Carries the exchange and routing key of the message */,
                                               AMQP.BasicProperties properties /* Properties of the message */,
                                               byte[] body /* Message body */) throws IOException {
                        // Decode the message body
                        String message = new String(body, StandardCharsets.UTF_8);
                        // printf: %s outputs a string, %n outputs a newline
                        System.err.printf("Consumer01 received a message from Exchange [%s] and routing key [%s], and the message content is %s%n",
                                envelope.getExchange(), envelope.getRoutingKey(), message);
                    }
                }
        );
    }
}
Consumer02
package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Message consumer 2
public class Consumer02 {

    // The general steps for developing a message consumer with the RabbitMQ Java Client are:
    // (1) Create a ConnectionFactory, set the connection information, and obtain a Connection from it.
    // (2) Get a Channel from the Connection.
    // (3) As needed, call queueDeclare() on the Channel to declare the queue.
    //     If the declared queue already exists, this method uses the existing queue; otherwise it creates a new one.
    // (4) Call basicConsume() on the Channel to start processing messages. This method takes a Consumer
    //     argument, which is equivalent to a message listener in JMS.

    // Constant: the queue name
    public final static String QUEUE02 = "secondQueue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Obtain the connection (the factory and connection information are set up in ConnectionUtil)
        Connection conn = ConnectionUtil.getConnection();
        // 2. Get a Channel from the Connection
        Channel channel = conn.createChannel();
        // 3. Call queueDeclare() on the Channel to declare the queue.
        //    If the queue already exists it is reused; otherwise a new queue is created.
        channel.queueDeclare(
                QUEUE02, /* Name of the queue to declare */
                true,    /* Whether the queue is durable */
                false,   /* Whether the queue is exclusive to this connection */
                false,   /* Whether to delete the queue automatically */
                null     /* Additional queue arguments */);
        // 4. Call basicConsume() on the Channel to start consuming messages
        channel.basicConsume(
                QUEUE02 /* Consume messages from the queue with this name */,
                true /* autoAck: acknowledge each message automatically on delivery */,
                new DefaultConsumer(channel) {
                    // Called each time a message from the queue is delivered to this consumer
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /* Carries the exchange and routing key of the message */,
                                               AMQP.BasicProperties properties /* Properties of the message */,
                                               byte[] body /* Message body */) throws IOException {
                        // Decode the message body
                        String message = new String(body, StandardCharsets.UTF_8);
                        // printf: %s outputs a string, %n outputs a newline
                        System.err.printf("Consumer02 received a message from Exchange [%s] and routing key [%s], and the message content is %s%n",
                                envelope.getExchange(), envelope.getRoutingKey(), message);
                    }
                }
        );
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_fanout</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_fanout</name>

    <!-- Properties -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
    </properties>

    <!-- Dependencies -->
    <dependencies>
        <!-- RabbitMQ client library -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
</project>