Kafka 3.x – an incomplete guide to partition allocation strategies and rebalancing

Article directory

  • Production experience – partition allocation strategy and rebalancing
    • Range strategy and rebalancing of consumer partition allocation
      • Range partitioning strategy principle
      • Range partition allocation strategy and rebalancing case
    • RoundRobin strategy and rebalancing of consumer partition allocation
      • RoundRobin partition strategy principle
      • RoundRobin partition allocation strategy and rebalancing case
    • Sticky strategy and rebalancing of consumer partition allocation
      • Sticky partition strategy principle
      • Sticky partition allocation strategy and rebalancing case


Production experience – partition allocation strategy and rebalancing

In Apache Kafka, determining which Consumer consumes data from which Partition is managed by Kafka’s Group Coordinator and Partition Assignment strategies. Here are some detailed explanations of this process:

  1. Consumer Group: A Consumer Group is a collection of Consumers that collaboratively consume data from one or more Kafka Topics. Consumer Groups are usually used to achieve load balancing of message processing to ensure that each message is processed once and not repeatedly.

  2. Topic and Partition: Kafka Topic can be divided into multiple Partitions, and each Partition is a subset of data. Splitting a Topic into multiple Partitions helps achieve parallel data processing and improve throughput.

  3. Group Coordinator: There is a Group Coordinator in the Kafka cluster that is responsible for managing the activities of the Consumer Group. Members of the Consumer Group communicate with the Group Coordinator regularly to report their status and keep their partition allocation up to date.

  4. Partition Assignment strategy: When the Consumer Group starts, the Group Coordinator is responsible for allocating the Partition to be consumed to each member of the Consumer Group. This allocation is done based on the Partition Assignment strategy. Kafka provides different strategies to implement different allocation methods, such as Round Robin, Range, or custom allocation strategies.

  • Round Robin: This is a simple strategy in which partitions are handed out to the consumers one at a time in a circular order. It keeps the partition counts even across consumers, but imbalance is still possible because some partitions may be larger or more active than others.

  • Range: The Range strategy attempts to assign adjacent Partitions to the same Consumer to minimize network transmission.

  • Custom policy: You can also write a custom Partition Assignment policy to meet specific needs.

Eventually, each Consumer will be assigned a set of Partitions, and they will be responsible for consuming data from these Partitions. This allocation process is dynamic, so if the number of members of the Consumer Group changes, or a new Partition is added to the Topic, the allocation strategy will recalculate and allocate Partitions.

It should be noted that Kafka’s allocation strategy and Consumer Group’s coordination mechanism make data consumption and load balancing relatively easy to manage, while allowing horizontal expansion to adapt to workloads of different sizes.

The consumer parameters most relevant to rebalancing are:

heartbeat.interval.ms: The heartbeat interval between the Kafka consumer and the coordinator, 3 seconds by default. It must be less than session.timeout.ms and should not exceed 1/3 of session.timeout.ms.
session.timeout.ms: The session timeout for the connection between the Kafka consumer and the coordinator, 45 seconds by default. If it is exceeded, the consumer is removed and the consumer group rebalances.
max.poll.interval.ms: The maximum interval allowed between two poll() calls, i.e. the time a consumer may spend processing a batch of messages, 5 minutes by default. If it is exceeded, the consumer is removed and the consumer group rebalances.
partition.assignment.strategy: The consumer partition allocation strategy. The default is Range + CooperativeSticky, and Kafka can use several allocation strategies at the same time. Available strategies: Range, RoundRobin, Sticky, CooperativeSticky.
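
For reference, here is a minimal sketch of how these parameters could be set on a Java consumer. The broker address, group id, and topic name are placeholders borrowed from the cases later in this article; everything else follows the defaults described above.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebalanceConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group02");                 // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Heartbeat every 3 s; must stay below session.timeout.ms (at most 1/3 of it).
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        // No heartbeat within 45 s -> the member is removed and the group rebalances.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
        // No poll() within 5 min -> the member is removed and the group rebalances.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // More than one assignor can be listed; this mirrors the 3.x default of Range + CooperativeSticky.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RangeAssignor.class.getName() + "," + CooperativeStickyAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("groupTest01")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.partition() + " -> " + r.value()));
        }
    }
}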

Range strategy and rebalancing of consumer partition allocation

Range partitioning strategy principle

The Range partition allocation strategy is one of Kafka's strategies for assigning partitions to consumers. Its basic principle is to give each consumer a contiguous block of partitions, so that each consumer is responsible for a continuous range of partition IDs. This distribution method can reduce network transmission and improve locality when adjacent partitions sit on the same Broker, and it reduces Leader switching.

The following is the detailed principle and workflow of the Range partition allocation strategy:

  1. Determine the available partitions: First, the consumer group needs to determine the available partitions. This usually involves subscribing to one or more Kafka Topics and then getting a list of all partitions for each Topic.

  2. Sorting partitions: Next, the consumer group sorts the partitions. This is to ensure that all consumers operate in the same order when allocating partitions. Normally, partitions are sorted in ascending order by their partition ID.

  3. Determine the number of consumers: The consumer group needs to know how many consumer members it has. This can be done by communicating with the Group Coordinator to get the list of members and then counting the number of consumers.

  4. Allocate partitions: Now, the Range partition allocation strategy starts allocating partitions to consumers. It is accomplished through the following steps:

    a. Divide the sorted partition list evenly into several blocks. The number of blocks is equal to the number of consumers.

    b. Each consumer is then assigned a block that contains a contiguous set of partitions.

    c. The blocks allocated to each consumer are distributed according to the principle of uniform distribution, ensuring that the partition load of each consumer is approximately equal.

    d. If the number of partitions is not evenly divisible by the number of consumers, the consumers at the front of the sorted list each take one extra partition, so the first consumers may own slightly more partitions than the others while every partition is still allocated.

  5. Allocation Complete: Once allocation is complete, each consumer knows which partitions it needs to consume from, and how to access those partitions. Consumers will start pulling data from these partitions for processing.

In summary, the Range partition allocation strategy achieves load balancing by allocating a contiguous set of partitions to each consumer. This helps reduce network overhead and improve data locality when adjacent partitions are located on the same Broker, reducing Leader switches and improving overall performance. Range is part of Kafka's default assignment configuration, but you can choose other strategies to meet specific needs.
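
To make the division rule concrete, here is a small stand-alone sketch (plain Java, not the actual RangeAssignor code) that reproduces the arithmetic: partitions / consumers gives the base quota, and the first partitions % consumers members in sorted order each take one extra partition.

import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {
    // Same division rule as Range: contiguous blocks, and the first (partitions % consumers)
    // consumers receive one extra partition each.
    static List<List<Integer>> rangeAssign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int quota = numPartitions / numConsumers;
        int remainder = numPartitions % numConsumers;
        int start = 0;
        for (int i = 0; i < numConsumers; i++) {
            int size = quota + (i < remainder ? 1 : 0);
            List<Integer> block = new ArrayList<>();
            for (int p = start; p < start + size; p++) {
                block.add(p);
            }
            result.add(block);
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 consumers -> [[0, 1, 2], [3, 4], [5, 6]],
        // which matches the generation-3 assignment in the case below.
        System.out.println(rangeAssign(7, 3));
    }
}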

Range partition allocation strategy and rebalancing case

1) Prepare
① Modify the partitions of topic groupTest01 to 7 partitions

[xxx@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic groupTest01 --partitions 7

② Create 3 consumers that form consumer group group02 and consume topic groupTest01 (a minimal consumer sketch follows after this list)
③ Observe partition allocation
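
A minimal sketch of what each of the three consumers could look like. The class name is illustrative; the broker address, group id, and topic match this case, and no partition.assignment.strategy is set, so the default Range + CooperativeSticky applies.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CustomConsumer01 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group02"); // all three consumers share this group id
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(List.of("groupTest01"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + " : " + record.value());
                }
            }
        }
    }
}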

Start the first consumer and observe the partition allocation
// consumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
Start the second consumer and observe the partition allocation
// consumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0
// consumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group02-1-60afd984-6916-4101-8e72-ae52fa8ded6c', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
Start the third consumer and observe the partition allocation
// consumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-1, groupTest01-0
// consumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-60afd984-6916-4101-8e72-ae52fa8ded6c', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-4])
Adding newly assigned partitions: groupTest01-3, groupTest01-4
// consumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-fd15f50f-0a33-4e3a-8b8c-237252a41f4d', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5

Thinking: What will the partition allocation look like if a consumer exits?

RoundRobin strategy and rebalancing of consumer partition allocation

RoundRobin partition strategy principle

The RoundRobin partition allocation strategy is a commonly used consumer partition allocation strategy in Kafka. Its principle is simple: the partitions of the subscribed Topics are handed out to the consumers one at a time in a circular order, which keeps the number of partitions per consumer as even as possible.

The following is the detailed principle of RoundRobin partition allocation strategy:

  1. Consumer joins Group: When a new consumer joins the Consumer Group, or an existing consumer needs to rebalance partitions, the Group Coordinator (a group coordinator in the Kafka cluster) will trigger the partition allocation process.

  2. Calculate partition allocation: Under the RoundRobin strategy, all partitions of the subscribed Topics are traversed in order and dealt to the consumers one at a time. Assuming there are 3 partitions (Partition 0, Partition 1, Partition 2) and 2 consumers (Consumer A and Consumer B), the distribution would be as follows:

    • Consumer A: Partition 0, Partition 2
    • Consumer B: Partition 1
  3. Partition allocation round-robin: The dealing continues in this round-robin fashion until every partition has exactly one owner, so the load is spread across all consumers and the partition counts per consumer differ by at most one.

  4. Dynamic load balancing: If a new consumer joins the Consumer Group or an existing consumer leaves, the partition allocation will be recalculated to ensure that it adapts to the change in the number of consumers.

The advantage of the RoundRobin partition allocation strategy is that it is simple and fair: every consumer ends up with nearly the same number of partitions. However, it is not ideal in every case, especially when partitions differ greatly in size or activity. In such situations another strategy such as Range or Sticky, or a custom assignor that takes partition characteristics into account, may be a better fit for balancing the actual load.
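
As a quick check against the case below, here is a small stand-alone sketch of the round-robin rule for a single topic: partition p simply goes to consumer p % numConsumers. With the 7 partitions of groupTest01 and 3 consumers this yields 0/3/6, 1/4 and 2/5, which is exactly what the logs show once all three consumers have joined.

import java.util.ArrayList;
import java.util.List;

public class RoundRobinSplitSketch {
    // Partitions are dealt to consumers one at a time, like dealing cards.
    static List<List<Integer>> roundRobinAssign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 consumers -> [[0, 3, 6], [1, 4], [2, 5]]
        System.out.println(roundRobinAssign(7, 3));
    }
}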

RoundRobin partition allocation strategy and rebalancing case

  • Modify the consumer code so that the consumer group is group03.
  • Modify the partition allocation strategy to RoundRobin:
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

Start three consumers in sequence and observe the console output

Start the consumer CustomConsumer01 and observe the console output
// customConsumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
Start the consumer CustomConsumer02 and observe the console output
// customConsumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-2, groupTest01-4, groupTest01-6])
Adding newly assigned partitions: groupTest01-2, groupTest01-0, groupTest01-6, groupTest01-4
// customConsumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group03-1-5771a594-4e99-47e8-9df6-ca820af6698b', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1, groupTest01-3, groupTest01-5])
Adding newly assigned partitions: groupTest01-3, groupTest01-1, groupTest01-5
Start the consumer CustomConsumer03 and observe the console output
// customConsumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-3, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-0, groupTest01-6
// customConsumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-5771a594-4e99-47e8-9df6-ca820af6698b', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1, groupTest01-4])
Adding newly assigned partitions: groupTest01-1, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-d493b011-d6ea-4c36-8ae5-3597db635219', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-2, groupTest01-5])
Adding newly assigned partitions: groupTest01-2, groupTest01-5

Thinking: What will the partition allocation look like if a consumer exits?

Sticky strategy and rebalancing of consumer partition allocation

Principle of Sticky partition strategy

The allocation result can be understood as “sticky”: before performing a new allocation, the assignor considers the result of the previous allocation and changes it as little as possible, which saves a lot of overhead. Sticky partition assignment was introduced in Kafka starting from version 0.11, and it tries as much as possible to keep the previously allocated partitions unchanged.

The “Sticky” strategy is one of Kafka's partition allocation strategies for assigning partitions to consumers. Its main goal is to minimize how many partitions change owner during a rebalance, which improves the stability of the consumer group.

The principle of Sticky partition allocation strategy is as follows:

  1. Initial allocation: Initially, Kafka distributes partitions evenly to consumers. Each consumer fetches one or more partitions in sequence to ensure the load is distributed as evenly as possible.

  2. Sticky partition allocation: Once a partition is allocated to a consumer, the partition will try to remain allocated to the same consumer. This is where the policy name “Sticky” comes from, because it attempts to “stick” the partition to a consumer that has already processed it.

  3. Partition rebalancing: If a new consumer joins the consumer group, or a consumer leaves, the partitions need to be redistributed. However, the Sticky strategy minimizes the amount of movement: instead of reshuffling every partition, it tries to keep existing assignments in place and only moves the partitions that have to move (for example, those that belonged to a consumer that left).

  4. Rebalancing triggers: Rebalancing can be triggered by the following situations:

    • Consumers join or leave the consumer group.
    • A consumer's heartbeat times out, or it does not call poll() within max.poll.interval.ms.
    • The set of partitions changes, for example new partitions are added to a subscribed topic.
  5. Maintain stickiness: The Sticky strategy will try its best to maintain the stability of partition allocation to reduce the number of partition redistributions, thereby reducing the instability of the entire consumer group. This helps improve system availability and performance.

The main advantages of the Sticky strategy are that it reduces the amount of partition movement during rebalances, reduces system instability, and lowers the cost of each rebalance. This is particularly useful for large-scale Kafka clusters and high-throughput consumer groups. However, it is not necessarily the best choice in every case; sometimes redistributing partitions more aggressively is needed to keep the load fairly distributed. When choosing a partition allocation strategy, weigh these trade-offs against your specific use cases and needs.
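
If sticky behaviour is wanted together with incremental (cooperative) rebalancing, Kafka also ships CooperativeStickyAssignor, which is part of the 3.x default listed in the parameter table above. A minimal sketch of switching to it, assuming the same properties object used in the configuration snippets in this article:

// import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
// CooperativeStickyAssignor keeps the "stick to the previous owner" behaviour, but during a
// rebalance it revokes only the partitions that actually move instead of stopping the whole group.
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());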

Sticky partition allocation strategy and rebalancing case

1) Modify the partition allocation strategy, and change the consumer group to group06
// Modify the partition allocation strategy
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
2) Start the three consumers one after another and observe the console output
Start the consumer CustomConsumer01 and observe the console output
// customConsumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
Start the consumer CustomConsumer02 and observe the console output
// customConsumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0
// customConsumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
Start the consumer CustomConsumer03 and observe the console output
// customConsumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-1, groupTest01-0
// customConsumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5])
Adding newly assigned partitions: groupTest01-5, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-eb0ac20c-5d94-43a7-b7c1-96a561513995', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-6
3) After killing the consumer CustomConsumer01, observe the console output
// customConsumer02
Successfully synced group in generation Generation{generationId=4, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-0, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-0, groupTest01-5, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=4, memberId='consumer-group06-1-eb0ac20c-5d94-43a7-b7c1-96a561513995', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-6, groupTest01-1])
Adding newly assigned partitions: groupTest01-3, groupTest01-1, groupTest01-6