RabbitMQ: analysis of java.net.SocketException: Connection reset and MissedHeartbeatException

1. Foreword

RabbitMQ is used as the message queue between our backend and an Android front end. Mobile networks are complex and the signal is often unstable, so the MQ connection may drop and reconnect frequently. The logs contain many entries of the form java.net.SocketException: Connection reset. This article analyzes the possible causes of that error by debugging the client source code. It also covers MissedHeartbeatException, which the client throws when it has received nothing from the server for several consecutive read timeouts and concludes that the server is gone.

2. Analysis

Searching the Internet for java.net.SocketException: Connection reset turns up essentially one explanation: the client had an established connection to MQ, the server has already closed that connection, and the client is still trying to receive data on it. That read fails with this error.
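
The situation described above can be reproduced without RabbitMQ at all. The following is a minimal sketch using only JDK sockets, not the author's test setup: the class name ConnectionResetDemo and the method provokeReset are made up for illustration. It assumes that closing a socket with SO_LINGER set to zero aborts the connection with a TCP RST, which is the usual behavior on common platforms.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

class ConnectionResetDemo {

    /** Returns the SocketException seen by the client, or null if the read returned normally. */
    static SocketException provokeReset() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            Socket peer = server.accept();
            // SO_LINGER with a zero timeout makes close() abort the connection
            // (TCP RST) instead of closing it gracefully (FIN).
            peer.setSoLinger(true, 0);
            peer.close();

            client.setSoTimeout(5000); // do not block forever if no reset arrives
            try {
                client.getInputStream().read(); // the client is still trying to receive
                return null;                    // read returned normally, no reset seen
            } catch (SocketException e) {
                return e;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SocketException e = provokeReset();
        System.out.println(e == null ? "no reset observed" : e.toString());
    }
}
```

The "server" side here plays the role of the broker dropping a connection it considers dead, and the blocked read on the client side is where the exception surfaces.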

3. MQ’s heartbeat mechanism

When the client creates a connection, it initializes and starts the heartbeat service via initializeHeartbeatSender():

    private void initializeHeartbeatSender() {
        this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
    }

In the RabbitMQ Java client, the heartbeat sender wakes up at intervals of half the heartbeat period:

    /**
     * Sets the heartbeat in seconds.
     */
    public void setHeartbeat(int heartbeatSeconds) {
        synchronized(this.monitor) {
            if(this.shutdown) {
                return;
            }

            // cancel any existing heartbeat task
            if(this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }

            if (heartbeatSeconds > 0) {
                // wake every heartbeatSeconds / 2 to avoid the worst case
                // where the last activity comes just after the last heartbeat
                long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
                ScheduledExecutorService executor = createExecutorIfNecessary();
                Runnable task = new HeartbeatRunnable(interval);
                this.future = executor.scheduleAtFixedRate(
                    task, interval, interval, TimeUnit.NANOSECONDS);
            }
        }
    }
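
As a quick check of the arithmetic in setHeartbeat, the interval computation can be isolated into a small sketch. The class HeartbeatInterval and the method sendIntervalNanos are illustrative names, not part of the client; only the expression SECONDS.toNanos(heartbeatSeconds) / 2 is taken from the listing above.

```java
import java.util.concurrent.TimeUnit;

class HeartbeatInterval {

    /** Same expression as in setHeartbeat: half the heartbeat period, in nanoseconds. */
    static long sendIntervalNanos(int heartbeatSeconds) {
        return TimeUnit.SECONDS.toNanos(heartbeatSeconds) / 2;
    }

    public static void main(String[] args) {
        for (int heartbeat : new int[] {10, 30, 60}) {
            long millis = TimeUnit.NANOSECONDS.toMillis(sendIntervalNanos(heartbeat));
            System.out.println("heartbeat=" + heartbeat + "s -> timer every " + millis + " ms");
        }
    }
}
```

For a 30-second heartbeat this gives a 15-second timer, which agrees with the 15-second spacing of the "Heartbeat timer sent" lines in the test log of section 3.1.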

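The scheduled task sends a heartbeat frame only when there has been no activity on the connection for longer than one interval. If an IOException occurs while writing the frame, it is ignored here rather than handled: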

    private final class HeartbeatRunnable implements Runnable {

        private final long heartbeatNanos;

        private HeartbeatRunnable(long heartbeatNanos) {
            this.heartbeatNanos = heartbeatNanos;
        }

        @Override
        public void run() {
            try {
                LogUtils.log("Heartbeat timer sent");
                long now = System.nanoTime();

                if (now > (lastActivityTime + this.heartbeatNanos)) {
                    frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                    frameHandler.flush();
                }
            } catch (IOException e) {
                // ignore
            }
        }
    }
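
The condition inside run() is easy to misread, so here is the same comparison as a standalone predicate. This is only a sketch: HeartbeatDecision and shouldSend are hypothetical names, and the times are plain numbers rather than System.nanoTime() values.

```java
class HeartbeatDecision {

    /**
     * Mirrors the check in HeartbeatRunnable.run(): a heartbeat frame is written
     * only if more than one interval has passed since the last recorded activity.
     */
    static boolean shouldSend(long nowNanos, long lastActivityNanos, long intervalNanos) {
        return nowNanos > lastActivityNanos + intervalNanos;
    }

    public static void main(String[] args) {
        long interval = 15; // arbitrary units, stands in for heartbeatNanos
        System.out.println(shouldSend(100, 80, interval)); // idle for 20 units
        System.out.println(shouldSend(100, 90, interval)); // idle for 10 units
    }
}
```

The comparison is strict, so a timer tick that lands exactly one interval after the last activity does not send a frame; the next tick does.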

According to the official documentation and the client source code, the default heartbeat timeout is 60 seconds and a heartbeat frame is sent roughly every 30 seconds (half the timeout). After two missed heartbeats the peer is considered unreachable and the connection is closed.

3.1 Test

In the test case, the heartbeat timeout is set to 30 seconds:

    public static void main(String[] args) {
        String queueName = "123456";
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.0.11.211");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setVirtualHost("/");
        factory.setPassword("admin");
        factory.setConnectionTimeout(5000);
        // recovery is switched off so that the disconnect stays visible in the log
        factory.setAutomaticRecoveryEnabled(false);
        factory.setTopologyRecoveryEnabled(false);
        factory.setRequestedHeartbeat(30); // heartbeat timeout in seconds
        executor.submit(() -> {
            try {
                Connection connection = factory.newConnection();
                LogUtils.log("Connection created successfully");
                connection.addShutdownListener(cause -> {
                    LogUtils.log("Disconnected: " + cause.getMessage() + " msg=>:" + cause.getCause());
                });
                Channel channel = connection.createChannel();
                LogUtils.log("Channel created successfully:" + channel.getChannelNumber());
                channel.basicQos(30);
                channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                        try {
                            String message = new String(body, "UTF-8");
                            LogUtils.log("Message: " + message);
                            channel.basicReject(envelope.getDeliveryTag(), false);
                        } catch (Exception e) {
                            LogUtils.log("Consumer exception,e:" + e.getMessage() + " consumerTag:" + consumerTag);
                        }
                    }
                });
                channel.addShutdownListener(cause -> {
                    // getCause() may be null, so let string concatenation handle it instead of calling toString()
                    LogUtils.log("Consumer disconnected: " + cause.getMessage() + " msg=>:" + cause.getCause());
                });
            } catch (Exception e) {
                LogUtils.log("Exception occurred: " + e);
                e.printStackTrace();
            }
        });
    }

Then disable heartbeat sending by commenting out the frame write in HeartbeatRunnable:

    private final class HeartbeatRunnable implements Runnable {

        private final long heartbeatNanos;

        private HeartbeatRunnable(long heartbeatNanos) {
            this.heartbeatNanos = heartbeatNanos;
        }

        @Override
        public void run() {
            try {
                LogUtils.log("Heartbeat timer sent");
                long now = System.nanoTime();

                if (now > (lastActivityTime + this.heartbeatNanos)) {
                    // frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                    // frameHandler.flush();
                }
            } catch (Exception e) {
                // ignore; with the writes commented out nothing here throws IOException,
                // so the catch is widened to keep the class compiling
            }
        }
    }

Running the test produces the following log:

2023-09-25 09:35:02.976=>Connection created successfully
2023-09-25 09:35:02.987=>Channel created successfully: 1
2023-09-25 09:35:17.948=>Heartbeat timer sent
2023-09-25 09:35:32.948=>Heartbeat timer sent
2023-09-25 09:35:47.949=>Heartbeat timer sent
2023-09-25 09:36:02.949=>Heartbeat timer sent
2023-09-25 09:36:17.948=>Heartbeat timer sent
2023-09-25 09:36:32.948=>Heartbeat timer sent
2023-09-25 09:36:32.960=>Consumer disconnected: connection error msg=>:java.net.SocketException: Connection reset
2023-09-25 09:36:32.962=>Disconnected: connection error msg=>:java.net.SocketException: Connection reset

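The log shows that the connection was created at 09:35:02 and reset at 09:36:32, about 90 seconds later. With a 30-second heartbeat, this means that after the server has seen no heartbeat from the client for 3 heartbeat periods, it assumes the client is gone and closes the connection, which the client reports as Connection reset.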
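
The "3 heartbeat periods" figure can be checked directly against the two timestamps in the log. The sketch below does only that arithmetic; HeartbeatLogMath and its methods are illustrative names, and the timestamp pattern is assumed from the format the log lines use.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class HeartbeatLogMath {

    // Timestamp layout used by the log lines, e.g. 2023-09-25 09:35:02.976
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    static long elapsedMillis(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, FORMAT),
                                LocalDateTime.parse(to, FORMAT)).toMillis();
    }

    static double heartbeatPeriods(long elapsedMillis, int heartbeatSeconds) {
        return elapsedMillis / (heartbeatSeconds * 1000.0);
    }

    public static void main(String[] args) {
        // "Connection created successfully" and the first "Connection reset" line
        long elapsed = elapsedMillis("2023-09-25 09:35:02.976", "2023-09-25 09:36:32.960");
        System.out.println(elapsed + " ms = " + heartbeatPeriods(elapsed, 30) + " heartbeat periods");
    }
}
```

The elapsed time comes out just under 90 seconds, that is, just under three 30-second periods.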

4. Analysis of MissedHeartbeatException

After the client successfully connects to MQ, it starts the frame-reading service via this._frameHandler.initialize(this);

    private void startIoLoops() {
        if (executorService == null) {
            Thread nioThread = Environment.newThread(
                threadFactory,
                new NioLoop(socketChannelFrameHandlerFactory.nioParams, this),
                "rabbitmq-nio"
            );
            nioThread.start();
        } else {
            this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this));
        }
    }

This is the method the read thread runs for each read. If the frame is not null, the missed-heartbeat counter is reset to 0. Otherwise the read timed out without data, and the miss is counted.

    private void readFrame(Frame frame) throws IOException {
        LogUtils.log("Start reading data");
        if (frame != null) {
            _missedHeartbeats = 0;
            if (frame.getType() == AMQP.FRAME_HEARTBEAT) {
                LogUtils.log("Read data: heartbeat");
            } else {
                if (frame.getChannel() == 0) { // the special channel
                    _channel0.handleFrame(frame);
                } else {
                    if (isOpen()) {
                        ChannelManager cm = _channelManager;
                        if (cm != null) {
                            ChannelN channel;
                            try {
                                channel = cm.getChannel(frame.getChannel());
                            } catch(UnknownChannelException e) {
                                LOGGER.info("Received a frame on an unknown channel, ignoring it");
                                return;
                            }
                            channel.handleFrame(frame);
                        }
                    }
                }
            }
        } else {
            LogUtils.log("Start reading data frame is empty");
            handleSocketTimeout();
        }
    }

This is the timeout handling. Each time it is entered, _missedHeartbeats is incremented by 1, and once the counter exceeds the threshold a MissedHeartbeatException is thrown.

    private void handleSocketTimeout() throws SocketTimeoutException {
        if (_inConnectionNegotiation) {
            throw new SocketTimeoutException("Timeout during Connection negotiation");
        }
        if (_heartbeat == 0) { // No heart-beating
            return;
        }
        LogUtils.log("handleSocketTimeout-------_missedHeartbeatsheartbeat:" + _missedHeartbeats);
        if (++_missedHeartbeats > (1)) {
            throw new MissedHeartbeatException("Heartbeat missing with heartbeat = " +
                                               _heartbeat + " seconds, for " + this.getHostAddress());
        }
    }
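
The interplay between readFrame and handleSocketTimeout boils down to a counter that is reset by any frame and incremented by every empty read. The following sketch models just that counter, outside the client. MissedHeartbeatCounter, onFrame, onReadTimeout and firstFatalEvent are made-up names, and the event string is a hand-written encoding ('F' for a frame, 'T' for a read timeout), not client output.

```java
class MissedHeartbeatCounter {

    private final int threshold;
    private int missed;

    MissedHeartbeatCounter(int threshold) {
        this.threshold = threshold;
    }

    /** Any frame, heartbeat or data, resets the counter (as in readFrame). */
    void onFrame() {
        missed = 0;
    }

    /** Same test as handleSocketTimeout: true once the counter exceeds the threshold. */
    boolean onReadTimeout() {
        return ++missed > threshold;
    }

    /** Replays a sequence of events and returns the index of the fatal timeout, or -1. */
    static int firstFatalEvent(String events, int threshold) {
        MissedHeartbeatCounter counter = new MissedHeartbeatCounter(threshold);
        for (int i = 0; i < events.length(); i++) {
            if (events.charAt(i) == 'F') {
                counter.onFrame();
            } else if (counter.onReadTimeout()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Isolated timeouts between frames are harmless; two in a row exceed a threshold of 1.
        System.out.println(firstFatalEvent("FFFFFFTFTFTFTT", 1));
        System.out.println(firstFatalEvent("FFFFFFTFTFTFTT", 8));
    }
}
```

With a threshold of 1, a single empty read between two heartbeats never trips the check; only consecutive empty reads do. With the threshold of 8 nothing in this short sequence is fatal.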

To make testing easier, the heartbeat is set to 10 s and the threshold is lowered so that a MissedHeartbeatException is thrown as soon as _missedHeartbeats exceeds 1.

4.1 Test

2023-09-25 10:21:16.565=>Start reading data
2023-09-25 10:21:16.651=>Start reading data
2023-09-25 10:21:16.658=>Start reading data
2023-09-25 10:21:16.658=>Connection created successfully
2023-09-25 10:21:16.669=>Start reading data
2023-09-25 10:21:16.670=>Channel created successfully: 1
2023-09-25 10:21:16.671=>Start reading data
2023-09-25 10:21:16.675=>Start reading data
2023-09-25 10:21:19.177=>Start reading data
2023-09-25 10:21:19.177=>Start reading data frame is empty
2023-09-25 10:21:19.177=>handleSocketTimeout-------_missedHeartbeatsHeartbeats: 0
2023-09-25 10:21:21.659=>Start reading data
2023-09-25 10:21:21.659=>Read data: heartbeat
2023-09-25 10:21:24.160=>Start reading data
2023-09-25 10:21:24.160=>Start reading data frame is empty
2023-09-25 10:21:24.161=>handleSocketTimeout-------_missedHeartbeatsHeartbeats: 0
2023-09-25 10:21:26.659=>Start reading data
2023-09-25 10:21:26.660=>Read data: heartbeat
2023-09-25 10:21:29.161=>Start reading data
2023-09-25 10:21:29.161=>Start reading data frame is empty
2023-09-25 10:21:29.161=>handleSocketTimeout-------_missedHeartbeatsHeartbeats: 0
2023-09-25 10:21:31.661=>Start reading data
2023-09-25 10:21:31.661=>Read data: heartbeat
2023-09-25 10:21:34.161=>Start reading data
2023-09-25 10:21:34.161=>Start reading data frame is empty
2023-09-25 10:21:34.161=>handleSocketTimeout-------_missedHeartbeatsHeartbeats: 0
2023-09-25 10:21:36.662=>Start reading data
2023-09-25 10:21:36.662=>Start reading data frame is empty
2023-09-25 10:21:36.662=>handleSocketTimeout-------_missedHeartbeatsHeartbeats: 1
2023-09-25 10:21:36.668=>Consumer disconnected: connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
2023-09-25 10:21:36.671=>Disconnected: connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211

As the log shows, the server sends a heartbeat to the client every half heartbeat period (every 5 seconds here). Each read that times out without receiving any frame increments _missedHeartbeats, and once the counter exceeds the threshold the client throws MissedHeartbeatException. In this test the threshold was lowered to 1, so two consecutive empty reads were enough. In the official client the threshold is 2 * 4 = 8.
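
To get a feel for what these thresholds mean in wall-clock time, the delay between the last received frame and the exception can be estimated. This is a rough sketch under one assumption that is read off the log rather than stated in the text: the empty reads occur about 2.5 s after the last frame with a 10 s heartbeat, so the read timeout is taken to be a quarter of the heartbeat. The names HeartbeatDetectionDelay, readTimeoutMillis and detectionDelayMillis are illustrative.

```java
class HeartbeatDetectionDelay {

    /** Assumption taken from the log: the socket read times out after heartbeat / 4. */
    static long readTimeoutMillis(int heartbeatSeconds) {
        return heartbeatSeconds * 1000L / 4;
    }

    /**
     * The check "++_missedHeartbeats > threshold" first holds on consecutive
     * timeout number threshold + 1, counted from the last received frame.
     */
    static long detectionDelayMillis(int heartbeatSeconds, int threshold) {
        return (threshold + 1L) * readTimeoutMillis(heartbeatSeconds);
    }

    public static void main(String[] args) {
        System.out.println("test setup (10 s, threshold 1): " + detectionDelayMillis(10, 1) + " ms");
        System.out.println("threshold 2 * 4 (10 s):         " + detectionDelayMillis(10, 8) + " ms");
    }
}
```

For the test setup this gives about 5 seconds, which agrees with the log: the last heartbeat is read at 10:21:31.661 and the exception is reported at 10:21:36.668. With the threshold of 8 the same estimate is a little over two heartbeat periods.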