The consumer commits the consumed offset

1. Overview

After consuming messages, the consumer commits the consumed offset to the built-in topic __consumer_offsets; each consumer group maintains its own offset for every partition it consumes. So here comes the question: when does the consumer group commit the offset back to the broker?

Kafka consumer configuration information

enable.auto.commit: if true, the consumer's offset is periodically committed in the background (default: true)
auto.commit.interval.ms: how often, in milliseconds, consumer offsets are automatically committed to Kafka when enable.auto.commit is true (default: 5000)

2. Automatically commit offsets

Once automatic commit is enabled on the consumer side, the offset is committed automatically every auto.commit.interval.ms milliseconds.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public static void main(String[] args) {
    //Create the kafka consumer configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    props.put("group.id", "hy-local-consumer");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "5000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //Create the kafka consumer
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //Consume messages
    kafkaConsumer.subscribe(Arrays.asList("hy1-test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

/*
output:
------offset-- = 5, key = null, value = NBA
------offset-- = 4, key = null, value = CBA
------offset-- = 5, key = null, value = CUBA
------offset-- = 6, key = null, value = NCAA
------offset-- = 6, key = null, value = ABA
------offset-- = 5, key = null, value = NBL
*/

What happens if the consumer successfully consumes the message, but the service crashes before the offset is committed?

Answer: repeated consumption. After a restart, the consumer resumes from the last committed offset and processes those messages again.
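The failure mode behind this answer can be sketched without a broker. The following stand-alone simulation (plain Java, no Kafka client; the partition is modeled as a list and the committed offset as a plain long) shows why a crash after processing but before the commit leads to re-delivery:

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliveryDemo {
    // Replay the partition from `committed` up to (exclusive) `upTo`.
    static List<String> run(List<String> partition, long committed, long upTo) {
        List<String> processed = new ArrayList<>();
        for (long offset = committed; offset < upTo; offset++) {
            processed.add(partition.get((int) offset));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("NBA", "CBA", "CUBA", "NCAA");
        long committed = 0; // nothing committed yet

        // First run: offsets 0..2 are processed, but the service crashes
        // before the commit happens, so `committed` stays 0.
        List<String> firstRun = run(partition, committed, 3);

        // After a restart the consumer resumes from the committed offset (still 0),
        // so the first three messages are processed a second time.
        List<String> secondRun = run(partition, committed, partition.size());

        System.out.println(firstRun);  // [NBA, CBA, CUBA]
        System.out.println(secondRun); // [NBA, CBA, CUBA, NCAA]
    }
}
```

This is the at-least-once behavior of Kafka's default commit model: as long as processing happens before the commit, a crash in between yields duplicates, never silent loss.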

3. Manually commit offsets

Although automatic offset committing is simple and convenient, it is time-driven, so developers cannot control when the offset is actually committed. Therefore, Kafka also provides APIs for committing offsets manually.

There are two ways to commit offsets manually: commitSync (synchronous commit) and commitAsync (asynchronous commit).

  • Similarity: both commit the highest offset of the batch of data returned by the current poll
  • Difference: commitSync blocks the current thread until the commit succeeds and retries automatically on failure (commits can still fail for reasons beyond the client's control); commitAsync has no retry mechanism, so a commit may fail silently
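One detail behind "the highest offset of the batch": what the client actually commits is the position of the next record to fetch, i.e. the highest consumed offset plus one. A rough stdlib-only sketch of that bookkeeping (partitions modeled as plain strings rather than TopicPartition objects, batches as offset arrays):

```java
import java.util.HashMap;
import java.util.Map;

public class CommitMath {
    // For each partition in the batch, the offset to commit is the
    // highest consumed offset + 1 (the next position to fetch).
    static Map<String, Long> offsetsToCommit(Map<String, long[]> batch) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, long[]> e : batch.entrySet()) {
            long highest = Long.MIN_VALUE;
            for (long offset : e.getValue()) {
                highest = Math.max(highest, offset);
            }
            toCommit.put(e.getKey(), highest + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, long[]> batch = new HashMap<>();
        batch.put("hy2-test-topic-0", new long[]{3, 4, 5});
        batch.put("hy2-test-topic-1", new long[]{7});
        Map<String, Long> toCommit = offsetsToCommit(batch);
        System.out.println(toCommit.get("hy2-test-topic-0")); // 6
        System.out.println(toCommit.get("hy2-test-topic-1")); // 8
    }
}
```

The "+1" convention matters: the committed value is where the next consumer will start reading, which is why committing the last processed offset itself would re-deliver one record per partition after a restart.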

3.1 Synchronously commit the offset

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public static void main(String[] args) {
    //Create the kafka consumer configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    props.put("group.id", "hy-local-consumer");
    //Disable auto-commit, since the offset is committed manually below
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //Create the kafka consumer
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //Subscribe to the topic
    kafkaConsumer.subscribe(Arrays.asList("hy2-test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        //Synchronous commit: the current thread blocks until the offset is committed
        kafkaConsumer.commitSync();
    }
}

3.2 Asynchronously commit the offset

Although committing the offset synchronously is more reliable, it blocks the current thread until the commit succeeds, which significantly limits throughput. Therefore, in most cases the offset is committed asynchronously.

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public static void main(String[] args) {
    //Create the kafka consumer configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    props.put("group.id", "hy-local-consumer");
    //Disable auto-commit, since the offset is committed manually below
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //Create the kafka consumer
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //Subscribe to the topic
    kafkaConsumer.subscribe(Arrays.asList("hy2-test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        //Asynchronous commit: the callback runs when the commit completes
        kafkaConsumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    System.err.println("Commit failed for offsets " + offsets + ": " + exception);
                }
            }
        });
    }
}

Whether the offset is committed synchronously or asynchronously, data may still be missed or consumed repeatedly: committing the offset before processing the data can cause missed consumption, while processing the data before committing the offset can cause repeated consumption.
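These two orderings can be contrasted in a small stand-alone simulation (plain Java, no Kafka client; the crash is modeled as happening between the two steps of a single record):

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOrderDemo {
    // Simulate one crash between the two steps of record `crashAt`,
    // followed by a restart that resumes from the committed offset.
    static List<String> deliver(List<String> partition, int crashAt, boolean commitFirst) {
        List<String> processed = new ArrayList<>();
        long committed = 0;
        // Records before the crash point are handled normally.
        for (int i = 0; i < crashAt; i++) {
            processed.add(partition.get(i));
            committed = i + 1;
        }
        if (commitFirst) {
            committed = crashAt + 1;               // offset committed, record never processed
        } else {
            processed.add(partition.get(crashAt)); // record processed, offset never committed
        }
        // Restart: resume from the committed offset.
        for (long i = committed; i < partition.size(); i++) {
            processed.add(partition.get((int) i));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> p = List.of("m0", "m1", "m2", "m3");
        System.out.println(deliver(p, 1, true));  // [m0, m2, m3]         -> m1 is lost
        System.out.println(deliver(p, 1, false)); // [m0, m1, m1, m2, m3] -> m1 is duplicated
    }
}
```

Commit-first gives at-most-once delivery (possible loss), process-first gives at-least-once (possible duplicates); exactly-once requires making the processing itself idempotent or transactional, which is outside the scope of the commit APIs shown above.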