Kafka 3.x – A Guide to Offsets

Article directory

  • The default maintenance location of offsets
    • Consuming the `__consumer_offsets` topic: an example
  • Automatically committing offsets
    • Code
  • Manually committing offsets
    • Synchronous commit code
    • Asynchronous commit code
  • Specifying the starting offset (auto.offset.reset = earliest | latest | none)
  • Analysis of missed and repeated consumption


The default maintenance location of offsets

A consumer may suffer a failure such as a power outage during consumption, and once it recovers it must resume from the position it had reached before the failure. The consumer therefore records in real time the offset up to which it has consumed, so that it can continue from that point after recovery.

Before Kafka version 0.9, consumers saved offsets in ZooKeeper by default. Starting with version 0.9, consumers save offsets in a built-in Kafka topic named __consumer_offsets.

Data in the __consumer_offsets topic is stored as key-value pairs:

  • The key is groupId + topic + partition number.
  • The value is the current offset.

Kafka periodically compacts this topic internally, so only the latest offset is retained for each groupId + topic + partition number key.

Consuming the `__consumer_offsets` topic: an example

  1. __consumer_offsets is an ordinary Kafka topic, so it can be read with a regular consumer.

  2. Add exclude.internal.topics=false to config/consumer.properties. The default is true, which means system topics cannot be consumed; to view the system topic's data, change the parameter to false, as shown below.
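For clarity, this is the exact line to add to config/consumer.properties (nothing else in the file needs to change):

exclude.internal.topics=false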

  3. Create a new topic on the command line:

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.126.171:9092 --create --topic testArtisan --partitions 2
Created topic testArtisan.

  4. Start a producer and produce data into the topic testArtisan (a sample command is sketched below)

  5. Start a consumer and consume data from the topic testArtisan (see the sketch below)
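
For reference, the corresponding console commands might look like the following; this is a sketch assuming the same broker address and bin directory as the earlier commands, and a group name (artisan-group) chosen to match the note below:

[root@localhost bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.126.171:9092 --topic testArtisan

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.126.171:9092 --topic testArtisan --group artisan-group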


Note: specifying a consumer group name makes it easier to observe where the data is stored (key -> groupId + topic + partition number).

  6. Start a consumer to consume the topic __consumer_offsets:

[root@localhost bin]# ./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 192.168.126.171:9092 --consumer.config ../config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

Automatically committing offsets

Kafka's automatic offset commit mechanism is one way to manage a consumer's offsets. Its main feature is that the offsets of successfully consumed messages are committed to Kafka automatically, without the consumer having to track and commit them explicitly. Briefly, it works as follows:

  1. Topic subscription: the consumer subscribes to one or more Kafka topics at startup and begins consuming messages.

  2. Message consumption: the consumer pulls messages from the subscribed topics and processes them. As messages are consumed, the client keeps track of their offsets.

  3. Automatic commits: depending on the configuration, the consumer periodically commits the offsets of consumed messages to the Kafka cluster in the background. Consumers do not need to track the offset of each partition manually; Kafka performs this task for them.

  4. Configuration parameters: two settings control automatic offset commits:

    • enable.auto.commit: whether automatic offset commits are enabled; the default is true.
    • auto.commit.interval.ms: the interval between automatic commits; the default is 5 seconds.
  5. Caveats: automatic committing is convenient, but keep the following in mind:

    • With auto-commit enabled, offsets are committed in the background while the consumer processes messages. On failure this can cause messages to be reprocessed, so the consumer must handle processing failures.
    • The commit interval should be configured for the workload, balancing processing latency against commit frequency.

The automatic commit mechanism simplifies consumer code and reduces maintenance complexity. In some cases, however, message processing must be made idempotent to tolerate reprocessing of messages whose offsets were already committed. If finer-grained offset control is needed, or custom logic must run when message processing fails, the consumer can instead disable automatic commits and manage offsets manually.

Code

package com.artisan.pc;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author little craftsman
 * @version 1.0
 * @mark: show me the code, change the world
 */
public class CustomConsumer2 {

    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();

        // 2. Add parameters to the consumer configuration object
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // Deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group (required)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "artisan-group");

        // Enable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Auto-commit interval; the default is 5s, shortened here to 1s
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // 3. Create the consumer object
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 4. Subscribe to the topic
        consumer.subscribe(Arrays.asList("artisan"));

        // 5. Pull data and print it
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 6. Iterate over and output the consumed records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

Manually committing offsets

Kafka gives consumers two ways to manage offsets: they can commit offsets automatically or manually. With manual commits the consumer has more control and flexibility, and can commit an offset only after making sure the corresponding messages have been processed. Briefly:

  1. The concept of an offset: in Kafka, each consumer has a current offset per partition, which represents the position it has read up to. The offset is the marker used to track the consumer's read position in each partition.

  2. Manual offset commits: with manual commits, the consumer is responsible for telling the Kafka broker that it has successfully processed a batch of messages by committing their offsets. This gives the consumer fine-grained control over when commits happen.

  3. When to commit: the consumer can commit manually after processing messages, typically:

    • After messages have been processed successfully, to confirm they have been consumed.
    • Periodically, so that even if the consumer fails it does not reprocess large numbers of the same messages.
  4. Commit methods: Kafka provides two main methods for committing offsets manually:

    • commitSync(): commits synchronously; the consumer blocks until the commit succeeds before continuing to process messages.
    • commitAsync(): commits asynchronously; the consumer sends the commit but does not wait for confirmation.
  5. Notes on manual commits:

    • Be careful when committing manually: an incorrectly committed offset can cause messages to be consumed repeatedly or lost.
    • The consumer should treat message processing and the offset commit as a unit, and handle commit failures.
    • If the consumer processes messages but fails before committing the offset, some recovery mechanism may be needed to avoid data loss or duplicate processing.

Manual commits give consumers more control and let them adapt to different processing needs, but they also add complexity: offset commits must be handled carefully to keep the data consistent and reliable. Automatic commits are easier to use, but may not suit situations that require finer-grained control.

Synchronous commit code

Because a synchronous commit retries on failure, it is more reliable. Below is an example of committing offsets synchronously.

package com.artisan.pc;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author little craftsman
 * @version 1.0
 * @mark: show me the code, change the world
 */
public class CustomConsumerByHand {

    public static void main(String[] args) {
        // 1. Create the Kafka consumer configuration
        Properties properties = new Properties();
        // 2. Add configuration parameters
        // Broker connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // Deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Auto-commit interval; has no effect here because auto-commit is disabled
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // 3. Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 4. Subscribe to the topic to consume; the parameter is a list
        consumer.subscribe(Arrays.asList("artisan"));

        // 5. Consume data
        while (true) {
            // 6. Read messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            // 7. Output messages
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // Commit offsets synchronously; blocks until the commit succeeds
            consumer.commitSync();
        }
    }
}

Asynchronous commit code

Although committing offsets synchronously is more reliable, it blocks the current thread until the commit succeeds, which significantly hurts throughput. For this reason, asynchronous commits are used more often.

package com.artisan.pc;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @author little craftsman
 * @version 1.0
 * @mark: show me the code, change the world
 */
public class CustomConsumerByHandAsync {

    public static void main(String[] args) {
        // 1. Create the Kafka consumer configuration
        Properties properties = new Properties();
        // 2. Add configuration parameters
        // Broker connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // Deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Auto-commit interval; has no effect here because auto-commit is disabled
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // 3. Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 4. Subscribe to the topic to consume; the parameter is a list
        consumer.subscribe(Arrays.asList("artisan"));

        // 5. Consume data
        while (true) {
            // 6. Read messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            // 7. Output messages
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // Commit offsets asynchronously; does not block the poll loop
            consumer.commitAsync(new OffsetCommitCallback() {
                /**
                 * Invoked when the commit completes
                 * @param offsets   the offsets that were committed
                 * @param exception non-null if the commit failed
                 */
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    // Log a failed commit
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets);
                    }
                }
            });
        }
    }
}

Specifying the starting offset (auto.offset.reset = earliest | latest | none)

auto.offset.reset = earliest | latest | none

What should happen when there is no initial offset in Kafka (the consumer group is consuming for the first time), or the current offset no longer exists on the server (e.g. the data has been deleted)?

  • earliest: automatically reset the offset to the earliest offset.
  • latest (the default): automatically reset the offset to the latest offset.
  • none: throw an exception to the consumer if no previous offset is found for the consumer group.
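
Beyond the reset policy, a consumer can also position itself at an explicit offset with seek(). The sketch below is a minimal illustration, not the only way to do this: the broker address and topic reuse the earlier examples, while the class name CustomConsumerSeek and the starting offset 100 are arbitrary assumptions for demonstration.

package com.artisan.pc;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("artisan"));

        // Wait until the group rebalance has assigned partitions to this consumer;
        // poll() drives the rebalance that produces the assignment
        Set<TopicPartition> assignment = consumer.assignment();
        while (assignment.isEmpty()) {
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }

        // Start consuming every assigned partition from offset 100 (arbitrary example)
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, 100);
        }

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}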

Analysis of missed and repeated consumption

  1. Problem: whether offsets are committed synchronously or asynchronously, data may be missed or consumed repeatedly.
  2. Missed consumption: committing the offset first and processing afterwards can cause data to be missed.
  3. Repeated consumption: processing first and committing the offset afterwards can cause data to be consumed repeatedly. The two orderings are contrasted in the sketch after this list.
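
A minimal sketch of the two failure modes; the consumer is assumed to be set up as in the manual-commit examples above, and process() is a hypothetical handler for a batch of records:

// Ordering A: commit BEFORE processing -> missed consumption.
// A crash after commitSync() but before process() leaves the committed
// offset pointing past records that were never handled.
ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();
process(batch);        // crash here => these records are skipped on restart

// Ordering B: process BEFORE committing -> repeated consumption.
// A crash after process() but before commitSync() means the restart
// re-reads from the previously committed offset and handles the batch again.
ConsumerRecords<String, String> batch2 = consumer.poll(Duration.ofSeconds(1));
process(batch2);
consumer.commitSync(); // crash before this => the batch is reprocessed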

Food for thought: how can we avoid both missing messages and consuming them repeatedly?