Kafka 3.x – The Overall Workflow of the Broker (with Diagrams)

Article directory

  • Kafka information stored in Zk
  • Kafka Broker overall workflow
    • 1. Register with zk after the broker is started
    • 2. Whichever broker registers the Controller first has the final say
    • 3. The elected Controller monitors changes in broker nodes
    • 4. Controller decides leader election
    • 5. Controller uploads node information to Zk
    • 6. Other Controllers synchronize related information from zk
    • Message sending and storage
    • 7. Assume that the Leader in Broker1 crashes
    • 8. The Controller monitors the node changes
    • 9. Get the ISR
    • 10. Elect a new Leader
    • 11. Update Leader and ISR
  • Instance simulation
  • Broker important parameters


Kafka information stored in Zk

By default, Kafka stores its metadata directly under the ZooKeeper root directory. Creating a /kafka node keeps this information easy to manage:

#Configure the address to connect to the Zookeeper cluster (create /kafka in the zk root directory for easy management)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
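
With this setting, everything Kafka keeps in ZooKeeper lives under the /kafka node. You can browse it with ZooKeeper's own CLI; a sketch (the exact list of children varies by Kafka version):

[zk: localhost:2181(CONNECTED) 0] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]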

Kafka Broker overall workflow

1. Register with zk after the broker is started
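
Registration creates one ephemeral child per broker under /kafka/brokers/ids, which disappears as soon as that broker's ZooKeeper session ends. A sketch of what the node for broker 102 might hold (the exact JSON fields vary by version; the values below are illustrative for this cluster):

[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/ids/102
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop102:9092"],"jmx_port":-1,"host":"hadoop102","timestamp":"1637292471777","port":9092,"version":5}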

2. Whichever broker registers the Controller first has the final say
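
Concretely, every broker races to create the ephemeral /kafka/controller znode at startup; the broker whose create succeeds becomes the Controller, and /kafka/controller_epoch counts how many controllers the cluster has had so far. A sketch (values are illustrative and match the simulation below):

[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{"version":1,"brokerid":103,"timestamp":"1637292471777"}
[zk: localhost:2181(CONNECTED) 3] get /kafka/controller_epoch
24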

3. The elected Controller monitors changes in broker nodes
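
The Controller does this by putting a watch on /kafka/brokers/ids. You can reproduce the mechanism from zkCli.sh (a sketch, assuming the ZooKeeper 3.5+ CLI where -w attaches a one-shot watch); when any broker joins or leaves, the watch fires once:

[zk: localhost:2181(CONNECTED) 4] ls -w /kafka/brokers/ids
[102, 103, 104]

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/kafka/brokers/ids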

4. The Controller decides the leader election. Election rule: with being alive and in the ISR as the precondition, the replica that comes first in the AR (assigned replicas) order takes priority.

5. Controller uploads node information to Zk

6. The controllers on the other brokers synchronize the related information from zk, so any of them can take over if the Controller fails

Message sending and storage
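
On disk, each partition gets its own directory of segment files: the messages themselves in .log files plus offset and timestamp indexes. A sketch of what the directory for partition 0 of topic first might contain (the datas directory is this cluster's configured log.dirs; your path may differ):

[xxx@hadoop102 kafka]$ ls datas/first-0
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex
leader-epoch-checkpoint  partition.metadata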

7. Assume that the Leader in Broker1 crashes

8. The Controller monitors the node changes

9. Get the ISR

10. Elect a new Leader (same rule: alive and in the ISR first, then first in the AR order)

11. Update Leader and ISR
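
The end result is a rewritten state znode for the affected partition: a new leader, a bumped leader_epoch, and the failed replica dropped from the ISR. A sketch of what it might look like if the leader had been on broker 103 and broker 102 took over (values illustrative):

[zk: localhost:2181(CONNECTED) 5] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":19,"isr":[102,104]}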

Instance simulation

1) Scenario: simulate a kafka broker going offline and coming back online, and watch the data change in zookeeper
2) View kafka node information:
① View kafka cluster node information on zookeeper
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 103, 104]
② View controller information in the current kafka cluster node
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{<!-- -->"version":1,"brokerid":103,"timestamp":"1637292471777"}
③ Check the status of partition 0 of the first topic in kafka
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state
{<!-- -->"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,103,104]}
3) Simulate kafka offline: stop kafka on hadoop103
[xxx@hadoop103 kafka]$ bin/kafka-server-stop.sh
4) View kafka related node information
① Check the kafka cluster node information on zookeeper
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 104]
② View controller information in the current kafka cluster node
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{"version":1,"brokerid":102,"timestamp":"1637292471777"}
③ Check the status of partition 0 of the first topic in kafka
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,104]}
5) Restart the kafka service on hadoop103
[xxx@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
6) Check the above nodes again and observe how they change
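
What to expect (a sketch): broker 103 reappears under /kafka/brokers/ids and rejoins the ISR once it catches up, while the controller stays on 102, because a sitting Controller keeps its role until it goes down:

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 103, 104]
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,104,103]}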

Broker important parameters

Parameter name	Description
replica.lag.time.max.ms	If a Follower in the ISR has not sent a communication request or synchronized data with the Leader within the time threshold (default 30s), the Follower is kicked out of the ISR.
auto.leader.rebalance.enable	Default true. Enables automatic Leader Partition balancing.
leader.imbalance.per.broker.percentage	Default 10%. The ratio of imbalanced leaders allowed on each broker. If a broker exceeds this value, the controller triggers leader balancing.
leader.imbalance.check.interval.seconds	Default 300 seconds. The interval at which to check whether the leader load is balanced.
log.segment.bytes	Kafka's log files are stored in segments; this sets the maximum size of a single segment (.log) file. Default 1G.
log.index.interval.bytes	Default 4kb. Each time 4kb of data is written to a log (.log) file, kafka records an entry in the index (.index) file.
log.retention.hours	How long kafka retains data, in hours. Default 7 days.
log.retention.minutes	How long kafka retains data, at minute granularity. Off by default.
log.retention.ms	How long kafka retains data, at millisecond granularity. Off by default. (Highest priority)
log.retention.check.interval.ms	How often to check whether data has exceeded its retention time. Default 5 minutes.
log.retention.bytes	Default -1, meaning no limit. If the total size of all logs exceeds this value, the oldest segment is deleted.
log.cleanup.policy	Default delete, which applies the deletion policy to all data; if set to compact, log compaction applies to all data.
num.io.threads	Default 8. The number of threads that write to disk. This should be about 50% of the total number of cores.
num.replica.fetchers	The number of replica fetch threads. This should be about 1/3 of 50% of the total number of cores.
num.network.threads	Default 3. The number of data transfer threads. This should be about 2/3 of 50% of the total number of cores.
log.flush.interval.messages	The number of messages after which the page cache is forcibly flushed to disk. Default Long.MAX_VALUE (9223372036854775807). Generally left to the system to manage.
log.flush.interval.ms	How often data is flushed to disk, in milliseconds. Default null. Generally not modified; left to the system to manage.
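
A few of these are commonly tuned together in server.properties. A minimal sketch (the values shown simply spell out the defaults from the table above; adjust them to your hardware and retention needs):

# Kick a follower out of the ISR if it falls behind for more than 30s
replica.lag.time.max.ms=30000
# Let the controller rebalance partition leaders automatically
auto.leader.rebalance.enable=true
# Roll a new segment once the active .log file reaches 1G
log.segment.bytes=1073741824
# Keep data for 7 days, checking for expired segments every 5 minutes
log.retention.hours=168
log.retention.check.interval.ms=300000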