KafkaConsumer consumption logic

Version: kafka-clients-2.0.1.jar

I previously wanted to write a plug-in that changes KafkaConsumer's consumption logic to filter out some messages based on their headers. To do that, I needed to understand how KafkaConsumer fetches messages for consumption, and to confirm whether filtering out messages before they are consumed has any side effects.
Below is the relevant source code; my explanations are added as comments starting with "note".

First, the conclusion: KafkaConsumer keeps the position (the offset of the next message to fetch) locally and fetches messages from that position. When auto-commit is enabled, this position is periodically committed to the broker (in some scenarios the consumer also explicitly checks whether a commit is needed), so that it is not lost on a restart or rebalance. The local position is advanced as soon as poll hands records back from the local fetch buffer, before the application processes them. Therefore, with auto-commit enabled, filtering out messages before they are consumed has no effect on the committed offsets.
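To make the conclusion concrete, here is a minimal plain-Java model of a single partition. The class `PartitionOffsetModel` and its methods are hypothetical and are not part of kafka-clients; the sketch assumes, as described above, that auto-commit simply commits the current local position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition model (NOT the kafka-clients API).
// Assumption: auto-commit commits the local position, whatever the app did with the records.
class PartitionOffsetModel {
    private final long logEndOffset; // offsets 0 .. logEndOffset-1 exist on the "broker"
    private long position = 0;       // local position: next offset to fetch
    private long committed = -1;     // last committed offset, -1 = nothing committed yet

    PartitionOffsetModel(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    // Returns up to maxRecords offsets and advances the position past the last one returned.
    List<Long> poll(int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEndOffset) {
            batch.add(position++);
        }
        return batch;
    }

    // Commits the current local position, not "what was processed".
    void autoCommit() {
        committed = position;
    }

    long position() { return position; }
    long committed() { return committed; }

    public static void main(String[] args) {
        PartitionOffsetModel p = new PartitionOffsetModel(10);
        List<Long> batch = p.poll(5);
        batch.removeIf(o -> o % 2 != 0); // e.g. a header-based filter dropping some records
        p.autoCommit();
        // prints: kept=[0, 2, 4] position=5 committed=5
        System.out.println("kept=" + batch + " position=" + p.position() + " committed=" + p.committed());
    }
}
```

Even though the odd offsets were dropped before "consumption", the committed offset is 5, exactly as if every record had been processed.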

Pull messages

KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
    // note: acquire a lightweight lock that enforces single-threaded access, and check that the consumer has not been closed
    acquireAndEnsureOpen();
    try {
        if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");

        // note: subscriptions (SubscriptionState) holds the state of the topics/partitions this consumer subscribes to or is assigned (group subscription, positions, etc.)
        // hasNoSubscriptionOrUserAssignment() checks whether the consumer has neither subscribed to topics nor been manually assigned partitions
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        long elapsedTime = 0L;
        do {
            // note: if wakeup() has been called on this consumer, throw WakeupException to exit; otherwise this loop may keep polling for a long time when no new messages arrive
            client.maybeTriggerWakeup();

            final long metadataEnd;
            if (includeMetadataInTimeout) {
                final long metadataStart = time.milliseconds();
                // note: update the partition assignment metadata and fetch positions; remainingTimeAtLeastZero computes the time left in the budget
                // Internally:
                // 1. ConsumerCoordinator.poll processes coordinator events (heartbeat bookkeeping and auto-commits happen during this step)
                // 2. updateFetchPositions updates positions. Partitions that already have a local position are left alone; the others are first
                //    initialized from committed offsets, any still missing are marked for reset per the reset policy, and the reset completes asynchronously
                if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                    return ConsumerRecords.empty();
                }
                metadataEnd = time.milliseconds();
                elapsedTime += metadataEnd - metadataStart;
            } else {
                while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
                    log.warn("Still waiting for metadata");
                }
                metadataEnd = time.milliseconds();
            }
            
            // note: this is where messages are actually fetched; pollForFetches is covered separately below
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

            if (!records.isEmpty()) {
                // note: in short, send the next round of fetch requests before returning, so the client does not block waiting for their responses

                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }

                // note: consumer interceptors (onConsume) run here and may modify or filter the records; since the positions have already
                // been advanced, the offsets of filtered-out records are still covered by the next commit
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            final long fetchEnd = time.milliseconds();
            elapsedTime += fetchEnd - metadataEnd;

        } while (elapsedTime < timeoutMs);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
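The timeout bookkeeping in the loop above can be sketched in isolation. `PollBudgetSketch` is a hypothetical class driven by made-up durations instead of a real clock; `remainingTimeAtLeastZero` is assumed to clamp `timeoutMs - elapsedTime` at zero, as its name suggests.

```java
// Hypothetical sketch of the timeout accounting in KafkaConsumer#poll above.
// Durations are inputs rather than readings from a real clock.
class PollBudgetSketch {
    // Assumed behaviour of the helper used above: remaining budget, clamped at zero.
    static long remainingTimeAtLeastZero(long timeoutMs, long elapsedTime) {
        return Math.max(0, timeoutMs - elapsedTime);
    }

    // Simulates rounds of the do/while loop in which no records arrive: each round spends
    // metadataCostMs on the metadata update, then waits in pollForFetches for at most
    // fetchWaitMs, bounded by the remaining budget. Returns how many rounds run.
    static int roundsUntilTimeout(long timeoutMs, long metadataCostMs, long fetchWaitMs) {
        if (metadataCostMs < 0 || fetchWaitMs < 0 || metadataCostMs + fetchWaitMs == 0) {
            throw new IllegalArgumentException("costs must be non-negative and not both zero");
        }
        long elapsedTime = 0L;
        int rounds = 0;
        do {
            rounds++;
            elapsedTime += metadataCostMs;
            elapsedTime += Math.min(fetchWaitMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        } while (elapsedTime < timeoutMs);
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(roundsUntilTimeout(100, 10, 30)); // prints 3
    }
}
```

With a timeout of 0 the do/while body still runs once, matching the loop above: a zero-timeout poll makes a single pass before returning.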

About the logic of pollForFetches

pollForFetches

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
    final long startMs = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

    // note: first take records that have already been fetched, and return them immediately if there are any.
    // The fetcher buffers completed fetch responses in completedFetches; each is parsed into nextInLineRecords, which holds the prefetched records.
    // Before handing out records from nextInLineRecords, it checks the partition state (still assigned, not paused, position matches),
    // then updates the position in subscriptions to the offset after the last returned record. Note that nothing has been committed at this point.
    
    // if data is available already, return it immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // note: no prefetched records, so send fetch requests (they are only queued here, not yet written to the network).
    // For each partition it finds the leader, checks that it is available and has no pending fetch, reads the position from subscriptions,
    // and builds a ClientRequest that is stored until sent. A listener is attached; on success the response is queued in completedFetches
    
    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    // note: poll the network and wait for responses; see ConsumerNetworkClient#poll below for details

    client.poll(pollTimeout, startMs, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.rejoinNeededOrPending()) {
        return Collections.emptyMap();
    }

    return fetcher.fetchedRecords();
}
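The buffering described in the notes above (completed fetches, the next-in-line batch, and the position update) can be approximated with a small single-partition model. `FetchBufferSketch` is hypothetical: each record is just its offset, and the assignment, pause, and position-mismatch checks the real Fetcher performs are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-partition model of the Fetcher buffering (NOT the kafka-clients code).
class FetchBufferSketch {
    private final Deque<List<Long>> completedFetches = new ArrayDeque<>();
    private List<Long> nextInLine = new ArrayList<>(); // batch currently being handed out
    private int nextIndex = 0;
    private long position;                              // next offset to fetch

    FetchBufferSketch(long startPosition) {
        this.position = startPosition;
    }

    // In the real client this happens in the fetch request's success listener.
    void onFetchResponse(List<Long> offsets) {
        completedFetches.add(new ArrayList<>(offsets));
    }

    // Hands out at most maxPollRecords records, moving to the next buffered batch when
    // the current one is drained, and advances the position (nothing is committed here).
    List<Long> fetchedRecords(int maxPollRecords) {
        List<Long> out = new ArrayList<>();
        while (out.size() < maxPollRecords) {
            if (nextIndex >= nextInLine.size()) {
                List<Long> next = completedFetches.poll();
                if (next == null) break; // nothing buffered: caller must send new fetches
                nextInLine = next;
                nextIndex = 0;
            } else {
                long offset = nextInLine.get(nextIndex++);
                out.add(offset);
                position = offset + 1;
            }
        }
        return out;
    }

    long position() { return position; }

    public static void main(String[] args) {
        FetchBufferSketch f = new FetchBufferSketch(0);
        f.onFetchResponse(List.of(0L, 1L, 2L));
        f.onFetchResponse(List.of(3L, 4L));
        System.out.println(f.fetchedRecords(4) + " position=" + f.position());
    }
}
```

Here `maxPollRecords` stands in for the `max.poll.records` setting: records beyond the limit stay buffered and are returned by the next poll without another network round trip.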

ConsumerNetworkClient#poll

/**
 * Poll for any network IO.
 * @param timeout timeout in milliseconds
 * @param now current time in milliseconds
 * @param disableWakeup If TRUE disable triggering wake-ups
 */
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {
    
    // note: trigger the callback handler for completed requests (there is a pendingCompletion queue)
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    firePendingCompletedRequests();

    lock.lock();
    try {
        // note: Handle disconnected connections (pendingDisconnects queue)
        // Handle async disconnects prior to attempting any sends
        handlePendingDisconnects();

        // note: this is where requests are actually sent; the earlier steps only built them.
        // Prepared ClientRequests are kept in UnsentRequests (an internal map: key = Node, value = requests),
        // and trySend takes them out and sends them: kafkaClient.ready -> send
        // send all the requests we can send now
        long pollDelayMs = trySend(now);
        timeout = Math.min(timeout, pollDelayMs);

        // note: decide whether this poll may block (i.e. whether the timeout is effectively 0). If there are no pending completions and
        // the poll condition says to block (completedFetches is empty), block up to the timeout; client.poll reads and writes socket data
        
        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
            // if there are no requests in flight, do not block longer than the retry backoff
            if (client.inFlightRequestCount() == 0)
                timeout = Math.min(timeout, retryBackoffMs);
            client.poll(Math.min(maxPollTimeoutMs, timeout), now);
            now = time.milliseconds();
        } else {
            client.poll(0, now);
        }

        // note: check for disconnected nodes; for each one, remove its requests from unsent and complete them with a "disconnected" response (the callbacks then fire via pendingCompletion)
        
        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);
        if (!disableWakeup) {
            // trigger wakeups after checking for disconnects so that the callbacks will be ready
            // to be fired on the next call to poll()
            maybeTriggerWakeup();
        }
        // throw InterruptException if this thread is interrupted
        maybeThrowInterruptException();

        // note: try sending again; presumably some node connections were not ready on the first attempt (client.ready() starts connecting and returns false in that case)
        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(now);

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);

        // clean unsent requests collection to keep the map from growing indefinitely
        unsent.clean();
    } finally {
        lock.unlock();
    }

    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}
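The send path above (queue per node, `trySend`, network poll, then `trySend` again) can be modelled as follows. `UnsentRequestsSketch` and its methods are hypothetical stand-ins for `UnsentRequests` and `NetworkClient`; the sketch assumes that a node that is not ready gets a connection attempt which completes during the network poll.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of ConsumerNetworkClient's unsent-request handling (NOT the real classes).
class UnsentRequestsSketch {
    private final Map<String, List<String>> unsent = new LinkedHashMap<>(); // node -> queued requests
    private final Set<String> connected = new HashSet<>();
    private final Set<String> connecting = new HashSet<>();
    final List<String> sent = new ArrayList<>();

    void put(String node, String request) {
        unsent.computeIfAbsent(node, n -> new ArrayList<>()).add(request);
    }

    void markConnected(String node) {
        connected.add(node);
    }

    // Like client.ready(node): true if connected, otherwise start connecting and return false.
    private boolean ready(String node) {
        if (connected.contains(node)) return true;
        connecting.add(node);
        return false;
    }

    // Sends every queued request whose node is ready; the rest stay queued.
    void trySend() {
        for (Map.Entry<String, List<String>> e : unsent.entrySet()) {
            if (ready(e.getKey())) {
                sent.addAll(e.getValue());
                e.getValue().clear();
            }
        }
    }

    // Stand-in for the socket poll: pending connection attempts complete here.
    void networkPoll() {
        connected.addAll(connecting);
        connecting.clear();
    }

    // Like unsent.clean(): drop empty per-node lists so the map does not grow indefinitely.
    void clean() {
        unsent.values().removeIf(List::isEmpty);
    }

    int unsentCount() {
        int n = 0;
        for (List<String> requests : unsent.values()) n += requests.size();
        return n;
    }

    public static void main(String[] args) {
        UnsentRequestsSketch c = new UnsentRequestsSketch();
        c.markConnected("broker-1");
        c.put("broker-1", "fetch-p0");
        c.put("broker-2", "fetch-p1");
        c.trySend();      // only broker-1 is ready
        c.networkPoll();  // broker-2 finishes connecting
        c.trySend();      // second attempt sends the rest
        c.clean();
        System.out.println(c.sent + " unsent=" + c.unsentCount());
    }
}
```

This is why the second `trySend(now)` in ConsumerNetworkClient#poll is useful: requests for a node whose connection only finished during `client.poll` can go out in the same call instead of waiting for the next one.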

Automatic commit

Offsets are committed so that the local position is not lost on a restart or rebalance; otherwise the consumer could not continue fetching subsequent messages from where it left off.

The entry point is ConsumerCoordinator#maybeAutoCommitOffsetsAsync.

It is mainly triggered along this call path:

  • KafkaConsumer#poll (pulls messages)
  • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
  • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (the commit request is likewise built first and stored in unsent, then sent out by a later network poll)
public void maybeAutoCommitOffsetsAsync(long now) {
    // note: check whether auto-commit is enabled and the auto-commit interval has elapsed
    if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
        doAutoCommitOffsetsAsync();
    }
}
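The deadline check can be exercised on its own. `AutoCommitDeadlineSketch` is a hypothetical copy of the logic above with a counter in place of `doAutoCommitOffsetsAsync()`; setting the first deadline one interval after start is an assumption of this sketch.

```java
// Hypothetical re-creation of the deadline check in maybeAutoCommitOffsetsAsync.
// Times are plain milliseconds supplied by the caller, so runs are deterministic.
class AutoCommitDeadlineSketch {
    private final boolean autoCommitEnabled;
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    int commits = 0;

    AutoCommitDeadlineSketch(boolean enabled, long intervalMs, long startMs) {
        this.autoCommitEnabled = enabled;
        this.autoCommitIntervalMs = intervalMs;
        this.nextAutoCommitDeadline = startMs + intervalMs; // assumption: first deadline one interval after start
    }

    void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs; // measured from this call, not the old deadline
            commits++; // stands in for doAutoCommitOffsetsAsync()
        }
    }

    public static void main(String[] args) {
        AutoCommitDeadlineSketch a = new AutoCommitDeadlineSketch(true, 5000, 0);
        for (long t : new long[] {1000, 5000, 9000, 12000, 16999}) {
            a.maybeAutoCommitOffsetsAsync(t); // e.g. one call per KafkaConsumer#poll
        }
        System.out.println(a.commits); // prints 2
    }
}
```

Because the next deadline is computed from the time of the call, a commit only happens when poll actually runs, and the interval acts as a minimum gap between commits rather than a fixed schedule (the call at 12000 moves the deadline to 17000, not 15000).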