202. Implementing the Pub-Sub message model in RabbitMQ with a fanout Exchange (fanout is the broadcast type)

Contents

  • Use fanout type Exchange to implement Pub-Sub message model
  • Code demo:
    • Producer: producer
    • Consumer: Consumer01
    • Consumer: Consumer02
    • Test Results
  • Complete code
    • ConnectionUtil
    • Publisher
    • Consumer01
    • Consumer02
    • pom.xml

Use fanout type Exchange to implement the Pub-Sub message model

To distribute messages this way, you only need to declare an Exchange of type fanout; consumers then consume from the queues bound to it.
The fanout type is broadcast mode.

A fanout Exchange ignores the routing key of a message and delivers the message directly to every queue bound to the Exchange.

After a producer sends a message to a fanout Exchange, every queue bound to that Exchange receives a copy of the message.
Consumers can then read messages from their own queues without interfering with each other.

▲ The fanout type of Exchange can simulate the JMS Pub-Sub message model very well.
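
The broadcast behaviour described above can be sketched in plain Java without a broker. This is only an in-memory model, not the RabbitMQ client API: the class `FanoutModel` and its methods `bind` and `publish` are hypothetical names invented for this illustration. It shows a routing key being accepted but ignored, and every bound queue receiving its own copy of each message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory model of fanout semantics (hypothetical; not the RabbitMQ API).
final class FanoutModel {
    // Queues currently bound to this model exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Bind a queue so that it receives every message published afterwards.
    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // The routing key is accepted but never inspected, as with a fanout Exchange.
    void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        Queue<String> firstQueue = new ArrayDeque<>();
        Queue<String> secondQueue = new ArrayDeque<>();
        exchange.bind(firstQueue);
        exchange.bind(secondQueue);

        // Ten messages with ten different routing keys: both queues still get all of them.
        for (int i = 1; i <= 10; i++) {
            exchange.publish("ignored.key." + i, "message " + i);
        }
        System.out.println(firstQueue.size() + " " + secondQueue.size()); // prints "10 10"
    }
}
```

In this model, as with a real fanout Exchange, a queue only receives messages published after it has been bound, which is why the demo binds both queues before sending anything.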

Code demo:

The code below is adapted from the code in the previous article.
Requirement: use a fanout Exchange to implement publish-subscribe. In practice this means creating one producer and two consumers so that messages are distributed in broadcast mode.

Producer: producer

The producer declares the Exchange, declares two message queues, and then binds both queues to that Exchange.

Consumer: Consumer01

The two consumers have essentially the same code; only the queue name differs.
Both pass autoAck = true to the consume method, so consumption is acknowledged automatically.
Each consumer reads from its own designated message queue.

Consumer: Consumer02


Test results

The producer sends 10 messages to the fanout (broadcast) Exchange, and each of the two consumers receives all 10 messages, as expected.

Complete code

ConnectionUtil

package cn.ljh.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//Connection utility
public class ConnectionUtil
{
    //Method to get a connection
    public static Connection getConnection() throws IOException, TimeoutException
    {
        //Create a connection factory. ConnectionFactory has a public no-argument constructor, so it can be instantiated directly.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //Set connection information
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ljh");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/"); //Connect to the virtual host
        //Get the connection from the connection factory
        Connection connection = connectionFactory.newConnection();
        //return connection
        return connection;
    }
}

Publisher

package cn.ljh.rabbitmq.producer;

import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

//Message producer: uses a fanout Exchange, i.e. broadcast mode
public class Publisher
{
    //Constant: define an Exchange name as a constant
    public static final String EXCHANGE_NAME = "myex01.fanout";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1. Create connection
        Connection conn = ConnectionUtil.getConnection();
        //2. Get the Channel through Connection.
        Channel channel = conn.createChannel();

        //3. Call the exchangeDeclare() method to declare Exchange, call the queueDeclare() method to declare the queue, and complete the binding of the queue to Exchange.
        channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange name */
                BuiltinExchangeType.FANOUT,/* Exchange type */
                true,/* Whether to persist */
                false,/* Whether to auto-delete the Exchange when it is no longer used */
                false,/* Is it an internal Exchange */
                null /* Specify additional attributes of Exchange */
        );

        //Declare multiple message queues------Declare the first message queue
        channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);

        //Bind Exchange and Queue, bind the first message queue
        channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,
                "" /* Because Exchange is a fanout type, there is no need for a routing key */,
                null /* Additional arguments for the binding */);

        //Declare the second message queue
        channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);

        //Bind Exchange and Queue, bind the second message queue
        channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,
                "" /* Because Exchange is a fanout type, there is no need for a routing key */,
                null /* Additional arguments for the binding */);

        //The producer sends 10 messages
        for (int i = 1; i <= 10; i++)
        {
            String message = "The content of the [ " + i + " ] message sent by the producer";

            //4. Call the basicPublish() method of Channel to send the message
            channel.basicPublish(EXCHANGE_NAME /* Send a message to this fanout type Exchange */,
                    "" /* Because Exchange is a fanout type, it doesn't matter whether there is a routing key or not */,
                    null /*Specify additional message attributes*/,
                    message.getBytes(StandardCharsets.UTF_8)/*The message body must be a byte array type-->byte[]*/
            );
            System.out.println("The producer has completed sending [ " + i + " ] messages");
        }
        //5. Close resources
        //Close channel
        channel.close();
        //Close the connection
        conn.close();
    }
}

Consumer01

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//Message consumer 1
public class Consumer01
{
    //The general steps for developing a message consumer with the RabbitMQ Java Client are as follows:
    //(1) Create a ConnectionFactory, set the connection information, and then obtain a Connection from the ConnectionFactory.
    //(2) Get a Channel from the Connection.
    //(3) As needed, call the Channel's queueDeclare() method to declare the queue.
    // If the declared queue already exists with the same attributes, this method simply uses the existing queue; if it does not exist yet, this method creates it.
    //(4) Call the Channel's basicConsume() method to start consuming messages. This method takes a Consumer argument, which plays the role of the message listener in JMS.

    //constant
    public final static String QUEUE01 = "firstQueue";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1. Create a connection factory, set the connection information, and then obtain the connection through the connection factory.
        Connection conn = ConnectionUtil.getConnection();

        //2. Get the Channel message channel through Connection
        Channel channel = conn.createChannel();

        //3. Call the queueDeclare() method of Channel to declare the queue.
        // If the declared queue already exists, this method directly obtains the existing queue; if the declared queue does not yet exist, this method will create a new queue
        channel.queueDeclare(QUEUE01, /* declared queue name */
                true, /* Whether the message queue is persistent */
                false, /* exclusive: whether the queue may be used only by the connection that declared it */
                false, /* Whether to delete automatically */
                null /* Specify additional attributes of the message queue */);


        //4. Call the basicConsume() method of Channel to start processing consumption messages
        channel.basicConsume(
                QUEUE01 /*Consume messages in this consumption queue*/,
                true /*Message confirmation mode: whether to automatically confirm that the message has been consumed and return a confirmation message to the message queue*/,
                new DefaultConsumer(channel)
                {
                    //Handle a message: this method is invoked whenever a message from the queue is delivered to this consumer. Override it:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*Delivery metadata: carries the exchange and routing key of the message*/,
                                               AMQP.BasicProperties properties /*Properties of the message*/,
                                               byte[] body /*body: the message body*/) throws IOException
                    {
                        //Decode the message body as text
                        String message = new String(body, "UTF-8");
                        //printf: formatted output; %s: string, %n: newline
                        System.err.printf("Consumer01 received a message from Exchange [%s] with routing key [%s], and the message content is %s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);

                    }
                }
        );
    }
}

Consumer02

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//Message consumer 2
public class Consumer02
{
    //The general steps for developing a message consumer with the RabbitMQ Java Client are as follows:
    //(1) Create a ConnectionFactory, set the connection information, and then obtain a Connection from the ConnectionFactory.
    //(2) Get a Channel from the Connection.
    //(3) As needed, call the Channel's queueDeclare() method to declare the queue.
    // If the declared queue already exists with the same attributes, this method simply uses the existing queue; if it does not exist yet, this method creates it.
    //(4) Call the Channel's basicConsume() method to start consuming messages. This method takes a Consumer argument, which plays the role of the message listener in JMS.

    //constant
    public final static String QUEUE02 = "secondQueue";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1. Create a connection factory, set the connection information, and then obtain the connection through the connection factory.
        Connection conn = ConnectionUtil.getConnection();

        //2. Get the Channel message channel through Connection
        Channel channel = conn.createChannel();

        //3. Call the queueDeclare() method of Channel to declare the queue.
        // If the declared queue already exists, this method directly obtains the existing queue; if the declared queue does not yet exist, this method will create a new queue
        channel.queueDeclare(QUEUE02, /* declared queue name */
                true, /* Whether the message queue is persistent */
                false, /* exclusive: whether the queue may be used only by the connection that declared it */
                false, /* Whether to delete automatically */
                null /* Specify additional attributes of the message queue */);

        //4. Call the basicConsume() method of Channel to start processing consumption messages
        channel.basicConsume(
                QUEUE02 /*Consume messages in the consumption queue with this name*/,
                true/*Message confirmation mode: whether to automatically confirm that the message has been consumed and return a confirmation message to the message queue*/,
                new DefaultConsumer(channel)
                {
                    //Handle a message: this method is invoked whenever a message from the queue is delivered to this consumer. Override it:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*Delivery metadata: carries the exchange and routing key of the message*/,
                                               AMQP.BasicProperties properties /*Properties of the message*/,
                                               byte[] body /*body: the message body*/) throws IOException
                    {
                        //Decode the message body as text
                        String message = new String(body, "UTF-8");
                        //printf: formatted output; %s: string, %n: newline
                        System.err.printf("Consumer02 received a message from Exchange [%s] with routing key [%s], and the message content is %s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);
                    }
                }
        );
    }


}
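
The publisher turns each message into bytes with `getBytes(StandardCharsets.UTF_8)`, and both consumers turn the bytes back into text with `new String(body, "UTF-8")`. The stand-alone sketch below does not touch RabbitMQ; it only illustrates why both sides should agree on the charset. The class name `BodyEncodingSketch` and its helper methods `encode` and `decode` are hypothetical names chosen for this example.

```java
import java.nio.charset.StandardCharsets;

// Stand-alone illustration of the byte[] round trip used for message bodies (hypothetical helper class).
final class BodyEncodingSketch {
    // What the publisher does before handing the body to basicPublish().
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // What a consumer does inside handleDelivery().
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "The content of the [ 1 ] message sent by the producer";
        // Same charset on both sides: the text survives the round trip.
        System.out.println(decode(encode(original)).equals(original)); // prints "true"

        // Different charset on the receiving side: non-ASCII text is corrupted.
        String nonAscii = "caf\u00e9";
        String misdecoded = new String(encode(nonAscii), StandardCharsets.ISO_8859_1);
        System.out.println(misdecoded.equals(nonAscii)); // prints "false"
    }
}
```

Using the `StandardCharsets.UTF_8` constant on both sides is one way to avoid a misspelled charset name, although the string form `"UTF-8"` used in the consumers above works as well.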

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_fanout</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_fanout</name>

    <!-- Properties -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
    </properties>

    <!-- Dependencies -->
    <dependencies>
        <!-- RabbitMQ dependent libraries -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>

    </dependencies>


</project>