Kafka – Producer mechanism: principles and optimization

The Producer is the component that produces data in the Kafka architecture. Although it is a relatively simple component overall, there are still many details worth scrutinizing. For example: how is Kafka's Producer implemented, and how does it send messages? What IO communication model does it use? And in practice, how can it be tuned for high efficiency?

Simple producer program:
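A minimal sketch, assuming a broker reachable at localhost:9092 and a topic named demo (both placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // new KafkaProducer() wires up the Interceptor, Serializer, Partitioner and RecordAccumulator
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // send is asynchronous; the callback fires once the broker responds
            producer.send(new ProducerRecord<>("demo", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
            producer.close(); // flushes records still buffered in the RecordAccumulator
        }
    }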

1. Client initialization KafkaProducer

new KafkaProducer() performs the Producer initialization, creating components such as the Interceptor, Serializer, Partitioner, and RecordAccumulator. When we use KafkaProducer to send a message, the message passes through the interceptor (Interceptor), the serializer (Serializer), and the partitioner (Partitioner), and is finally staged in the message collector (RecordAccumulator), from which it is later read and sent in batches.

The following traces the core mechanism:

1. Initialize the RecordAccumulator record accumulator

Brief introduction: the RecordAccumulator can be understood as the Producer's send buffer. When sending data, the Producer does not connect to the Broker and send records one by one; instead, it puts each record into the RecordAccumulator and sends them in batches.

2. Initialize the Sender's ioThread. During initialization, the Producer additionally creates an ioThread.
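In the KafkaProducer constructor this looks roughly as follows (abridged; the names follow the Kafka client source and may vary by version):

    // Create the Sender runnable and start it on a dedicated daemon IO thread
    this.sender = newSender(logContext, kafkaClient, this.metadata);
    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true); // true = daemon
    this.ioThread.start();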

2. Send method

At this point, the Producer has only done initialization work; it has not yet established a connection to the Kafka cluster and holds no metadata. With that in mind, look at the doSend method invoked by send.

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // waitOnMetadata blocks until metadata for the record's topic is available
            ClusterAndWaitTime clusterAndWaitTime =
                    this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            // ........

            // Serialize the key and the value
            byte[] serializedKey =
                    this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            byte[] serializedValue =
                    this.valueSerializer.serialize(record.topic(), record.headers(), record.value());

            // Get the target partition from the metadata
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            // ..........

            // Append the data to the accumulator; the Sender thread drains it in batches
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(
                    tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
            // ...........

            return result.future;
        } catch (KafkaException e) {
            // ........ exception handling elided
        }
    }

1. waitOnMetadata: metadata update

Internally, the method first checks whether the current Cluster already contains partition metadata for the topic. If not, no connection has been established to fetch metadata yet, so the method wakes up the sender thread.

Note: the Cluster is not completely empty at this point; it already holds the configured bootstrap Node list.
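Inside waitOnMetadata the wait itself looks roughly like this (abridged; names follow the Kafka client source):

    // Request a metadata refresh, wake the Sender thread to do the network IO,
    // then block the main thread until the update arrives or the timeout expires
    int version = metadata.requestUpdate();
    sender.wakeup();
    metadata.awaitUpdate(version, remainingWaitMs);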

In early versions, metadata was stored in ZooKeeper. Metadata here means the partition information, node information, and the mappings among nodes, topics, and partitions in the cluster. Without metadata, a starting producer cannot send data; it is effectively blind. But storing metadata in ZooKeeper puts network pressure on ZooKeeper under concurrency, which means ZooKeeper's reliability would have to be guaranteed before Kafka's could be.

So in later versions, Kafka maintains metadata on the Broker side. The Producer can obtain metadata from any Broker, reducing the dependence on ZooKeeper; only some core distributed-coordination duties are left to ZooKeeper.

2. Sender thread run method

The run method in the Sender thread is a while (running) loop, the event-loop style common in reactive programming. The Redis server, for example, is also an EventLoop event-polling process.

Its core method, NetworkClient.poll, implements client connections, data sending, and event processing.
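A simplified sketch of the loop (abridged; method names follow a recent Kafka client version):

    // Sender.run: event-loop style, one iteration per pass
    while (running) {
        long now = time.milliseconds();
        long pollTimeout = sendProducerData(now); // drain batches into produce requests
        client.poll(pollTimeout, now);            // NetworkClient.poll: connections, sends, receives, metadata
    }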

When metadataUpdater.maybeUpdate is executed for the first time, there is no metadata node information yet, so this.maybeUpdate(now, node) is executed. Inside it, the initiateConnect method establishes the client connection; the underlying layer uses the Selector multiplexer from Java NIO.

After the connection is established, nioSelector.select() waits for events.

Then the handleCompletedReceives handler is triggered to perform the metadata synchronization.

Note: after the metadata update completes, metadata.update calls this.notifyAll() to wake the blocked main thread so it can send data.

At this point, the waitOnMetadata method of the main thread has completed the update of metadata.

After that, the main thread proceeds with Serializer serialization, obtains the partition information from the metadata, and hands the data off for sending.

3. RecordAccumulator Record Accumulator

When the producer sends data, it does not send each message immediately after establishing a connection; it sends messages in batches. The batches field in the RecordAccumulator object maintains a double-ended queue per TopicPartition, used to cache record data.

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

TopicPartition (topic partition): the buffer is divided into separate double-ended queues by topic partition

Deque (double-ended queue) of ProducerBatch: a batch holds multiple records (default capacity 16 KB)

The structure is as follows:
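Schematically (topic and partition names are illustrative):

    batches (ConcurrentMap)
    ├─ TopicPartition(topicA, 0) -> Deque[ ProducerBatch | ProducerBatch | ... ]
    ├─ TopicPartition(topicA, 1) -> Deque[ ProducerBatch | ... ]
    └─ TopicPartition(topicB, 0) -> Deque[ ProducerBatch | ... ]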

The producer uses synchronized when appending data to batches, so the Producer is thread-safe in multi-threaded scenarios.

Why have RecordAccumulator?

The main function of the RecordAccumulator is to stage the messages sent by the Main thread so the Sender thread can fetch them in batches, reducing the number of requests compared with fetching one message at a time, lowering the IO pressure on the network card, and improving performance.

Relevant parameter configuration and tuning:

1. RecordAccumulator buffer.memory, default size 32 MB

This refers to the maximum total capacity (buffer.memory = 32 MB) of all batches in the RecordAccumulator of each new KafkaProducer.

Setting method: properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);

If the RecordAccumulator buffer is full, the send call blocks, waiting for buffered data to be drained. If the messages are not sent and removed within the allotted time, i.e. the buffer is still full, an exception is thrown. The default is 60 s.

Setting method: properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60 * 1000);

Optimization point: evaluate according to business needs. If there are many TopicPartitions and the data volume is large, the total capacity may exceed 32 MB even if each TopicPartition holds very few batches; in that case, increase buffer.memory.

2. The default size of a single batch in Kafka is 16 KB.

This means each batch is 16 KB; a batch is used to store Record data.

If a record < 16 KB, the batch can hold multiple records, and the batch's space is reused.

If a record > 16 KB, the current record applies for additional storage space, which is destroyed after use.

Optimization point: the batch size needs to be evaluated against the business. Avoid oversized records; ensure each batch can accommodate its records, minimizing frequent allocation and destruction of memory as well as memory fragmentation.
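The batch size is configured like the other options (16 KB shown, which is the default):

    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);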

3. Choosing between synchronous blocking and non-blocking sends

The RecordAccumulator exists to support sending data in batches. The send method in KafkaProducer is an asynchronous interface; it can be made blocking via send(...).get(), waiting for the response to come back.

Sending synchronously means waiting for Kafka to acknowledge each record before the producer sends the next one. That gives stronger consistency, but the RecordAccumulator loses its purpose.
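A sketch of the two styles, with producer and record as in the earlier example:

    // Synchronous: block on the returned Future; one round trip per record,
    // so the RecordAccumulator's batching brings no benefit
    RecordMetadata metadata = producer.send(record).get();

    // Asynchronous (default): register a callback and return immediately;
    // the Sender thread drains the RecordAccumulator in batches
    producer.send(record, (md, exception) -> {
        if (exception != null) {
            exception.printStackTrace(); // handle failure, e.g. log and retry
        }
    });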

For non-blocking sends, when producer and consumer IO are asymmetric, you can set linger.ms (LINGER_MS_CONFIG), e.g. to 30, to make the sender thread wait a short period before each pull from the RecordAccumulator, so that it pulls in batches as much as possible and reduces network IO.

Setting method: properties.put(ProducerConfig.LINGER_MS_CONFIG, 30); (the default is 0)

Continuing the analysis

At this point, once the Main thread has appended the data to the RecordAccumulator container, its core work is done. It also calls sender.wakeup to signal that there is data to process and to ensure the sender thread does not stay blocked.

The Sender thread is a Loop process. When sending data, it pulls batches from the RecordAccumulator and packs them into requests instead of sending records one by one. The default maximum request size is 1 MB.

Setting method: pp.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(1 * 1024 * 1024));

Before actually sending the data, the Sender thread also records each Request in InFlightRequests (requests in flight). The default limit is 5: once the producer has 5 outstanding requests to Kafka with no response, sending stops and the thread blocks.
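This limit is the max.in.flight.requests.per.connection setting:

    properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // default 5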

This design has no effect on the synchronous sending path, because a synchronous send waits for each request's response.

SEND_BUFFER_CONFIG (send buffer) and RECEIVE_BUFFER_CONFIG (receive buffer) configure the IO-layer socket buffers; defaults may differ across operating systems. Set them to -1 to use the size allocated by the operating system.

pp.setProperty(ProducerConfig.SEND_BUFFER_CONFIG, String.valueOf(32 * 1024));
pp.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG, String.valueOf(32 * 1024));

On Linux, the kernel default configuration can be checked with sysctl net.core.wmem_default and sysctl net.core.rmem_default.

At this point, the entire data-sending mechanism of the Producer is clear. The acks setting involves Broker data synchronization and Consumer consumption state, and will be analyzed in a separate section.

To summarize:

1. The Producer is implemented by the Main thread and the Sender thread working in combination.

The Main thread handles data input, Producer initialization, and appending data to the RecordAccumulator. Metadata updates, data sending, and other IO operations are all done by the Sender thread.

The Sender thread's working mode is the event-loop style common in middleware: within the Loop it performs client connection, metadata updates, and data packing and sending.

2. The IO in Kafka wraps Java NIO (Selector); the bottom layer is a multiplexer implementation, not Netty.

3. The Producer does not simply send records one by one. Internally it encapsulates the RecordAccumulator, ProducerBatch, and Request packaging, which allows it to send data in batches and reduce the number of IOs. Combined with the in-flight request limit, it ensures that when Kafka does not respond normally, exceptions are thrown to prevent data loss. During development, these parameters can be adjusted for optimization.