Kafka broker shrink in Flink causes Task to keep restarting

Background

Flink version 1.12.2
Kafka client 2.4.1
A streaming program that reads Kafka and calculates DAU is run on the company’s Flink platform. Due to the shrinkage of the company’s Kafka, the program has been restarted, and it has not recovered even after an hour of restarting (the specific operation is down) Four kafka brokers, and flink was configured with 12 kafka brokers at that time). The specific scene at that time was as follows:

The logs on JobManaer are as follows:
2023-10-07 10:02:52.975 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)] ]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
        at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
        at java.lang.Thread.run(Thread.java:750)


The logs on the corresponding TaskManager (task-xxxx-shanghai.emr.aliyuncs.com) are as follows:

2023-10-07 10:02:24.604 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya. local/192.168.129.46:9092) could not be established. Broker may not be available.


2023-10-07 10:02:52.939 WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
        at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
        at java.lang.Thread.run(Thread.java:750)

2023-10-07 10:04:58.205 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
2023-10-07 10:04:58.205 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected
2023-10-07 10:04:58.206 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
2023-10-07 10:04:58.206 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected


2023-10-07 10:08:15.541 WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

At that time, the relevant configuration of kafka source in Flink was as follows:

scan.topic-partition-discovery.interval 300000
restart-strategy.type fixed-delay
restart-strategy.fixed-delay.attempts 50000000
jobmanager.execution.failover-strategy region

Conclusion and solution

Currently, there are two parameters on the kafka consumer side: default.api.timeout.ms (default 60000) and request.timeout.ms (default 30000). These two parameters come from Control the kakfa client’s request timeout from the server, which means that the timeout for each request is 30s (of course not the timeout for one request to the broker, see other details later). You can try again after the timeout. If there is no request within 60s If any response is received, TimeOutException will be reported. For details, see the following analysis.
We solve this problem by setting the following parameters in the flink kafka connector:

`properties.default.api.timeout.ms` = '600000',
`properties.request.timeout.ms` = '5000',
// max.block.ms is to set the timeout of kafka producer
`properties.max.block.ms` = '600000',

Analysis

In Flink, the DynamicTableSourceFactory for Kafka’s Connector is KafkaDynamicTableFactory. Here we only discuss the case where kafka is used as the source.
The method of this class createDynamicTableSource will eventually be called. As for the specific calling chain, you can refer to Apache Hudi Preliminary Study (4) (Combined with Flink) – How Hudi’s createDynamicTableSource/createDynamicTableSink/ is used in Flink Sql Call – just change Sink to Source, so you will end up in the KafkaDynamicSource class:

@Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);

        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

        return SourceFunctionProvider.of(kafkaConsumer, false);
    }

