Kafka: the process of message sending and consumption

Summary

Kafka stores messages, producers send messages, and consumers consume them. This sounds simple, but once you look closely there are many problems to solve: should messages be sent individually or in batches? Is a message pushed to consumers the moment it arrives in a broker's topic? How does a producer make sure a message reaches Kafka successfully, and how does Kafka make sure the message is delivered to consumers?

Producer

The producer component and sending process are shown in the following figure:

1. Create a producer. The creation code is as follows:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,192.168.189.128:9093,192.168.189.128:9094");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

Basically, you only need to specify the broker addresses and the key and value serializer classes to create one. It is recommended to use the String serializer, i.e. store messages in Kafka as strings: every language and framework can read a string, so there are no compatibility problems between old and new messages.

Of course, there are many more producer parameters; see ProducerConfig for details. The main ones are: acks, buffer.memory, compression.type, retries, batch.size, linger.ms, client.id, max.in.flight.requests.per.connection, timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms, max.block.ms, max.request.size, receive.buffer.bytes, and send.buffer.bytes.
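As an illustration, a few of the batching-related parameters might be set like this (the values are arbitrary examples, not recommendations):

        producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);    // 32 MB total send buffer (the default)
        producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);           // per-partition batch size, in bytes
        producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, 5);                // wait up to 5 ms to fill a batch
        producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");     // compress batches before sending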

2. Send a message. There are three ways to send: fire-and-forget, synchronous, and asynchronous.

Fire-and-forget: call the send() method and ignore the return value. The code is as follows:

producer.send(new ProducerRecord<>("topic", "key", "value"));

Synchronous sending: call the send() method, which returns a Future, then call the Future's get() method. If the send succeeds, get() returns the message metadata object RecordMetadata, from which you can read the offset, partition, and other data. Note that with acks=0 the offset is always -1; the real offset assigned by the broker cannot be obtained. The code is as follows:

try {
    RecordMetadata result = producer.send(new ProducerRecord<>("topic", "key", "value")).get();
    System.out.println(result.partition() + ":" + result.offset());
} catch (Exception e) {
    e.printStackTrace();
}

Asynchronous sending: first define a Callback implementation class that implements the onCompletion method, then pass the Callback when calling send(). The code is as follows:

public class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.out.println("Message sending exception: " + exception.getMessage());
        } else {
            System.out.println("Message sent successfully [" + metadata.partition() + ":" + metadata.offset() + "]");
        }
    }
}
producer.send(new ProducerRecord<>("topic", "key", "value"), new ProducerCallback());

Since the send is asynchronous, the main thread does not wait for onCompletion to run; if the program exits right after send(), the callback may never fire.
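A common pattern (a minimal sketch) is to flush or close the producer before the program exits; both block until buffered records have been sent and their callbacks have been invoked:

producer.flush();   // block until all buffered records have been sent and acknowledged
producer.close();   // flush any remaining records and release resources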

3. The sending process. Combining the flow chart and the API, the specific steps are as follows:

(1) The producer first serializes the message; the serializer classes are specified with the key.serializer and value.serializer parameters. In addition, interceptors can process the message before serialization; they are specified with the interceptor.classes parameter.

(2) The message is assigned a partition. If the ProducerRecord does not specify a partition, the default is to partition by key, so messages with the same key go to the same partition. If the key is null, partitions are assigned round-robin. (A simplified sketch of the key-based logic appears after this list.)

(3) Messages for the same topic and partition are accumulated into the same batch. When a batch meets a trigger condition (its size reaches the batch.size limit, linger.ms elapses, etc.) and a sender thread is free, the batch is sent to the broker.

(4) After the broker receives the messages and writes them to the corresponding topic, partition, and offset, it responds to the client with the result.

(5) If the broker fails to write the messages, it returns an error, or the producer's send itself fails; the producer then retries until the number of attempts specified by the retries parameter is exhausted.
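To make step (2) concrete, the key-based assignment can be approximated as a hash of the serialized key modulo the partition count. This is only a simplified sketch; Kafka's built-in DefaultPartitioner actually uses a murmur2 hash over the serialized key bytes:

// Simplified sketch of key-based partition assignment (illustrative only;
// Kafka's DefaultPartitioner uses murmur2, not Arrays.hashCode)
int partitionFor(byte[] keyBytes, int numPartitions) {
    return (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
}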

4. The key to successful sending: the producer's acks and retries parameters. With acks=0 there is no guarantee the message was sent successfully, because the producer does not wait for the broker's response. With acks=1 the producer only waits for acknowledgment from the leader node. Only with acks=all does the producer wait until the leader has written the message and the follower nodes have synchronized it. On failure, the message can be retried a number of times via retries, and the retry interval can be set via the retry.backoff.ms parameter.
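A reliability-oriented configuration might therefore look like this (illustrative values):

        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");             // leader and in-sync followers must acknowledge
        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 3);              // retry failed sends up to 3 times
        producerConfig.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);   // wait 100 ms between retries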

Consumer

Consumers consume messages from partitions, as shown in the following figure:

As you can see, consumers in the same consumer group each consume from different partitions. Note that if the number of consumers exceeds the number of partitions, some consumers will sit idle, because within a group a partition's messages can be consumed by only one consumer, not several.

1. Create a consumer. The creation code is as follows:

Properties consumerConfig = new Properties();
consumerConfig.put("bootstrap.servers", "192.168.189.128:9092,192.168.189.128:9093,192.168.189.128:9094");
consumerConfig.put("group.id", "boot-kafka");
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

2. Subscribe to a topic. The code is as follows:

 kafkaConsumer.subscribe(Arrays.asList("device-alarm-test"));

Of course, subscribe() can take multiple topics, and it can also subscribe to all topics matching a regular expression.
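For example (the topic names here are made up, and the single-argument Pattern overload of subscribe() requires a reasonably recent client version):

// Subscribe to several topics at once
kafkaConsumer.subscribe(Arrays.asList("topic-a", "topic-b"));
// Or subscribe to every topic matching a regular expression (java.util.regex.Pattern)
kafkaConsumer.subscribe(Pattern.compile("device-.*"));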

3. Poll for messages. The code is as follows:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

The timeout of the poll() method is the maximum time the consumer waits for messages: poll() returns as soon as records are available, or after the timeout expires if none arrive. The method hides all the details of group coordination, partition rebalancing, heartbeats, and data fetching.

4. The consumption process. First you need to understand fetch versus poll: when poll() is called, data is taken from the local cache first; only if the cache is empty is a fetch request sent to the broker.

The specific consumption steps are as follows:

(1) The consumer requests data through the poll() method, which first looks in the buffer (a ConcurrentLinkedQueue). If the buffer has data, at most max.poll.records messages are returned.

(2) If the buffer is empty, the client's send() method is called to send a fetch request to the broker.

(3) When the broker receives a fetch request, it consults two values: fetch.min.bytes (the minimum number of bytes to return) and fetch.max.wait.ms (the maximum time to wait). If the two values are, say, 1 MB and 500 ms, the broker waits until 1 MB of messages has accumulated or 500 ms has elapsed, whichever comes first, before returning the aggregated partition messages to the consumer. Both parameters are set on the consumer and are carried along with the fetch request (see the sketch after this list).

(4) When the broker aggregates partition messages, to balance data across partitions, each partition returns at most max.partition.fetch.bytes of data. This parameter is set on the consumer and defaults to 1 MB; it must be larger than the broker's message.max.bytes, otherwise the broker could store a message too large for the partition to ever return. In addition, fetch.max.bytes defaults to 50 MB, 50 times the default max.partition.fetch.bytes; it caps the total size of a single fetch response, so at the defaults one fetch can return full data from at most about 50 partitions (it does not limit how many partitions a topic may have).

(5) After the consumer receives the broker's response, it stores the messages in the cache, then calls the fetcher.collectFetch() method to take messages from the cache and return them.

(6) If enable.auto.commit=true, offsets are committed automatically, by default every auto.commit.interval.ms (5 seconds). Offsets can also be committed manually: after consuming the records returned by poll(), call the synchronous or asynchronous commit API to update the offset (see the sketch after this list).
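For example, a minimal manual-commit loop, with the fetch parameters from steps (3) and (4) made explicit (the values are illustrative), might look like this:

consumerConfig.put("fetch.min.bytes", 1048576);      // wait for at least 1 MB of data...
consumerConfig.put("fetch.max.wait.ms", 500);        // ...or at most 500 ms, whichever comes first
consumerConfig.put("enable.auto.commit", "false");   // disable auto-commit; commit manually

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());          // process the record
    }
    kafkaConsumer.commitSync();                      // commit only after the whole batch is processed
}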

5. The key to successful consumption: committing the offset. Updating the offset promptly after the consumer has processed the fetched messages is what guarantees accurate consumption. If the offset is committed too late, messages are consumed repeatedly; if a wrong offset is committed, for example one that is too large, messages are missed. So the offset update is the key to accurate consumption. Of course, we can also attach a UUID to each message and use it to deduplicate.
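A minimal deduplication sketch, to be placed inside the poll loop from step 3, assuming the producer puts a UUID in the record key (the in-memory set here is illustrative; a real system would use Redis or a database):

Set<String> processedIds = new HashSet<>();      // java.util.Set / java.util.HashSet; illustrative store
for (ConsumerRecord<String, String> record : records) {
    String messageId = record.key();             // assumes the producer set a UUID as the key
    if (processedIds.add(messageId)) {           // add() returns false if the id was already seen
        System.out.println(record.value());      // process only the first occurrence
    }
}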