Rabbitmq configuration and usage (Fanout broadcast)

Configuration:

Public files:

Create a unified queue configuration class QueueConstant.java

package test.common;
public class QueueConstant {<!-- -->
    /**
     *Queue name
     */
    public static final String RABBITMQ_TEST="rabbitmq.test";
}

Create the switch class ExchangeConstant.java

package test.common;

/**
 *Switch unified configuration class
 * The switch name corresponds to the module name, and routing can trigger specific events for the module. Topic mode is used by default.
 */
public class ExchangeConstant {<!-- -->
    /**
     * Switch name-device
     */
    public static final String EXCHANGE_TEST="test";
    /**
     * Routing key-device-initialization
     */
    public static final String EXCHANGE_KEY_TEST="test.key";

}

Create test class UserDTO .java

import java.io.Serializable;
import lombok.Data;
@Data
public class UserDTO implements Serializable {<!-- -->
    private String username;
    private String password;
}

Producer:

First create a RabbitMQConfig.java as the producer’s configuration class.

package test.producer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Producer message confirmation--"Normal business is closed, important data is opened
 * Enable in configuration file: publisher-returns publisher-confirms
 * ? When the message is sent from producer to exchange, a confirmCallback will be returned.
 * ? If the message delivery from exchange–>queue fails, a returnCallback will be returned.
 */
@Configuration
public class RabbitMQConfig {<!-- -->
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {<!-- -->
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        /**
         * Configure serialization tool
         */
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        template.setMessageConverter(jackson2JsonMessageConverter);
        /**
         * The message is sent from producer to exchange and a confirmCallback is immediately returned
         */
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {<!-- -->
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {<!-- -->
                System.out.println("ConfirmCallback: " + "Related data: " + correlationData);
                System.out.println("ConfirmCallback: " + "Confirmation: " + ack);
                System.out.println("ConfirmCallback: " + "Cause: " + cause);
            }
        });
        /**
         * Will be called only after entering the switch but not entering the queue - "routing failure"
         */
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {<!-- -->
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {<!-- -->
                System.out.println("ReturnCallback: " + "Message: " + message);
                System.out.println("ReturnCallback: " + "Response code: " + replyCode);
                System.out.println("ReturnCallback: " + "Response information: " + replyText);
                System.out.println("ReturnCallback: " + "Exchange: " + exchange);
                System.out.println("ReturnCallback: " + "Routing Key: " + routingKey);
            }
        });
        return template;
    }
}

yml configuration file

spring:
  rabbitmq:
    host: #URL
    port: #port
    username: #account
    password: #password
    virtualHost: /
    #Switch ack confirmation
    # Correlated-asynchronous simple-synchronous none: Turn off the confirm confirmation mechanism Old version-》publisher-confirms: true
    publisher-confirm-type: none
    #Queue ack confirmation
    publisher-returns: false

Consumer:

Create queue configuration class QueueConfig.java

package test.consumer;

import test.common.ExchangeConstant;
import test.common.QueueConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
/**
 * Queue configuration: queue, switch, binding initialization
 */
@Configuration
public class QueueConfig {<!-- -->
    @Bean
    public TopicExchange DeviceExchange() {<!-- -->
        return new TopicExchange(ExchangeConstant.EXCHANGE_TEST);
    }
   
    @Bean
    public Binding rewardBindingDevice() {<!-- -->
        return BindingBuilder
                .bind(rewardQueue())
                .to(DeviceExchange())
                .with(ExchangeConstant.EXCHANGE_KEY_TEST);
    }
}

Create mq configuration class RabbitMQConfig.java

package test.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

/**
 * mq configuration class: serialization and deserialization, message confirmation mechanism
 */
@Configuration
public class RabbitMQConfig implements RabbitListenerConfigurer {<!-- -->
    /**
     * Configure monitoring and deserialization tools
     * @param rabbitListenerEndpointRegistrar
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {<!-- -->
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory(){<!-- -->
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(new MappingJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }
    /**
     * Configure sending serialization tool
     * @param connectionFactory
     * @return
     */
    //Set up the serialization tool. By default, jdk's objectStream is used for serial numbers - large size
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {<!-- -->
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    }
}

yml configuration

spring:
  rabbitmq:
    listener:
      simple:
        # Retry mechanism
        retry:
          enabled: true #Whether to enable consumer retry. It is enabled by default and retries infinitely.
          max-attempts: 3 #Maximum number of retries
          initial-interval: 3000ms #Retry interval (in milliseconds)
          max-interval: 600000ms #Maximum retry interval (in milliseconds)
          multiplier: 2 #Interval time multiplier, interval time * multiplier = next interval time, the maximum cannot exceed the set maximum interval time

Use

Producer

 UserDTO userDTO = new UserDTO ();
        userDTO.setUsername("test"); rabbitTemplate.convertAndSend(ExchangeConstant.EXCHANGE_TEST,ExchangeConstant.EXCHANGE_KEY_TEST,userDTO );
        try {<!-- -->
            Thread.sleep(2000);
        } catch (InterruptedException e) {<!-- -->
            throw new RuntimeException(e);
        }

Create a listening queue class QueueConsumer.java

package test.consumer;

import test.common.userDTO;
import test.common.ExchangeConstant;
import test.common.QueueConstant;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Listening queue
 */
@Slf4j
@Component
public class QueueConsumer {<!-- -->
    @RabbitListener(queues = QueueConstant.RABBITMQ_TEST)
    void listenOrder(Message message){<!-- -->
        log.info(message.toString());
        try {<!-- -->
            switch (message.getMessageProperties().getReceivedRoutingKey())
            {<!-- -->
                case ExchangeConstant.EXCHANGE_KEY_TEST:
                    UserDTO userDTO = new ObjectMapper().readValue(message.getBody(),userDTO.class);
                    log.info(userDTO.toString);
                    break;
                default:
                    break;
            }
        } catch (IOException e) {<!-- -->
            log.error("Deserialization failed",e);
        }
    }
}