The getScanRuntimeProvider method of this class will be called. All kafka-related operations can be traced back to the FlinkKafkaConsumer class (inherited from FlinkKafkaConsumerBase). The key methods of this class are as follows:

 @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();

        this.unionOffsetStates =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                createStateSerializer(getRuntimeContext().getExecutionConfig())));

       ...
    }

   @Override
    public void open(Configuration configuration) throws Exception {
        // determine the commit offset mode
        this.offsetCommitMode =
                OffsetCommitModes.fromConfiguration(
                        getIsAutoCommitEnabled(),
                        enableCommitOnCheckpoints,
                        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // create the partition discoverer
        this.partitionDiscoverer =
                createPartitionDiscoverer(
                        topicsDescriptor,
                        getRuntimeContext().getIndexOfThisSubtask(),
                        getRuntimeContext().getNumberOfParallelSubtasks());
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
        if (restoredState != null) {
            ...
        } else {
            // use the partition discoverer to fetch the initial seed partitions,
            // and set their initial offsets depending on the startup mode.
            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
            // determined
            // when the partition is actually read.
            switch (startupMode) {
                . . .
                default:
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(
                                seedPartition, startupMode.getStateSentinel());
                    }
            }

            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    ...
                    case GROUP_OFFSETS:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info(
                        "Consumer subtask {} initially has no partitions to read from.",
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }

        this.deserializer.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
    }

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // initialize commit metrics and default offset callback method
        this.successfulCommits =
                this.getRuntimeContext()
                        .getMetricGroup()
                        .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
        this.failedCommits =
                this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
        final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

        this.offsetCommitCallback =
                new KafkaCommitCallback() {
                    @Override
                    public void onSuccess() {
                        successfulCommits.inc();
                    }

                    @Override
                    public void onException(Throwable cause) {
                        LOG.warn(
                                String.format(
                                        "Consumer subtask %d failed async Kafka commit.",
                                        subtaskIndex),
                                cause);
                        failedCommits.inc();
                    }
                };

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        LOG.info(
                "Consumer subtask {} creating fetcher with offsets {}.",
                getRuntimeContext().getIndexOfThisSubtask(),
                subscribedPartitionsToStartOffsets);
        // from this point forward:
        // - 'snapshotState' will draw offsets from the fetcher,
        // instead of being built from `subscribedPartitionsToStartOffsets`
        // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
        // Kafka through the fetcher, if configured to do so)
        this.kafkaFetcher =
                createFetcher(
                        sourceContext,
                        subscribedPartitionsToStartOffsets,
                        watermarkStrategy,
                        (StreamingRuntimeContext) getRuntimeContext(),
                        offsetCommitMode,
                        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                        useMetrics);

        if (!running) {
            return;
        }

        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            kafkaFetcher.runFetchLoop();
        } else {
            runWithPartitionDiscovery();
        }
    }

    @Override
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        ...
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry:
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
          ...
        }
    }

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
            ...
            fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            ...
    }

There are mainly four methods: initializeState, open, run, snapshotState, and notifyCheckpointComplete , let’s introduce them one by one with questions:
Note: For the sequence of the initializeState and open methods, you can refer to the StreamTask class, which has the following call chain:

invoke()
 ||
 \/
beforeInvoke()
 ||
 \/
operatorChain.initializeStateAndOpenOperators
 ||
 \/
FlinkKafkaConsumerBase.initializeState
 ||
 \/
FlinkKafkaConsumerBase.open

You can know that the initializeState method is called before open

initializeState method

What is done here is to restore kafkaTopicOffset information from the persistent State. We assume here that it is the first startup.

open method

  • offsetCommitMode
    offsetCommitMode = OffsetCommitModes.fromConfiguration Here you get the submission mode of the set kafka offset. The configuration of enable.auto.commit will be integrated here (the default is true ), enableCommitOnCheckpoints is true by default, checkpointing is set to true (default is false), and the value obtained based on the above is OffsetCommitMode.ON_CHECKPOINTS
  • partitionDiscoverer
    The main purpose here is to discover the partitions of kafka topics. The main process is partitionDiscoverer.discoverPartitions. The process involved here is as follows:
    AbstractPartitionDiscoverer.discoverPartitions
      ||
      \/
    AbstractPartitionDiscoverer.getAllPartitionsForTopics
      ||
      \/
    KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor
      ||
      \/
    KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) //The defaultApiTimeoutMs here comes from *default.api.timeout.ms*
      ||
      \/
    Fetcher.getTopicMetadata //The last thing thrown here is new TimeoutException("Timeout expired while fetching topic metadata");
      ||
      \/
    Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode //The configured broker node will be selected according to a certain strategy.
      ||
      \/
    client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // The *defaultRequestTimeoutMs* here comes from the configuration *request.timeout.ms*
    
    

    To sum up, what discoverPartitions does is to select the configured broker nodes according to a certain strategy and make requests to each node. After the request.timeout.ms times out, the The strategy selects the broker until the total time reaches the configured default.api.timeout.ms. Here the default default.api.timeout.ms is 60 seconds and request.timeout .ms is 30 seconds

  • subscribedPartitionsToStartOffsets
    According to the startupMode mode, the default is StartupMode.GROUP_OFFSETS (the default is to start consumption from the offset of the last consumption), set the enabled kafka offset, which will be used in kafkaFetcher

