Kafka – 3.x Kafka consumer incomplete guide

Article directory

  • Kafka consumption model
  • Kafka consumer workflow
    • Overall consumer workflow
    • consumer group principle
    • Consumer group initialization process
    • Detailed consumption process of consumer group
  • Independent consumer case (subscription topic)
  • Important parameters for consumers


Kafka consumption model

Kafka’s consumer uses pull mode to read data from the broker.

Push mode
  • Advantages: messages are delivered quickly.
  • Disadvantages: the message sending rate is determined by the broker, so it is difficult to adapt to the consumption rates of different consumers; this may cause denial of service and network congestion.

Pull mode
  • Advantages: each consumer can consume messages at an appropriate rate based on its own consumption ability.
  • Disadvantages: there is a potential loop problem, because if Kafka has no data the consumer may keep receiving empty results; a polling timeout must be set to avoid waiting indefinitely.
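
The role of the polling timeout can be illustrated with a small sketch. It uses an in-memory BlockingQueue as a stand-in for the broker, which is an assumption made purely for illustration; it is not Kafka API, and the class and method names are hypothetical. It shows a pull that returns after a bounded wait whether or not data is available.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy pull loop: an in-memory queue stands in for the broker (not Kafka API). */
class PullWithTimeout {

    /** Waits up to the given timeout for one message; returns null if none arrived. */
    static String pullOnce(BlockingQueue<String> broker, Duration timeout) {
        try {
            return broker.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        broker.add("first message");

        // The consumer decides when to ask for data, so it sets its own pace.
        System.out.println(pullOnce(broker, Duration.ofMillis(100)));
        // No data left: the call returns after the timeout instead of blocking forever.
        System.out.println(pullOnce(broker, Duration.ofMillis(100)));
    }
}
```

The real consumer follows the same idea: poll() takes a Duration and returns an empty batch when nothing arrives within that time.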

Kafka consumer workflow

Overall consumer workflow

The overall workflow of a Kafka consumer includes the following steps:

  1. Configure consumer attributes: First, configure the consumer attributes, including the address of the Kafka cluster, the consumer group, the key and value deserializers, whether offsets are committed automatically, and so on.

  2. Create consumer instance: Use the configuration to create a Kafka consumer instance.

  3. Subscribe to topic: Use a consumer instance to subscribe to one or more Kafka topics. This tells the Kafka consumer which topics you want to receive messages from.

  4. Polling data: The consumer uses the poll() method to pull messages from the Kafka broker. It periodically polls (pulls) the Kafka cluster for new messages.

  5. Processing messages: Once the message is obtained from the Kafka broker, the consumer will process the message and execute your business logic. This may include data processing, computing, storage or other operations.

  6. Commit offset: Consumers commit offsets, either manually or automatically, to record the position up to which messages have been processed. After a restart or a rebalance, consumption resumes from the committed position, which limits duplicate processing of messages.

  7. Handling exceptions: Exceptions may occur during message processing, and you need to handle these exceptions, such as retrying or logging errors.

  8. Close consumer: When a consumer instance is no longer needed, be sure to close it to release resources.


This workflow covers the main steps of a Kafka consumer, from configuration through data processing to resource management. To handle large volumes of messages, applications usually run several consumer instances in separate threads or processes and adjust the consumption rate as needed. In addition, Kafka’s consumer library provides features such as automatic load balancing and automatic offset management, which simplify consumer development and maintenance.
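
To make the commit step more concrete, here is a minimal sketch of how a committed offset behaves for a single partition. It is a toy model with hypothetical names, not Kafka API: the list plays the role of the partition log, and the committed value is the offset of the next record to read (last processed offset + 1).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition's committed offset (not Kafka API). */
class OffsetCommitSketch {
    private long committed; // offset of the next record to read after a restart

    OffsetCommitSketch(long committed) {
        this.committed = committed;
    }

    /** Processes all records from the committed offset onward; commits only if asked. */
    List<String> run(List<String> log, boolean commitAfterProcessing) {
        List<String> processed = new ArrayList<>();
        long next = committed;
        while (next < log.size()) {
            processed.add(log.get((int) next));
            next++;
        }
        if (commitAfterProcessing) {
            committed = next; // last processed offset + 1
        }
        return processed;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2");
        OffsetCommitSketch consumer = new OffsetCommitSketch(0);
        System.out.println(consumer.run(log, false)); // simulated failure before the commit
        System.out.println(consumer.run(log, true));  // restart: the same records are read again
        System.out.println(consumer.run(log, true));  // committed: nothing left to re-read
    }
}
```

Committing only after processing means a failure can cause records to be processed twice but not skipped, which is why the exception-handling step should tolerate repeated messages.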

Consumer Group Principle

A Kafka consumer group is a mechanism that coordinates multiple consumers so that they jointly consume messages from one or more Kafka topics. Here’s how consumer groups work:

  1. Multiple consumers: A consumer group can contain multiple consumer instances that work together to jointly consume messages from one or more topics.

  2. Subscribe to topic: All consumer instances in the group subscribe to the same Kafka topic. Within the group, each message is processed by only one instance, which is how the load is balanced across consumers.

  3. Message Partition: Each Kafka topic is usually divided into multiple partitions, each partition containing a subset of messages. Each consumer instance is responsible for consuming messages from one or more partitions.

  4. Coordinator: Each consumer group is managed by a group coordinator, which is one of the Kafka brokers. The coordinator for a group is the broker that leads the partition of the internal __consumer_offsets topic to which the group ID maps.

  5. Offset management: The coordinator stores the committed offsets of the consumer group in the internal __consumer_offsets topic. An offset is the position the group has reached in a topic partition. Tracking the consumption progress of each partition lets consumption resume from the committed position after a restart or rebalance, which limits repeated consumption.

  6. Assign partitions: Whenever a new consumer joins the group or an existing consumer leaves it, the coordinator triggers a rebalance and the partitions are reassigned among the current members. This provides load balancing and failure recovery.

  7. Consuming messages: Each consumer instance is responsible for processing messages in the partitions assigned to it. It pulls messages, processes them, and commits the offsets to the coordinator.

  8. Automatic rebalancing: If a consumer instance joins or leaves the consumer group, or the distribution of partitions changes, the consumer group will automatically rebalance to ensure even distribution of messages.

  9. Commit offsets: Consumer instances can commit offsets for processed messages periodically or on demand to restore consumption progress in the event of a failure.

In this way, Kafka consumer groups are able to achieve high availability, load balancing, and fault tolerance, allowing multiple consumers to process messages in parallel and dynamically adjust partition allocation based on demand. This makes consumer groups ideal for handling large-scale streaming data.
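
As a rough illustration of how a group is tied to its coordinator, the sketch below maps a group ID to a partition of __consumer_offsets using the commonly described rule: the non-negative hash of the group ID modulo the number of partitions of that topic. The class and method names are hypothetical, and the broker leading the resulting partition would act as the group's coordinator.

```java
/** Sketch of how a group ID maps to a __consumer_offsets partition (illustrative only). */
class CoordinatorPartition {

    /** Non-negative hash of the group ID, modulo the offsets topic partition count. */
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so that case is mapped to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default value of offsets.topic.num.partitions.
        int partition = partitionFor("artisan-group", 50);
        System.out.println("__consumer_offsets-" + partition);
    }
}
```

Because the mapping depends only on the group ID and the partition count, every member of a group arrives at the same coordinator.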

Consumer group initialization process



Consumer group detailed consumption process

Initializing a Kafka consumer group means creating and configuring its members and letting them join the group. The steps are as follows:

  1. Add the Kafka client library: First, make sure that the Kafka client library (kafka-clients) is available to your application so that you can use Kafka-related classes and functions.

  2. Create consumer configuration: Before initializing the consumer group, create a consumer configuration object. It holds the important attributes, such as the address of the Kafka cluster, the ID of the consumer group, and whether offsets are committed automatically.

  3. Create consumer instance: Use the consumer configuration to create one or more consumer instances. Each instance represents a member of the consumer group. Once an instance has subscribed and starts polling, it locates the group coordinator and joins the group.

  4. Subscribe to a topic: Use the subscribe() method to subscribe to one or more Kafka topics through a consumer instance. This tells Kafka which topics you wish to receive messages from.

  5. Start consumer: Call the poll() method to start polling messages. This will start the consumer instance and start pulling messages. Each member of the consumer group performs this step independently.

  6. Consuming messages: Once the messages are pulled, the consumer instance will process these messages and execute your business logic. Each member processes messages in its own thread.

  7. Commit offset: The consumer instance can choose to manually or automatically commit the offset of the processed message. This helps record the progress of processing messages in each partition.

  8. Handling exceptions: Exceptions may occur during message processing, and you need to handle these exceptions appropriately, such as retrying the message or logging an error.

  9. Close consumer: When a consumer instance is no longer needed, be sure to close it to release resources.

  10. Automatic rebalancing: If a consumer instance joins or leaves the consumer group, or the distribution of partitions changes, Kafka will automatically rebalance to ensure even distribution of messages.

These steps cover the basics of a Kafka consumer group, from configuring its members to processing messages. Pay attention to the configuration options and to the group’s coordination process (joining, heartbeats, rebalancing), since both affect normal operation and load balancing.
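
The effect of a rebalance on partition ownership can be sketched with a simplified range-style assignment for a single topic. This is an illustration under stated assumptions and not Kafka's own assignor code: the class name, member names, and the partition count of 7 are all hypothetical. Members are sorted, each gets a contiguous range, and the first few members receive one extra partition when the count does not divide evenly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Simplified range-style assignment for a single topic (illustrative, not Kafka's code). */
class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(int partitionCount, List<String> members) {
        // Sort members so that every run produces the same assignment.
        List<String> sorted = new ArrayList<>(new TreeSet<>(members));
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return result;
        }
        int perMember = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size(); // the first `extra` members get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(7, List.of("consumer-0", "consumer-1")));
        // A third member joins: the group rebalances and the ranges shrink.
        System.out.println(assign(7, List.of("consumer-0", "consumer-1", "consumer-2")));
    }
}
```

With two members the split is 4 and 3 partitions; after the third member joins it becomes 3, 2 and 2, so every partition still has exactly one owner in the group.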

Independent consumer case (subscription topic)

Requirement: Create an independent consumer to consume data in the artisan topic

Note: The consumer group ID must be configured in the consumer API code.

package com.artisan.pc;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @author little craftsman
 * @version 1.0
 * @mark: show me the code, change the world
 */
public class CustomConsumer {

    public static void main(String[] args) {
        // 1. Create the consumer configuration object
        Properties properties = new Properties();

        // 2. Add parameters to the consumer configuration object
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // Key and value deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group ID (required)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "artisan-group");

        // 3. Create the consumer object
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 4. Subscribe to the topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("artisan");
        consumer.subscribe(topics);

        // 5. Pull data in a loop; each poll waits up to 1 second for records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 6. Traverse and print the consumed records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
    

① Run the consumer program in IDEA
② Start a Kafka producer on the server and send some messages

③ Observe the received data in IDEA

ConsumerRecord(topic = artisan, partition = 2, leaderEpoch = 0, offset = 34, CreateTime = 1698630425187, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false ), key = null, value = first message)
ConsumerRecord(topic = artisan, partition = 2, leaderEpoch = 0, offset = 35, CreateTime = 1698630429909, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = seconde message)

Consumer important parameters

Parameter name: Description
bootstrap.servers: The host:port list used to establish the initial connection to the Kafka cluster.
key.deserializer: The deserializer class for the key of received messages. The fully qualified class name is required.
value.deserializer: The deserializer class for the value of received messages. The fully qualified class name is required.
group.id: Identifies the consumer group to which the consumer belongs.
enable.auto.commit: The default is true, in which case the consumer periodically commits offsets to the server automatically.
auto.commit.interval.ms: When enable.auto.commit=true, how often the consumer commits offsets. The default is 5 seconds.
auto.offset.reset: What to do when Kafka has no initial offset for the group or the current offset no longer exists on the server. Valid values are "earliest" (reset to the earliest offset), "latest" (reset to the latest offset, the default) and "none" (throw an exception to the consumer).
offsets.topic.num.partitions: The number of partitions of the __consumer_offsets topic. The default is 50. This is a broker-side setting rather than a consumer property.
heartbeat.interval.ms: The interval between heartbeats sent from the consumer to the coordinator. The default is 3 seconds. It must be less than session.timeout.ms and should not be higher than 1/3 of session.timeout.ms.
session.timeout.ms: The session timeout between the consumer and the coordinator. The default is 45 seconds. If the coordinator receives no heartbeat within this time, the consumer is removed and the consumer group is rebalanced.
max.poll.interval.ms: The maximum time allowed between two calls to poll(), that is, the maximum time for the consumer to process a batch of messages. The default is 5 minutes. If this value is exceeded, the consumer is removed and the consumer group is rebalanced.
fetch.min.bytes: The minimum number of bytes the server should return for one fetch. The default is 1 byte.
fetch.max.wait.ms: The default is 500 milliseconds. If fetch.min.bytes of data has not accumulated when this time is up, the server returns whatever data it has.
fetch.max.bytes: The default is 52428800 (50 megabytes). The maximum number of bytes the server returns for one fetch. This is not an absolute maximum: if a batch on the server is larger than this value, it can still be retrieved. The size of a single batch is limited by message.max.bytes (broker configuration) or max.message.bytes (topic configuration).
max.poll.records: The maximum number of records returned by a single call to poll(). The default is 500.
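
As a sketch of how several of these parameters fit together, the following builds a consumer configuration with plain property names and checks the heartbeat guideline described above. The class name, method names, and the chosen values (manual commit, "earliest", 200 records per poll) are illustrative assumptions, not recommendations from the article.

```java
import java.util.Properties;

/** Builds an example consumer configuration using plain property names (illustrative values). */
class TunedConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");   // offsets are committed by the application
        props.put("auto.offset.reset", "earliest"); // start from the beginning if no offset exists
        props.put("session.timeout.ms", "45000");   // the default, written out for clarity
        props.put("heartbeat.interval.ms", "3000"); // the default, written out for clarity
        props.put("max.poll.records", "200");       // illustrative value; the default is 500
        return props;
    }

    /** Checks the guideline heartbeat.interval.ms <= session.timeout.ms / 3. */
    static boolean heartbeatWithinGuideline(Properties props) {
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.126.171:9092", "artisan-group");
        System.out.println(heartbeatWithinGuideline(props));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in the same way as in the example program earlier in the article.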