Flink Transformation Operators

After the source reads the data, we can use various transformation operators to turn one or more DataStreams into new DataStreams: the incoming data is converted, step by step, into the format we need and then output.

Basic transformation operators (map / filter / flatMap)

map

map is a familiar operator in big data processing. It transforms each element of a data stream to form a new stream. Put simply, it is a "one-to-one mapping": consuming one element produces exactly one element.

public class TransMap {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_2", 2, 2)
        );

        // Method 1: pass in an anonymous class implementing MapFunction
        stream.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor e) throws Exception {
                return e.id;
            }
        }).print();

        // Method 2: pass in an implementation class of MapFunction
        // stream.map(new UserMap()).print();

        // Method 3: lambda expression
        // stream.map((MapFunction<WaterSensor, String>) e -> e.id).print();

        env.execute();
    }

    public static class UserMap implements MapFunction<WaterSensor, String> {
        @Override
        public String map(WaterSensor e) throws Exception {
            return e.id;
        }
    }
}

In the above code, the generic types of the MapFunction implementation are the input data type and the output data type. When implementing the MapFunction interface, you need to specify these two generic types, i.e. the types of the input and output events, and override the map() method, which defines the concrete logic for converting one input event into one output event.
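The WaterSensor type used throughout these examples is not shown in the snippets. Below is a minimal sketch of what it is assumed to look like, inferred from how the examples use the class (fields id, ts and vc, a no-argument constructor, getters/setters, as required by Flink's POJO rules); the exact definition in the original project may differ.

public class WaterSensor {
    // Assumed POJO: id = sensor id, ts = timestamp, vc = water level
    public String id;
    public Long ts;
    public Integer vc;

    // Flink POJO rules: public class, public no-arg constructor,
    // fields are public or accessible via getters/setters
    public WaterSensor() { }

    // Primitive parameters so that calls like new WaterSensor("sensor_1", 1, 1) compile
    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public Long getTs() { return ts; }
    public Integer getVc() { return vc; }
    public void setId(String id) { this.id = id; }
    public void setTs(Long ts) { this.ts = ts; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}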

filter

The filter transformation, as the name suggests, filters the data stream. The filter condition is given as a boolean expression that is evaluated for each element in the stream: if it returns true, the element is passed through; if it returns false, the element is discarded.

public class TransFilter {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_1", 2, 2),
                new WaterSensor("sensor_2", 2, 2),
                new WaterSensor("sensor_3", 3, 3)
        );

        // Method 1: pass in an anonymous class implementing FilterFunction
        stream.filter(new FilterFunction<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor e) throws Exception {
                return e.id.equals("sensor_1");
            }
        }).print();

        // Method 2: pass in a FilterFunction implementation class
        // stream.filter(new UserFilter()).print();

        // Method 3: lambda expression
        // stream.filter((FilterFunction<WaterSensor>) e -> e.id.equals("sensor_1")).print();

        env.execute();
    }

    public static class UserFilter implements FilterFunction<WaterSensor> {
        @Override
        public boolean filter(WaterSensor e) throws Exception {
            return e.id.equals("sensor_1");
        }
    }
}

The above code keeps only the records whose sensor id is sensor_1; all other records are filtered out.

flatMap

The flatMap operation, also known as flat map, splits a whole (often collection-like) element of the data stream into individual elements: consuming one element can produce zero, one, or many elements. flatMap can be thought of as the combination of two steps, "flatten" and "map": the data is first broken up according to some rule, and then each resulting element is converted.

public class TransFlatmap {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_1", 2, 2),
                new WaterSensor("sensor_2", 2, 2),
                new WaterSensor("sensor_3", 3, 3)
        );

        // Method 1: pass in an anonymous class implementing FlatMapFunction
        stream.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                if (value.id.equals("sensor_1")) {
                    out.collect(String.valueOf(value.vc));
                } else if (value.id.equals("sensor_2")) {
                    out.collect(String.valueOf(value.ts));
                    out.collect(String.valueOf(value.vc));
                }
            }
        }).print();

        // Method 2: pass in a FlatMapFunction implementation class
        stream.flatMap(new MyFlatMap()).print();

        // Method 3: lambda expression
        // (with a lambda, the output type is erased, so it must be declared via returns())
        stream.flatMap((FlatMapFunction<WaterSensor, String>) (value, out) -> {
            if (value.id.equals("sensor_1")) {
                out.collect(String.valueOf(value.vc));
            } else if (value.id.equals("sensor_2")) {
                out.collect(String.valueOf(value.ts));
                out.collect(String.valueOf(value.vc));
            }
        }).returns(Types.STRING).print();

        env.execute();
    }

    public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {

        @Override
        public void flatMap(WaterSensor value, Collector<String> out) throws Exception {

            if (value.id.equals("sensor_1")) {
                out.collect(String.valueOf(value.vc));
            } else if (value.id.equals("sensor_2")) {
                out.collect(String.valueOf(value.ts));
                out.collect(String.valueOf(value.vc));
            }
        }
    }
}

Aggregation operators (Aggregation)

keyBy

In Flink, the DataStream API has no operator for direct aggregation, because aggregating massive data efficiently requires partitioned, parallel processing. Therefore, to aggregate in Flink you must partition the data first, and that is what keyBy does.

keyBy is the operator that must be used before any aggregation. By specifying a key, keyBy logically divides a stream into different partitions; the partitions mentioned here are in fact the subtasks that process the data in parallel.

Based on the key, the data in the stream is distributed to different partitions, so that all records with the same key are sent to the same partition.

Internally, this is done by computing the hash code of the key and taking it modulo the number of partitions. Consequently, if the key is a POJO, its hashCode() method must be overridden.
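For illustration only, a hypothetical composite key type (not part of the original example) might look like the sketch below; the important point is that hashCode() and equals() are overridden so that equal keys always end up in the same partition.

import java.util.Objects;

// Hypothetical composite key: sensor id plus a group number. If a POJO like this
// is used as the key in keyBy(), it must override hashCode() and equals().
public class SensorKey {
    public String id;
    public int group;

    public SensorKey() { }

    public SensorKey(String id, int group) {
        this.id = id;
        this.group = group;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SensorKey)) return false;
        SensorKey that = (SensorKey) o;
        return group == that.group && Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, group);
    }
}

// Possible usage: stream.keyBy(e -> new SensorKey(e.id, e.vc % 2))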

public class TransKeyBy {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_1", 2, 2),
                new WaterSensor("sensor_2", 2, 2),
                new WaterSensor("sensor_3", 3, 3)
        );

        // Method 1: use a lambda expression
        KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);

        // Method 2: use an anonymous class implementing KeySelector
        KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor e) throws Exception {
                return e.id;
            }
        });

        // keyBy alone does not define an operator; add a sink so the job can actually run
        keyedStream.print();

        env.execute();
    }
}

Note that the result of keyBy is no longer a DataStream: the DataStream is converted into a KeyedStream, which can be thought of as a "partitioned stream" or "keyed stream". It is a logical partitioning of the DataStream by key, so it has two generic type parameters: besides the element type of the stream, the type of the key must also be specified.

KeyedStream is a very important data structure. Only based on it can subsequent aggregation operations (such as sum, reduce) be performed.
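For example, continuing the keyBy example above, the simple aggregations described in the next section are defined on KeyedStream, not on DataStream (a minimal sketch):

// sum/min/max/minBy/maxBy/reduce are only available after keyBy
stream.keyBy(e -> e.id)   // KeyedStream<WaterSensor, String>
      .sum("vc")          // simple aggregation on the "vc" field, per sensor id
      .print();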

Simple aggregation (sum/min/max/minBy/maxBy)

With a KeyedStream, i.e. a data stream partitioned by key, we can perform aggregations on it. Flink has some of the most basic and common aggregation APIs built in, mainly the following:

  • sum(): sums the specified field over the input stream.
  • min(): computes the minimum value of the specified field over the input stream.
  • max(): computes the maximum value of the specified field over the input stream.
  • minBy(): similar to min(), it finds the minimum of the specified field on the input stream. The difference is that min() only updates the specified field and keeps the other fields from the first record, while minBy() returns the entire record that contains the minimum value of that field.
  • maxBy(): similar to max(), it finds the maximum of the specified field on the input stream. The difference between max() and maxBy() is the same as that between min() and minBy().

Simple aggregation operators are convenient to use and have clear semantics. These aggregation methods still take a parameter, but unlike the basic transformation operators, there is no need to implement a custom function; you only have to specify the field to aggregate on. A field can be specified in two ways: by position or by name (position-based specification only works for Tuple types; for POJOs the field name must be used).

public class TransAggregation {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                new WaterSensor("sensor_1", 1, 1),
                new WaterSensor("sensor_1", 2, 2),
                new WaterSensor("sensor_2", 2, 2),
                new WaterSensor("sensor_3", 3, 3)
        );

        stream.keyBy(e -> e.id).max("vc").print(); // specify the field by name

        env.execute();
    }
}
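To make the difference between max()/maxBy() and the two ways of specifying a field concrete, here is a small sketch that could sit inside the same main() method as above (the Tuple2 data is made up purely for illustration; position-based field specification only works for Tuple types):

// POJO: the field must be specified by name
stream.keyBy(e -> e.id).max("vc").print("max");     // only "vc" is updated; the other fields
                                                     // keep the values of the first record
stream.keyBy(e -> e.id).maxBy("vc").print("maxBy"); // returns the whole record with the max "vc"

// Tuple: the field can also be specified by position
env.fromElements(Tuple2.of("sensor_1", 1), Tuple2.of("sensor_1", 5))
        .keyBy(t -> t.f0)
        .sum(1)            // position 1 = the Integer field
        .print("sum");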

reduce

reduce performs a rolling aggregation on the keyed data: each newly arriving element is combined with the current reduced value to produce a new reduced value.
The reduce operation converts a KeyedStream back into a DataStream. It does not change the element type of the stream, so the output type is the same as the input type.

Case: Use reduce to implement the functions of max and maxBy.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
   .socketTextStream("hadoop102", 7777)
   .map(new WaterSensorMapFunction())
   .keyBy(WaterSensor::getId)
   .reduce(new ReduceFunction<WaterSensor>() {
       @Override
       public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
           System.out.println("Demo7_Reduce.reduce");

           int maxVc = Math.max(value1.getVc(), value2.getVc());
           // To get the effect of max("vc"): keep the maximum vc, while all other fields
           // come from the first record of the current group:
           // value1.setVc(maxVc);
           // return value1;

           // To get the effect of maxBy("vc"): return the whole record that holds the maximum vc
           if (value1.getVc() > value2.getVc()) {
               value1.setVc(maxVc);
               return value1;
           } else {
               value2.setVc(maxVc);
               return value2;
           }
       }
   })
   .print();

env.execute();

public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    }
}

Rich Function Classes

"Rich function classes" are also function interfaces provided by the DataStream API. Every Flink function class has a Rich version. Rich function classes generally appear as abstract classes, for example RichMapFunction, RichFilterFunction, RichReduceFunction, etc.
Their main difference from regular function classes is that rich functions can obtain the context of the runtime environment and have life cycle methods, so they can implement more complex functionality.
Rich functions have the concept of a life cycle. The typical life cycle methods are:

  • The open() method is the initialization method of a rich function and starts the life cycle of the operator. It is called once before the operator's actual working method, such as map() or filter(), is invoked.
  • The close() method is the last method called in the life cycle, similar to a tear-down method, and is typically used for clean-up work.

Note that these life cycle methods are called only once per parallel subtask, whereas the actual working method, for example map() in RichMapFunction, is called once for every record that arrives.

public class RichFunctionExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env
                .fromElements(1, 2, 3, 4)
                .map(new RichMapFunction<Integer, Integer>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + ": life cycle starts");
                    }

                    @Override
                    public Integer map(Integer integer) throws Exception {
                        return integer + 1;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + ": life cycle ends");
                    }
                })
                .print();

        env.execute();
    }
}

Physical Partitioning

Common physical partitioning strategies include random partitioning (shuffle), round-robin partitioning (rebalance), rescaling (rescale), broadcast, global, and custom partitioning.

Random partition (shuffle)

The simplest way to repartition is to "shuffle". Calling the .shuffle() method of DataStream distributes the data randomly to the parallel tasks of the downstream operator.
Random partitioning follows a uniform distribution, so the data in the stream is randomly scattered and evenly delivered to the downstream task partitions. Because it is completely random, running the same input data several times will generally not produce the same assignment.

public class ShuffleExample {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        DataStreamSource<String> stream = env.socketTextStream("hadoop102", 7777);

        stream.shuffle().print();

        env.execute();
    }
}

Round-robin partition (rebalance)

Round-robin, simply put, works like dealing cards: data is distributed in turn. Calling the .rebalance() method of DataStream repartitions the stream in a round-robin fashion. rebalance uses the round-robin load-balancing algorithm, which evenly distributes the input data across the downstream parallel tasks.

stream.rebalance().print();
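As a slightly fuller sketch (host and port are assumed, as in the other examples): the socket source always runs with parallelism 1, so rebalance() deals its records to the two print subtasks in turn.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

env.socketTextStream("hadoop102", 7777) // the socket source runs with parallelism 1
   .rebalance()                          // round-robin to the 2 print subtasks
   .print();

env.execute();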

Rescale partition (rescale)

Rescale partitioning is very similar to round-robin partitioning. When the rescale() method is called, the underlying implementation also uses the round-robin algorithm, but each task only distributes its data to a subset of the downstream parallel tasks. In other words, rescaling splits the tasks into small groups, and the "dealer" only deals cards, in turn, to the players in its own group.

stream.rescale().print();
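A sketch of the "small groups" behaviour; fromSequence and the parallelism values are chosen only for illustration. With two source subtasks and four print subtasks, each source subtask deals its data, in turn, to only two of the four print subtasks.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

env.fromSequence(1, 10)
   .setParallelism(2)   // two source subtasks
   .rescale()           // each source subtask feeds only "its" two print subtasks
   .print();            // four print subtasks

env.execute();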

Broadcast

Strictly speaking, this method should not be called "repartitioning", because after broadcasting each piece of data is kept in every partition and may therefore be processed repeatedly. Calling the broadcast() method of DataStream copies the input data and sends it to all parallel tasks of the downstream operator.

stream.broadcast().print();

Global partition (global)

Global partitioning is also a special partitioning method. This approach is very extreme. By calling the .global() method, all input stream data will be sent to the first parallel subtask of the downstream operator. This is equivalent to forcing the parallelism of the downstream tasks to 1, so you need to be very careful when using this operation, which may put a lot of pressure on the program.

stream.global().print();

Custom partition (Custom)

When none of the partitioning strategies provided by Flink meets the requirements, a custom partitioning strategy can be defined with the partitionCustom() method.

public class PartitionCustomDemo {
    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(2);

        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        DataStream<String> myDS = socketDS
                .partitionCustom(
                        new MyPartitioner(),
                        value -> value);
        myDS.print();

        env.execute();
    }
}

public class MyPartitioner implements Partitioner<String> {

    @Override
    public int partition(String key, int numPartitions) {
        return Integer.parseInt(key) % numPartitions;
    }
}