run method

  • Set some indicators successfulCommits/failedCommits
  • KafkaFetcher
    The main purpose here is to obtain data from kafka and loop into kafka’s topic partition discovery if there is a partition. The default configuration is 0 according to the configuration. scan.topic-partition-discovery.interval is set in practice. is 300000, which is 5 minutes. The main process is in the method runWithPartitionDiscovery:
     private void runWithPartitionDiscovery() throws Exception {
          final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
          createAndStartDiscoveryLoop(discoveryLoopErrorRef);
    
          kafkaFetcher.runFetchLoop();
    
          // make sure that the partition discoverer is woken up so that
          // the discoveryLoopThread exits
          partitionDiscoverer.wakeup();
          joinDiscoveryLoopThread();
    
          //rethrow any fetcher errors
          final Exception discoveryLoopError = discoveryLoopErrorRef.get();
          if (discoveryLoopError != null) {
              throw new RuntimeException(discoveryLoopError);
          }
      }
    
    
    • createAndStartDiscoveryLoop This will start a single thread in a while sleep mode to poll Kafka’s partition discovery at scan.topic-partition-discovery.interval intervals. Note that this will swallow Execption, and will not throw an exception

       private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
         discoveryLoopThread =
                 new Thread(
                         ...
                         while (running) {
                           ...
                                     try {
                                         discoveredPartitions =
                                                 partitionDiscoverer.discoverPartitions();
                                     } catch (AbstractPartitionDiscoverer.WakeupException
                                             | AbstractPartitionDiscoverer.ClosedException e) {
                                       
                                         break;
                                     }
                                     if (running & amp; & amp; !discoveredPartitions.isEmpty()) {
                                         kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                     }
      
                                     if (running & amp; & amp; discoveryIntervalMillis != 0) {
                                         try {
                                             Thread.sleep(discoveryIntervalMillis);
                                         } catch (InterruptedException iex) {
                                             break;
                                         }
                                     }
                                 }
                             } catch (Exception e) {
                                 discoveryLoopErrorRef.set(e);
                             } finally {
                                 // calling cancel will also let the fetcher loop escape
                                 // (if not running, cancel() was already called)
                                 if (running) {
                                     cancel();
                                 }
                             }
                         },
                         "Kafka Partition Discovery for "
                                  + getRuntimeContext().getTaskNameWithSubtasks());
      
         discoveryLoopThread.start();
      }
      

      The kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);subscribedPartitionStates variable here will save the discovered partition information, which will be set in kafkaFetcher.runFetchLoop The submitted offset information will be used in snapshotState

    • kafkaFetcher.runFetchLoop will pull data from kafka and set kafka’s offset. The specific process is as follows:

       runFetchLoop
          ||
          \/
        subscribedPartitionStates Here you will get the *subscribedPartitionStates* variable
          ||
          \/
        partitionConsumerRecordsHandler
          ||
          \/
        emitRecordsWithTimestamps
          ||
          \/
        emitRecordsWithTimestamps
          ||
          \/
        partitionState.setOffset(offset);
      

      The offset here is obtained from the consumed kafka record.

snapshotState method

The information in subscribedPartitionStates will be processed here, mainly adding it to the pendingOffsetsToCommit variable

  • offsetCommitMode
    The above mentioned is OffsetCommitMode.ON_CHECKPOINTS. If it is ON_CHECKPOINTS, subscribedPartitionStates will be obtained from fetcher.snapshotCurrentState
    And add it to pendingOffsetsToCommit, and persist it in unionOffsetStates. The actual kafka offset commit operation is in notifyCheckpointComplete.

notifyCheckpointComplete method

Obtain the kafka offset information to be submitted and persist it in kafka

Others

The request.timeout.ms mentioned before is not a request, but from the current implementation, the kafka client will respond to the broker that fails to access it. >, access until timeout (default 30 seconds):

[2023-10-13 13:42:18] 7673 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 ( id: -11 rack: null)
[2023-10-13 13:42:18] 7723 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 7773 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 7823 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 7874 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 7924 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 7974 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 8025 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 8075 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:18] 8125 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8175 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8226 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8276 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8326 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8376 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8426 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8477 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8527 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8577 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8627 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)
[2023-10-13 13:42:19] 8678 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: - 11 rack: null)

Reference

  • Initialization order of open and initailizeState
  • A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
syntaxbug.com © 2021 All Rights Reserved.