In-depth analysis of Flink: source code interpretation and a data skew code implementation

Hello everyone, I am the Great Sage, and I am very happy to meet you again.

In the last article, we discussed in detail how to solve data skew by ensuring that each parallel subtask of a Flink job gets its own dedicated key.

However, we only covered the idea and design behind that solution; we never actually applied it to a Flink task.

In this article, we put the solution into practice in real Flink code.


Solution Review

The code is as follows:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.Map;
import java.util.TreeMap;

public class RebalanceKeyCreator {

    public static Integer[] generateRebalanceKeys(int parallelism) {
        int maxParallel = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        // Candidate key range to search; 12x the parallelism is a heuristic that in
        // practice is large enough for every subtask to be hit by at least one key
        int maxKey = parallelism * 12;

        // A TreeMap keeps the entries sorted by subtask index, so the returned array
        // is deterministic (HashMap iteration order is not guaranteed)
        Map<Integer, Integer> subIndexKeyMap = new TreeMap<>();

        for (int key = 0; key < maxKey; key++) {
            int subtaskIdx = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallel, parallelism);
            // Record only the first key that lands on each subtask
            if (!subIndexKeyMap.containsKey(subtaskIdx)) {
                subIndexKeyMap.put(subtaskIdx, key);
            }
        }

        return subIndexKeyMap.values().toArray(new Integer[0]);
    }
}

The idea behind this code is to assign each parallel subtask a unique key, guaranteeing a balanced key distribution and therefore a balanced data distribution.
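To make the mapping concrete, here is a small sanity check you could run (the class name RebalanceKeyCheck is my own, not from the original project); it prints which subtask each generated key is routed to. With parallelism 10, you should see subtasks 0 through 9 each covered by exactly one key:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class RebalanceKeyCheck {

    public static void main(String[] args) {
        int parallelism = 10;
        Integer[] keys = RebalanceKeyCreator.generateRebalanceKeys(parallelism);
        int maxParallel = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        // Each generated key should land on a different subtask: 0, 1, ..., parallelism - 1
        for (Integer key : keys) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallel, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}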

Code Practice

First, here is a Flink task that does not use the optimization above, so you can see the data skew for yourself.

Then we will look at the same task with the optimization applied, so everyone can see the effect.

Example of data skew

The code is as follows:
import com.atguigu.func.MyProcessFunction;
import com.atguigu.source.DataGeneratorSource2;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkewedJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8082);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);


        // Define the maximum user ID for the data source
        int maxUserId = 100; // Assume we have 100 different user IDs
        env.setParallelism(10);

        // Use the custom data source
        DataGeneratorSource2 dataSource = new DataGeneratorSource2(maxUserId);

        // Add the data source to the environment and define the subsequent transformations
        SingleOutputStreamOperator<Tuple2<Integer, String>> skewedInput = env.addSource(dataSource)
                .name("Custom Data Source")
                .setParallelism(1); // Set the parallelism of the data source


        skewedInput
                .map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
                        // Identity map: pass each record through unchanged
                        // (kept so the job shape matches the optimized version below)
                        return value;
                    }
                })
                .keyBy(event -> event.f0)
                .process(new MyProcessFunction());


        env.execute("Skewed Job");
    }
}

This code works as follows: it loads the data source and then simply returns each record unchanged in the map operator. The data source here is a custom one:

package com.atguigu.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.Random;

public class DataGeneratorSource2 implements SourceFunction<Tuple2<Integer, String>> {

    private volatile boolean isRunning = true;
    private final int maxUserId;

    private final Random random = new Random();

    public DataGeneratorSource2(int maxUserId) {
        this.maxUserId = maxUserId;
    }

    @Override
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
        while (isRunning) {
            // Generate an event for a random user
            int userId = random.nextInt(maxUserId) + 1;
            String action = random.nextBoolean() ? "click" : "view";
            ctx.collect(new Tuple2<>(userId, action));
            Thread.sleep(10); // Sleep to simulate the time gap between events
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

In the MyProcessFunction class, the data is sent downstream. The code is as follows:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, Integer> {

    @Override
    public void processElement(
            Tuple2<Integer, String> value,
            KeyedProcessFunction<Integer, Tuple2<Integer, String>, Integer>.Context ctx,
            Collector<Integer> out) throws Exception {

        // Implement your logic here.
        // For example, you can emit the first field directly:
        out.collect(value.f0);

        // You can also use the Context parameter to register timers, access keyed state, etc.
        // ctx.timerService().registerEventTimeTimer(...);
    }
}

The final running result is as follows:

Optimization of data skew

The code is as follows:

import com.atguigu.source.DataGeneratorSource2;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RebalancedJob {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8083);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);


        int parallelism = 10;
        Integer[] rebalanceKeys = RebalanceKeyCreator.generateRebalanceKeys(parallelism);

        int maxUserId = 100; // Assume we have 100 different user IDs
        env.setParallelism(10);


        // Use the custom data source
        DataGeneratorSource2 dataSource = new DataGeneratorSource2(maxUserId);

        // Add the data source to the environment and define the subsequent transformations
        SingleOutputStreamOperator<Tuple2<Integer, String>> skewedInput = env.addSource(dataSource)
                .name("Custom Data Source")
                .setParallelism(1); // Set the parallelism of the data source


        skewedInput
                .map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
                        // Replace the original key with the rebalance key for its slot
                        int newKey = rebalanceKeys[value.f0 % parallelism];
                        return new Tuple2<>(newKey, value.f1);
                    }
                })
                .keyBy(event -> event.f0)
                .process(new KeyedProcessFunction<Integer, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void processElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple2<Integer, String>> out) throws Exception {
                        // Process each element here
                        out.collect(value);
                    }
                });


        env.execute("Rebalanced Job");
    }


}

Code explanation: this optimized version is almost identical to the skewed example above.

We only add the code from the optimization idea described in the previous article, as follows:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.Map;
import java.util.TreeMap;

public class RebalanceKeyCreator {

    public static Integer[] generateRebalanceKeys(int parallelism) {
        int maxParallel = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        int maxKey = parallelism * 12;

        // Sorted by subtask index for a deterministic result
        Map<Integer, Integer> subIndexKeyMap = new TreeMap<>();

        for (int key = 0; key < maxKey; key++) {
            int subtaskIdx = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallel, parallelism);
            if (!subIndexKeyMap.containsKey(subtaskIdx)) {
                subIndexKeyMap.put(subtaskIdx, key);
            }
        }

        return subIndexKeyMap.values().toArray(new Integer[0]);
    }
}

Then call it in the Flink task, as follows:

Integer[] rebalanceKeys = RebalanceKeyCreator.generateRebalanceKeys(parallelism);
int newKey = rebalanceKeys[value.f0 % parallelism];

The optimized effect is as follows:

Code explanation

The optimization works like this: in the Flink task, we first pass the current program's parallelism into generateRebalanceKeys(), which gives each parallel subtask a corresponding, unique key and returns those keys as an array.

Then, in the map operator, int newKey = rebalanceKeys[value.f0 % parallelism]; picks the new key. This new key is what the subsequent keyBy uses.

value.f0 % parallelism yields a value in [0, parallelism - 1], one slot per parallel subtask in our Flink job. For example, with parallelism 10, user IDs 7, 17, and 97 all map to slot 7 and therefore all receive rebalanceKeys[7], while the ten slots of the array together cover all ten subtasks.

As for why this solves data skew, the design principles and ideas behind the code were explained in detail in the previous article; if anything is unclear, please revisit it.


Points needing attention
Attention Point 1

If you use this optimization and rely on keyed state after the keyBy operator, multiple original keys may be merged under one new key. See the following example:

UserID: 1 -> Rebalance Key: 100

UserID: 2 -> Rebalance Key: 100

UserID: 3 -> Rebalance Key: 101

UserID: 4 -> Rebalance Key: 101

In this case, when we key the stream by the rebalance key, the state of user IDs 1 and 2 is merged under rebalance key 100, and the state of user IDs 3 and 4 under rebalance key 101.

Solution 1: Store user information in the state

You can use MapState, where the map key is the original key (the user ID) and the map value is the per-user information you need to track. When handling an event, you access the MapState under the new key, then use the original key within that state to update the correct user's data. Note that the optimized map operator above replaces value.f0 with the rebalance key, so for this pattern the record must also carry the original user ID (for example as a Tuple3 of new key, user ID, action).

MapState<Integer, UserBehaviorState> state; // UserBehaviorState is whatever data structure holds the per-user information

Each time an event is handled:

// Assume value is an input event that still carries the original user ID as value.f0
UserBehaviorState userState = state.get(value.f0); // Look up the state by the original key (user ID)
if (userState == null) {
    userState = new UserBehaviorState(); // No state exists for this user ID yet, so create one
}
// ... update userState ...
state.put(value.f0, userState); // Save the updated state
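
Putting this together, here is a minimal, self-contained sketch of the pattern. Everything in it is illustrative rather than taken from the original job: the class name, the Tuple3 layout (rebalance key, original user ID, action), and the use of a simple per-user event count in place of UserBehaviorState:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input: (rebalance key, original user ID, action); the stream is keyed by f0
public class PerUserCountFunction
        extends KeyedProcessFunction<Integer, Tuple3<Integer, Integer, String>, Tuple2<Integer, Long>> {

    // Original user ID -> number of events seen for that user
    private transient MapState<Integer, Long> countsPerUser;

    @Override
    public void open(Configuration parameters) {
        countsPerUser = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsPerUser", Integer.class, Long.class));
    }

    @Override
    public void processElement(
            Tuple3<Integer, Integer, String> value,
            Context ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        // The stream is keyed by the rebalance key, but the state is
        // addressed by the original user ID carried in f1
        Long count = countsPerUser.get(value.f1);
        if (count == null) {
            count = 0L; // No state yet for this user ID
        }
        count++;
        countsPerUser.put(value.f1, count);

        out.collect(Tuple2.of(value.f1, count));
    }
}

For this to work, the map operator in the rebalanced job would emit Tuple3.of(newKey, value.f0, value.f1) instead of a Tuple2, so the original user ID survives the re-keying.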
Attention Point 2

When the job is scaled up or down (that is, when its parallelism changes), I suspect problems may arise. I spent a whole afternoon on this and still couldn't work it out.

Since I haven't fully thought this part through, I can't give a clear answer for now.

I will sort out this logic properly and explain it in a separate article.

Attention Point 3

When a single key carries a very large amount of data, I think this solution is not advisable: no matter how you remap the key, all of its records still land on the same parallel subtask.

So this is a case the solution cannot handle.

That situation is usually solved with a two-stage aggregation approach, as sketched below.
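
Here is a minimal sketch of what two-stage aggregation could look like for our (userId, action) stream, counting events per user in 10-second windows. A random salt first spreads each key across up to SALT_RANGE sub-keys for a partial count, and a second aggregation strips the salt and combines the partial results. The class name, SALT_RANGE, and the window size are illustrative assumptions, not part of the original code:

import com.atguigu.source.DataGeneratorSource2;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.ThreadLocalRandom;

public class TwoStageAggregationSketch {

    private static final int SALT_RANGE = 10; // spread each hot key over up to 10 sub-keys

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, String>> events =
                env.addSource(new DataGeneratorSource2(100)).setParallelism(1);

        events
                // Stage 1: salt the key so one hot user ID is split across SALT_RANGE sub-keys
                .map(e -> Tuple2.of(e.f0 * SALT_RANGE + ThreadLocalRandom.current().nextInt(SALT_RANGE), 1L))
                .returns(Types.TUPLE(Types.INT, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)) // partial count per salted key
                // Stage 2: strip the salt and combine the partial counts per original user ID
                .map(t -> Tuple2.of(t.f0 / SALT_RANGE, t.f1))
                .returns(Types.TUPLE(Types.INT, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)) // final count per user ID
                .print();

        env.execute("Two-Stage Aggregation Sketch");
    }
}

Because each salted sub-key only ever sees a fraction of the hot key's records, the heavy lifting in stage 1 is spread across subtasks, and stage 2 only has to merge at most SALT_RANGE partial results per user.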

Personal understanding

In my view, the optimization above fits the case where there are many distinct keys but some parallel subtasks end up with no key mapped to them; that kind of data skew is exactly what this solution targets.

As in the example I gave above, you can clearly see that in the optimized job the data is spread very evenly across the ten parallel subtasks, while the unoptimized job is skewed.

Easter Egg

Let me share a little trick for running Flink tasks directly in your local IDE while still being able to open the Flink UI.

Add dependency

In addition to Flink's core dependencies, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
Create environment

Configuration conf = new Configuration();
conf.setInteger("rest.port", 8082);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

After these two steps, start the Flink task and open localhost:8082 in your browser to see the Flink UI.

Note that port 8082 is nothing special; you can choose your own port.

Summary

This article put our optimization plan into practice so everyone can see the effect of the optimization.

However, there are still some points I haven't explained clearly. I will get to the bottom of them and write a separate article; I hope you can bear with me.

I encourage everyone to test the above code locally.

Also, there is surely something unclear in what I wrote above, and everyone is welcome to discuss it together. You can follow the WeChat public account Dasheng Data Planet to join the group discussion.
