Kafka-Java 4: Configuring the offset commit strategy for Kafka consumers

1. Offset commit strategies for Kafka consumers

Kafka consumers can commit offsets in the following ways:

  1. Automatic offset commit:
    1. The offset is committed automatically after the consumer pulls the messages, before they are actually processed.
    2. Automatic commit can therefore lose data: the offset of a message may already be committed before the message is processed, and if the consumer crashes right after the pull, that message is never processed and will not be redelivered.
  2. Manual offset commit:
    1. The consumer commits the offset itself, in consumer code, when or after it processes the messages.
    2. Manual commit comes in two forms: manual synchronous commit (commitSync) and manual asynchronous commit (commitAsync).
  3. What is an offset?
    1. Reference article: Linux: [Kafka III] Component Introduction. A small sketch contrasting the consumer's position with its committed offset follows this list.
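
To make the notion of an offset concrete before the examples below: the consumer's position (the offset of the next record it will read) and the committed offset (the point from which the group would resume after a restart) are two separate values. The following is a minimal sketch that prints both for every assigned partition; the class name is illustrative, and it assumes a kafka-clients version of 2.4 or later plus the same cluster, topic and group used in the examples below.

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Sketch: print the current position and the last committed offset for every
 * partition assigned to this consumer, to show they are two different values.
 */
public class OffsetInspectionSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("topic0921"));
            // Poll once so that partitions get assigned and some records are fetched
            consumer.poll(Duration.ofMillis(1000));

            Set<TopicPartition> assignment = consumer.assignment();
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
            for (TopicPartition tp : assignment) {
                OffsetAndMetadata om = committed.get(tp);
                System.out.println(tp + ": position=" + consumer.position(tp)
                        + ", committed=" + (om == null ? "none yet" : om.offset()));
            }
        }
    }
}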

2. Automatic commit strategy

By default, Kafka consumers use the automatic offset commit strategy.

You can also configure the interval at which offsets are committed automatically:

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: Kafka consumer that consumes messages and commits offsets automatically
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerAutoSubmitOffset {

    private final static String CONSUMER_GROUP_NAME = "GROUP1";
    private final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        // 1. Set parameters
        // Configure the kafka address
        // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        //         "192.168.151.28:9092"); // stand-alone configuration
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // cluster configuration
        // Configure the key and value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

        // Set the consumer's offset commit mode
        // Automatic commit: the default configuration
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // Interval at which offsets are committed automatically
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        // 2. Create the consumer
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        // 3. Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 4. Pull messages and start consuming
        while (true){
            // Pull messages from the kafka cluster
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // We are in automatic commit mode: the offsets of the records returned by the poll above are
            // committed automatically, independently of whether the processing below succeeds. If the code
            // below throws, or the consumer crashes at this point, those messages are never actually
            // processed (no business logic runs for them), yet they will not be redelivered.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Message received: partition: " + record.partition() + ", offset: " + record.offset()
                 + ", key: " + record.key() + ", value: " + record.value());
            }
        }
    }
}

The lines below are the part of the code above that configures the automatic commit strategy:

        // Set the consumer's offset commit mode
        // Automatic commit: the default configuration
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // Interval at which offsets are committed automatically
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

3. Manual commit strategy

3.1. Manual synchronous commit

A manual synchronous commit blocks at the commit call: only when the consumer receives the ack from the Kafka cluster confirming that the offset was committed successfully does it continue with the code that follows.

Because messages are more easily lost when only asynchronous commits are used, synchronous commit is generally preferred, and no other logic is placed after the synchronous commit. (A variant of the example below that adds a timeout and error handling to the synchronous commit is sketched after it.)

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: Kafka consumer that consumes messages and commits offsets manually and synchronously
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset {

    private final static String CONSUMER_GROUP_NAME = "GROUP1";
    private final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

        // Key setting: turn off automatic commit so that
        // offsets are committed manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // The automatic commit interval no longer needs to be set
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
       
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Message received: partition: " + record.partition() + ", offset: " + record.offset()
                 + ", key value: " + record.key() + ", value value: " + record.value());
            }

            // Key call: commitSync() commits the offset synchronously.
            // The call blocks until the kafka cluster returns an ack confirming that the commit succeeded;
            // only then does execution continue with the code below.
            // Synchronous commit is generally used, with no other logic placed after it.
            consumer.commitSync();

            // do anything
        }
    }
}
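
If you do not want a commit to block indefinitely, commitSync also accepts a timeout, and a failed commit raises an exception that can be handled explicitly. Below is a minimal sketch of the same consumption loop with a bounded, error-handled synchronous commit; the class name, the 5-second timeout and the log messages are illustrative choices, not part of the original example, and it assumes a kafka-clients version of 2.0 or later.

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Sketch: synchronous offset commit with a timeout and explicit error handling.
 */
public class MyConsumerSyncCommitWithTimeout {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic0921"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Message received: partition: " + record.partition()
                        + ", offset: " + record.offset() + ", value: " + record.value());
            }
            try {
                // Block for at most 5 seconds while waiting for the commit ack
                consumer.commitSync(Duration.ofSeconds(5));
            } catch (CommitFailedException e) {
                // Thrown e.g. when the group has rebalanced and this consumer no longer owns the
                // partitions; the records of this poll will be redelivered, so just log it here.
                System.out.println("Synchronous offset commit failed: " + e.getMessage());
            }
        }
    }
}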

3.2. Manual asynchronous commit

An asynchronous commit does not block at the commit call: after issuing the commit, the consumer can continue executing subsequent code without waiting for the ack from the Kafka cluster. When committing asynchronously, however, a callback should be provided so that the result of the commit can be reported back to the consumer. (A common pattern that combines asynchronous commits in the loop with one final synchronous commit on shutdown is sketched after the example.)

package com.demo.lxb.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @Description: Kafka consumer that consumes messages and commits offsets manually and asynchronously
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset2 {

    private final static String CONSUMER_GROUP_NAME = "GROUP1";
    private final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

        // Key setting: turn off automatic commit so that
        // offsets are committed manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // The automatic commit interval no longer needs to be set
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
       
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
         
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Message received: partition: " + record.partition() + ", offset: " + record.offset()
                 + ", key value: " + record.key() + ", value value: " + record.value());
            }
            // Key call: commitAsync() commits the offset asynchronously.
            // The OffsetCommitCallback is invoked to report the result of the commit back to the consumer.
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e != null){
                        // A failed commit can be recorded in the log
                        System.out.println("Recording the failed offset commit to the log");
                        System.out.println("Exception thrown while committing the offset: " + Arrays.toString(e.getStackTrace()));
                        System.out.println("Offsets whose commit failed: " + JSONObject.toJSONString(map));
                    }
                }
            });

            // Subsequent logic can start right away; there is no need to wait for the kafka cluster
            // to return an ack for the commit.
            // do anything

        }
    }
}
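
A commonly used pattern is to commit asynchronously inside the poll loop for throughput, and to finish with one synchronous commit when the consumer shuts down so the most recent offsets are not lost. Below is a minimal sketch of that pattern; the class name and the shutdown-hook handling are illustrative additions, not part of the original examples.

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Sketch: asynchronous commits inside the poll loop, plus one final
 * synchronous commit on shutdown so the last offsets are safely stored.
 */
public class MyConsumerAsyncThenSyncCommit {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic0921"));

        // On shutdown, wake up the consumer (the only thread-safe KafkaConsumer call)
        // and wait for the main thread to finish committing and closing.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Message received: partition: " + record.partition()
                            + ", offset: " + record.offset() + ", value: " + record.value());
                }
                // Fast, non-blocking commit during normal operation
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() makes poll() throw this exception
        } finally {
            try {
                // One last blocking commit so the latest processed offsets are stored
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}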