After RabbitMQ restarts abnormally, some queues are no longer consumed

    [2021-03-04 09:47:35.986][,][INFO][SimpleAsyncTaskExecutor-1][SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1212] Restarting Consumer@504a9352: tags=[{amq.ctag-WWlW_DAeVfp 570-uQvvXDg= queue1}], channel=Cached Rabbit Channel: AMQChannel([email protected],4), conn: Proxy@3920f51 Shared Rabbit Connection: null, acknowledgeMode=MANUAL local queue size=0
    [2021-03-04 09:47:35.987][,][INFO][SimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to: xxx.xx.xx.xx
    [2021-03-04 09:47:35.989][,][ERROR][SimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).
    org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        at java.lang.Thread.run(Thread.java:748) [1.8.0_275]
    Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method) ~[1.8.0_275]
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[1.8.0_275]
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[1.8.0_275]
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[1.8.0_275]
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[1.8.0_275]
        at java.net.Socket.connect(Socket.java:607) ~[1.8.0_275]
        at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2]
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2]
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1066) ~[amqp-client-5.1.2.jar:5.1.2]
        at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:466) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
        ... 9 more
    [2021-03-04 09:47:36.262][,][INFO][iccpaasSimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to: xxx.xx.xx.xx
    [2021-03-04 09:47:36.264][,][ERROR][iccpaasSimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).

Cause: after this exception, the listener container stopped trying to restart the consumer thread, so the affected queues were left without a consumer.

Solution:

  1. Listen for the ListenerContainerConsumerFailedEvent event, defined as follows. It has a fatal attribute. When fatal is true, the consumer has hit a fatal error and the container will not retry starting it, so the event handler has to restart it. When fatal is false, the event can be ignored, because the container retries the start on its own.

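    // Excerpt from Spring AMQP (constructor and getters omitted)
    public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
        private final String reason;       // getReason()
        private final boolean fatal;       // isFatal()
        private final Throwable throwable; // getThrowable()
    }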
    

    Handler logic: when the event's fatal flag is true, wait a while (30 seconds here), check whether the container is still running, start it if it is not, and then send an alert.

    import java.util.Arrays;
    import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Component
    public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

        @Override
        public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
            log.error("Consumer failure event occurred: {}", event);
            // Non-fatal failures are retried by the container itself; only fatal ones need handling
            if (!event.isFatal()) {
                return;
            }
            log.error("Stopping container from aborted consumer. Reason: {}", event.getReason(), event.getThrowable());
            // The event source is the listener container that published the event
            if (!(event.getSource() instanceof AbstractMessageListenerContainer)) {
                return;
            }
            AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            try {
                // Wait 30 s to give the broker time to come back before restarting
                Thread.sleep(30000);
                if (container.isRunning()) {
                    // Already running (e.g. restarted elsewhere), so do not start it a second time
                    log.warn("The listener container {} is already running", container);
                } else {
                    // The container is stopped: start it again
                    container.start();
                    log.info("Restarted the listener of queue {} successfully", queueNames);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
                log.error("Interrupted before restarting the listener of queue {}", queueNames, e);
            } catch (Exception e) {
                log.error("Restarting the listener of queue {} failed", queueNames, e);
            }
            // TODO SMS/email/DingTalk alert, including the queue names, the disconnect reason, the exception, and whether the restart succeeded
        }
    }
    
  2. Set missingQueuesFatal to false. A QueuesNotAvailableException then no longer sets aborted to true, so the killOrRestart method restarts the consumer automatically. However, this approach only covers QueuesNotAvailableException; unlike the first approach, it is not general-purpose.

    # Treat missing queues as non-fatal so the container keeps retrying the consumer (covers QueuesNotAvailableException only)
    spring.rabbitmq.listener.simple.missing-queues-fatal=false
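The restart decision in the first approach can be unit-tested without a broker by pulling it out behind a small interface. The sketch below is plain Java with no Spring dependency. RestartableContainer, FakeContainer, Outcome and handle are hypothetical names introduced only for illustration; they stand in for the isRunning()/start() calls the listener makes on the real container. The 30-second wait is left out so the checks run instantly.

```java
// Plain-Java sketch of the fatal-event restart decision (hypothetical names, no Spring needed).
public class FatalRestartPolicyDemo {

    // Hypothetical stand-in for the two container methods the handler uses.
    interface RestartableContainer {
        boolean isRunning();

        void start();
    }

    // Hypothetical result type; it could feed the alert message.
    enum Outcome { IGNORED_NON_FATAL, ALREADY_RUNNING, RESTARTED, RESTART_FAILED }

    // Ignore non-fatal events; otherwise start the container only if it is stopped.
    static Outcome handle(boolean fatal, RestartableContainer container) {
        if (!fatal) {
            return Outcome.IGNORED_NON_FATAL; // the container retries on its own
        }
        if (container.isRunning()) {
            return Outcome.ALREADY_RUNNING; // never start a running container twice
        }
        try {
            container.start();
            return Outcome.RESTARTED;
        } catch (RuntimeException e) {
            return Outcome.RESTART_FAILED; // e.g. the broker is still refusing connections
        }
    }

    // In-memory fake that can be told to fail on start, as if the broker were still down.
    static final class FakeContainer implements RestartableContainer {
        private boolean running;
        private final boolean failOnStart;

        FakeContainer(boolean running, boolean failOnStart) {
            this.running = running;
            this.failOnStart = failOnStart;
        }

        @Override
        public boolean isRunning() {
            return running;
        }

        @Override
        public void start() {
            if (failOnStart) {
                throw new IllegalStateException("Connection refused");
            }
            running = true;
        }
    }

    public static void main(String[] args) {
        FakeContainer stopped = new FakeContainer(false, false);
        System.out.println(handle(false, stopped)); // IGNORED_NON_FATAL
        System.out.println(handle(true, stopped));  // RESTARTED
        System.out.println(handle(true, stopped));  // ALREADY_RUNNING
        System.out.println(handle(true, new FakeContainer(false, true))); // RESTART_FAILED
    }
}
```

In the real listener, handle would be called after the sleep with event.isFatal() and a thin adapter around the container, and the returned Outcome could be included in the alert from the TODO.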