[Solved] Kafka rebalance solution

Introduction

Rebalancing in Apache Kafka has long been criticized. A rebalance triggers a stop-the-world (STW) pause, during which the resources (partitions) of the affected topic are unavailable. On a small cluster this is tolerable. On a large one, such as a consumer group or Kafka Connect cluster with hundreds of nodes, a rebalance is a disaster, so we want to avoid rebalancing as much as possible.

By Kafka 2.4, the community had introduced two new features to address the STW problem in rebalancing.
1. Incremental Rebalance Protocol (hereinafter the cooperative protocol): improves on the eager protocol (i.e. the old rebalance protocol) so that STW no longer occurs. How it does so is described later.
2. Static membership: prevents consumers that restart or leave temporarily from triggering a rebalance.

Apache Kafka 2.4 incremental cooperative rebalancing protocol

Background

Load balancing is an essential function of almost every distributed system, and Apache Kafka is no exception. To keep data consumption as balanced as possible across the cluster, Kafka provides rebalancing, which helps Kafka clients (consumer clients, Kafka Connect, Kafka Streams) spread their load as evenly as possible.

Before Kafka 2.3, however, the rebalance assignment strategies (RangeAssignor, RoundRobinAssignor, and so on) were all based on the eager protocol, which is the Kafka rebalancing we used to know.

It is worth mentioning that Kafka also introduced another assignment strategy, the sticky `StickyAssignor`. Its main purpose is to let clients such as consumers keep their original assignment as far as possible after a rebalance. Unfortunately, this strategy still works within the eager protocol, so a rebalance still requires every consumer to first give up the resources (partitions) it currently holds.

During the 2.x releases, the community recognized the need to change the existing rebalance. The cooperative protocol was therefore first applied to Kafka Connect in Kafka 2.3, and support for it was then added to the consumer client in Kafka 2.4.
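As a minimal sketch of how a consumer client opts in to the cooperative protocol, the properties below select `CooperativeStickyAssignor`, the cooperative assignor that ships with the Java client from 2.4 onward. The class name `CooperativeConsumerConfigSketch`, the broker address, and the group name are placeholders that do not come from the text above; in a real application the properties would be passed to a `KafkaConsumer`.

```java
import java.util.Properties;

class CooperativeConsumerConfigSketch {
    /** Builds consumer properties that select the cooperative assignor. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Without this setting the consumer keeps using an eager assignor.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group name, for illustration only.
        Properties props = build("localhost:9092", "demo-group");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```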

Incremental cooperative rebalancing protocol analysis

Next, we look at the specific differences between the cooperative protocol and the eager protocol. In one sentence: **the cooperative protocol replaces one global rebalance with a series of small rebalances that converge on the final balanced state**.

We use one scenario as an example to compare the two protocols.

Suppose a topic has three partitions, p1, p2, and p3, and two consumers, c1 and c2, are consuming them with the assignment {c1 -> p1, p2}, {c2 -> p3}.

This assignment is unbalanced, so suppose a consumer c3 joins and triggers a rebalance. We first list the general steps performed under the eager protocol, then the steps under the cooperative protocol for comparison.

Terms used in the eager protocol walkthrough:

- group coordinator: the rebalance coordinator, responsible for handling the events in the rebalance life cycle

- heartbeat: the heartbeat between the consumer and the broker; during a rebalance, notifications are delivered through it

- join group request: the request a consumer client sends to join the group

- sync group request: the exchange late in the rebalance through which the group coordinator delivers the assignment plan to each consumer client (the consumer sends the request and the plan comes back in the response)

In the eager version, the following happens.

1. At the beginning, c1 and c2 each send heartbeat information to the group coordinator (the coordinator responsible for rebalancing)

2. The group coordinator then receives a join group request from c3 and learns that a new member is joining the group.

3. In the next heartbeat, the group coordinator notifies c1 and c2 to prepare for rebalance

4. **c1 and c2 revoke their respective partitions**, and then send the joingroup request to the group coordinator

5. The group coordinator arranges the new assignment plan (the computation itself is delegated to the leader consumer) and delivers it to c1, c2, and c3 through the sync group exchange

6. After c1, c2, and c3 receive the plan, they resume consumption
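The eager steps above can be sketched as a small in-memory model. This is an illustration only, not Kafka's implementation: the class `EagerRebalanceSketch` and its methods are hypothetical names, and plain round-robin stands in for whichever assignor the group actually uses. The point it shows is that every partition is revoked before any plan is handed out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class EagerRebalanceSketch {
    /** Step 4: every member revokes everything it owns before rejoining. */
    static Set<String> revokeAll(Map<String, List<String>> current) {
        Set<String> revoked = new TreeSet<>();
        for (List<String> owned : current.values()) {
            revoked.addAll(owned);
        }
        // From here until the new plan arrives, nobody consumes anything.
        current.replaceAll((member, owned) -> new ArrayList<String>());
        return revoked;
    }

    /** Step 5: a fresh plan; round-robin is a stand-in for the real assignor. */
    static Map<String, List<String>> assign(List<String> members, Set<String> partitions) {
        Map<String, List<String>> plan = new TreeMap<>();
        for (String member : members) {
            plan.put(member, new ArrayList<String>());
        }
        int i = 0;
        for (String partition : partitions) {
            plan.get(members.get(i++ % members.size())).add(partition);
        }
        return plan;
    }

    /** Replays the c1/c2/c3 scenario from the text. */
    static Map<String, List<String>> demo() {
        Map<String, List<String>> current = new TreeMap<>();
        current.put("c1", new ArrayList<>(Arrays.asList("p1", "p2")));
        current.put("c2", new ArrayList<>(Arrays.asList("p3")));
        Set<String> revoked = revokeAll(current); // p1, p2 and p3 all stop
        return assign(Arrays.asList("c1", "c2", "c3"), revoked);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {c1=[p1], c2=[p2], c3=[p3]}
    }
}
```

Note that even p1 and p3, which end up with the same owners as before, were paused during the rebalance.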

It is represented by a diagram as follows:

Some details are omitted here, but the overall process should now be easier to follow. Next, let's see how the cooperative protocol handles the same scenario.

Under the cooperative protocol, the process looks like this:

cooperative rebalancing protocol

In the cooperative version, the following happens.

1. At the beginning, c1 and c2 each send heartbeat information to the group coordinator

2. The group coordinator then receives a join group request from c3 and learns that a new member is joining the group.

3. In the next heartbeat, the group coordinator notifies c1 and c2 to prepare for a rebalance. Up to this point, everything is the same as in the eager version

4. **c1 and c2 send join group requests to the group coordinator without revoking the partitions they own. Instead, they encode their owned partitions and send them along with the request, i.e. {c1->p1, p2}, {c2->p3}**

5. The group coordinator obtains the current partition information from the metadata (this is called assigned-partitions) and the partitions each member already holds (this is called owned-partitions) from the join group requests of c1 and c2. By comparing assigned-partitions with owned-partitions it knows the current allocation and decides to revoke c1's right to consume partition p2. It then delivers the plan ({c1->p1}, {c2->p3}) to c1 and c2 through the sync group exchange, so that they can continue to consume p1 and p3

6. After c1 and c2 receive the plan, they resume consumption, and one rebalance is complete. **At this point, of course, nobody is consuming p2**

7. A second rebalance is triggered and the process above repeats, but this time the goal is to assign p2 to c3 (the allocation status is again obtained from assigned-partitions and owned-partitions)
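The two rounds above can be sketched with a simplified model. It assumes the target plan ({c1->p1}, {c2->p3}, {c3->p2}) is already known and only shows how owned partitions decide what each round may hand out. The class `CooperativeRebalanceSketch` and its methods are hypothetical names, not part of Kafka.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class CooperativeRebalanceSketch {
    /** One rebalance round: what each member may consume after this round. */
    static Map<String, Set<String>> round(Map<String, Set<String>> owned,
                                          Map<String, Set<String>> target) {
        Map<String, String> ownerOf = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : owned.entrySet()) {
            for (String partition : e.getValue()) {
                ownerOf.put(partition, e.getKey());
            }
        }
        Map<String, Set<String>> granted = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : target.entrySet()) {
            Set<String> keep = new TreeSet<>();
            for (String partition : e.getValue()) {
                String owner = ownerOf.get(partition);
                // A partition still owned by another member must be revoked
                // first, so it is withheld from its new owner in this round.
                if (owner == null || owner.equals(e.getKey())) {
                    keep.add(partition);
                }
            }
            granted.put(e.getKey(), keep);
        }
        return granted;
    }

    static Set<String> setOf(String... partitions) {
        return new TreeSet<>(Arrays.asList(partitions));
    }

    /** Replays the scenario from the text and returns both rounds. */
    static List<Map<String, Set<String>>> demo() {
        Map<String, Set<String>> owned = new TreeMap<>();
        owned.put("c1", setOf("p1", "p2"));
        owned.put("c2", setOf("p3"));
        owned.put("c3", setOf());
        Map<String, Set<String>> target = new TreeMap<>();
        target.put("c1", setOf("p1"));
        target.put("c2", setOf("p3"));
        target.put("c3", setOf("p2"));
        Map<String, Set<String>> first = round(owned, target);  // p2 is only revoked
        Map<String, Set<String>> second = round(first, target); // p2 goes to c3
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        for (Map<String, Set<String>> result : demo()) {
            System.out.println(result);
        }
        // {c1=[p1], c2=[p3], c3=[]}
        // {c1=[p1], c2=[p3], c3=[p2]}
    }
}
```

In this model p1 and p3 are never taken away from their owners; only p2 goes unconsumed between the two rounds.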

The same diagram is shown as follows:

At the core of the cooperative version of rebalancing are assigned-partitions and owned-partitions. With these two pieces of information, the group coordinator can record and look up the consumption state of each partition, run several rebalances in a row, and reach the final balanced state.

Apart from the case where a consumer crashes and leaves the group, other scenarios work in a similar way.

Apache Kafka 2.4 static membership function

We know that a rebalance currently occurs under three conditions:

- The number of members changes, that is, a new member joins or an existing member leaves the group (either by leaving actively or by crashing)
- The number of subscribed topics changes
- The number of partitions of a subscribed topic changes

Of these, a member joining or leaving the group is the most common trigger. When a new member joins, a rebalance is unavoidable and cannot be optimized away (there is a separate optimization for starting several consumers at once, namely delaying the rebalance). The case where a consumer crashes and leaves the group can be optimized, however, because a consumer crashing and leaving the group usually does not affect the other {partition - consumer} assignments.

**Therefore, Kafka 2.3~2.4 introduced an optimization: the [Static Membership](https://www.lmlphp.com/r?x=CtQ9ytoWsjbvPE4vPHw7yekGCeT6zihAs_gMsMQVPiKd4Ew9PLQ3zMlVgiI9PLQ3PSbd4ESN4Lh_CerO5tOwp8) function, together with a new consumer-side configuration parameter, `group.instance.id`.** Once this parameter is configured, the member automatically becomes a static member; otherwise it is still treated as a dynamic member, as before.

**The benefit of a static member is that its static member ID stays the same, and so do all the partitions previously assigned to it.** That is, if a member goes down, no rebalance is triggered as long as the static member restarts before the timeout (the timeout is `session.timeout.ms`, 10 seconds by default in Kafka 2.4). While the static member is down, the broker keeps the consumer's state (offsets) until the timeout expires or the static member reconnects.

If the static membership function is used, the conditions for triggering rebalance are as follows:

- A new member joins the group: this condition is unchanged. A new member always triggers a rebalance so that the partitions can be redistributed
- The leader member rejoins the group: for example, because the topic allocation scheme has changed
- An existing member has been out of the group for longer than `session.timeout.ms`: even for a static member, the coordinator will not wait indefinitely. Once the session timeout is exceeded, a rebalance is still triggered
- The Coordinator receives a LeaveGroup request: the member actively informs the Coordinator to leave the group permanently.
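The four conditions above can be condensed into a small decision function. This is a sketch of the rules as described here, not coordinator code: the class `StaticMembershipSketch`, the `Event` names, and the method are hypothetical.

```java
class StaticMembershipSketch {
    /** Hypothetical event names, one per condition in the list above. */
    enum Event {
        NEW_MEMBER_JOINED,
        LEADER_REJOINED,
        STATIC_MEMBER_DOWN,
        LEAVE_GROUP_RECEIVED
    }

    /**
     * Returns true when the event leads to a rebalance.
     * downtimeMs only matters for STATIC_MEMBER_DOWN: how long the member
     * has been away from the group so far.
     */
    static boolean triggersRebalance(Event event, long downtimeMs, long sessionTimeoutMs) {
        switch (event) {
            case STATIC_MEMBER_DOWN:
                // A static member that returns within the session timeout keeps
                // its assignment; only an expired session triggers a rebalance.
                return downtimeMs > sessionTimeoutMs;
            default:
                // A new member, a rejoining leader, or an explicit LeaveGroup
                // request always triggers a rebalance.
                return true;
        }
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000L;
        System.out.println(triggersRebalance(Event.STATIC_MEMBER_DOWN, 5_000L, sessionTimeoutMs));  // false
        System.out.println(triggersRebalance(Event.STATIC_MEMBER_DOWN, 15_000L, sessionTimeoutMs)); // true
        System.out.println(triggersRebalance(Event.NEW_MEMBER_JOINED, 0L, sessionTimeoutMs));       // true
    }
}
```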

So the two conditions for using static membership are:

1. Add the configuration to the consumer client: `props.put("group.instance.id", "con1");`
2. Set `session.timeout.ms` to a reasonable value. This parameter is bounded by the broker settings `group.min.session.timeout.ms` (6 seconds by default) and `group.max.session.timeout.ms` (30 minutes by default) and cannot fall outside that range. If it is set too high, however, the broker may keep waiting for a consumer client that has gone down. I recommend choosing the value according to your usage scenario.
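Putting the two settings together, a minimal sketch might look as follows. The bounds are the broker defaults quoted above and may differ on a given cluster. The class `StaticMemberConfigSketch` and the 60-second timeout are illustrative assumptions. The client-side check only mirrors the broker's limits so that a bad value fails early.

```java
import java.util.Properties;

class StaticMemberConfigSketch {
    // Broker defaults quoted in the text; a real cluster may override them.
    static final int MIN_SESSION_TIMEOUT_MS = 6_000;       // group.min.session.timeout.ms
    static final int MAX_SESSION_TIMEOUT_MS = 30 * 60_000; // group.max.session.timeout.ms

    /** Builds the two static membership settings, rejecting out-of-range timeouts. */
    static Properties build(String instanceId, int sessionTimeoutMs) {
        if (sessionTimeoutMs < MIN_SESSION_TIMEOUT_MS || sessionTimeoutMs > MAX_SESSION_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                    "session.timeout.ms out of range: " + sessionTimeoutMs);
        }
        Properties props = new Properties();
        // Must be unique per consumer instance and stable across restarts.
        props.put("group.instance.id", instanceId);
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // 60 seconds is an arbitrary example: long enough for a quick restart.
        Properties props = build("con1", 60_000);
        System.out.println(props.getProperty("group.instance.id"));  // con1
        System.out.println(props.getProperty("session.timeout.ms")); // 60000
    }
}
```

These properties would be merged with the usual consumer settings (bootstrap servers, group id, deserializers) before creating the consumer.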

That's all~