Summarize Kafka manual submission and automatic submission

Automatic submission

After the program pulls the message, it will submit it automatically after meeting the requirements, without the intervention of the program developer.

  1. Configuration

@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //concurrent number
        factory. setConcurrency(concurrency);
        //Batch fetch
        factory.setBatchListener(true);
        // do not start automatically
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //rebalance monitoring
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;

    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //Set up autocommit
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //auto commit time interval
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //Reset displacement Start consumption from the latest or oldest message
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }

2. Scenarios of repeated consumption and lost messages

2.1 Lost message

After the consumer pulls the message, the offset (such as 1-10) has been automatically submitted after the submission time AUTO_COMMIT_INTERBAL_MS_CONFIG is reached. If the message has not been consumed yet and the consumer hangs up, the message that has not been consumed at this time will be lost. Because the offset 1-10 has been submitted, the next time the message is pulled, it will be consumed from offset 10. Note: The submitted displacement is the starting point for the next consumption.

2.2 Repeated consumption

This situation often occurs when the AUTO_COMMIT_INTERBAL_MS_CONFIG time configuration is too long. After the consumer pulls the message, the consumer has completed the consumption, but it has not reached AUTO_COMMIT_INTERBAL_MS_CONFIG at this time, the displacement has not been submitted, but the consumer hangs up. Next time, this batch of messages will be re-pulled and re-processed, resulting in repeated consumption.

Manual submission

Submit manually, and the program designer controls the time to submit the displacement by himself. It can be automatically submitted by spring or actively invoked to submit the displacement method.

1. Configuration

@Bean("kafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //concurrent number
        factory. setConcurrency(concurrency);
        //Batch fetch
        factory.setBatchListener(true);
        factory.setAutoStartup(false);
        factory.getContainerProperties().setPollTimeout(1500);
        //Manual submission method Submit immediately after manually calling the api interface
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //Manual synchronous submission
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setConsumerRebalanceListener(new RebalanceListener());
        return factory;

    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // set autocommit to false
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }

2. Manual submission method and parameter introduction

2.1 Synchronous manual submission

Synchronous blocking, continue to execute after the submission function returns; automatic retry on failure;

2.2 Asynchronous manual submission

Asynchronous submission does not need to wait for the return of the submission method; there is no failure retry mechanism.

Reason: The displacement is submitted asynchronously. The submission of displacement 2 may fail, and the submission of displacement 3 is successful. If displacement 2 tries to resubmit and succeeds, the displacement will be updated from 3 to 2, which will cause repeated consumption by re-pulling message 3 next time.

2.3 Manually submit parameters

spring kafka:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, set autocommit to false;

  1. factory.getContainerProperties().setSyncCommits(true); Set manual synchronous submission, or asynchronous submission;

  1. factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

ackMode parameter:

* RECORD: Submit the offset immediately after the message processor returns the processing result.
* BATCH: Submit offset when all messages returned by the poll() method have been processed. (Defaults)
* TIME: When all the messages returned by the poll() method have been processed and the time since the last submission of the offset has exceeded the time set by ackTime, the offset is submitted.
* COUNT: Commit offset when all messages returned by the poll() method have been processed and ackCount records have been received since the last offset commit.
* COUNT_TIME: Similar to TIME and COUNT, but if any of the two conditions are satisfied, then submit the offset.
* MANUAL: The message listener is responsible for manually submitting the Acknowledgment object. Thereafter, the same semantics as BATCH will be applied.
* MANUAL_IMMEDIATE: Submit offset immediately when the message listener calls the Acknowledgment.acknowledge() method;

3. Scenario of repeated consumption

3.1 Repeated consumption

After the consumer consumes message 1-message 10, before calling the method of submitting displacement, the consumer hangs up. After rebalance, this partition is assigned to a new consumer, which will repeatedly pull message 1-message 10 for consumption, resulting in the problem of repeated consumption.