Flink — Event Time & Watermark

1. Event time:

It refers to the time when the data is generated or the time when the data occurs.

There are three types of time in Flink:

Event Time: the time at which the data was generated, that is, when the event actually occurred.

Ingestion Time: the time at which the event is received by Flink.

Processing Time: the time at which the event is processed by an operator.

Why is the concept of event time proposed?

With processing time, a record is assigned to a window according to when it happens to be processed. If records are delayed or arrive out of order, their original time order cannot be recovered, and they may be counted in the wrong window or missed by the window they belong to. With event time, Flink assigns each record to a window according to the timestamp of the event itself, so results reflect when events actually occurred even if records arrive out of order.

In summary, event time is the time at which the data was generated, and processing time is the wall-clock time at which the data is processed.
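To make the difference concrete, here is a minimal plain-Java sketch with no Flink dependency. It counts the same four records per 5-second window twice: once by event timestamp and once by arrival time. The class name, method name, and sample timestamps are hypothetical, chosen only for illustration; the third record is assumed to arrive 3 seconds late.

```java
import java.util.Map;
import java.util.TreeMap;

class TimeSemanticsSketch {

    // Counts records per tumbling window, keyed by window start (milliseconds).
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            long start = ts - Math.floorMod(ts, sizeMs);
            counts.merge(start, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical records: when each event occurred vs. when it was processed.
        long[] eventTimes = {1_000L, 2_000L, 4_000L, 6_000L};
        long[] arrivalTimes = {1_500L, 2_500L, 7_000L, 6_500L}; // third record is delayed

        System.out.println("by event time:      " + countPerWindow(eventTimes, 5_000L));
        // prints: by event time:      {0=3, 5000=1}
        System.out.println("by processing time: " + countPerWindow(arrivalTimes, 5_000L));
        // prints: by processing time: {0=2, 5000=2}
    }
}
```

Counting by arrival time moves the delayed record into the second window, while counting by event time keeps it in the window where it actually occurred.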

2. Processing time:

Processing time is the wall-clock time of the machine at the moment an operator processes a record after receiving it. Processing-time windows are triggered according to that wall clock.

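import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo03ProcessingTime {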
    public static void main(String[] args) throws Exception{
        /**
         * Processing time: usually combined with windows; it is the wall-clock time at which
         * the operator handles a record after receiving it.
         * Requirement: every 5 seconds, count the words seen in the last 10 seconds
         */

        //Build the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Use socket to simulate real-time operations
        DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);
        //Convert the received data into kv format
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));
        //Group by words
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
                .keyBy(key -> key.f0);
        //Assign a sliding window: size 10 seconds, slide 5 seconds
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        //Sum the counts per word within each window
        SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);

        countDS.print();
        //Start Flink
        env.execute();

    }
}
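
With a window size of 10 seconds and a slide of 5 seconds, every record belongs to size / slide = 2 overlapping windows. The following plain-Java sketch (no Flink dependency) lists the windows that contain a given timestamp. The class and method names are hypothetical, and the arithmetic is an assumption about how sliding windows are commonly assigned, not a copy of Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    // Returns the [start, end) bounds of every sliding window that contains timestampMs.
    static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A record processed at t = 12 s, with size 10 s and slide 5 s.
        for (long[] w : windowsFor(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints:
        // [10000, 20000)
        // [5000, 15000)
    }
}
```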
3. Event time:

Event time is the time at which the data was generated. In practice it is carried in the record as a timestamp, so it generally differs from the wall-clock time at which the record is processed.

When using event time, records should arrive in timestamp order; a record that arrives after the watermark has already passed the end of its window is treated as late and dropped by default. (Out-of-order input can also be handled; the solution is described later.)

java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000


In the sample above, each record has two parts: the word, followed by the event timestamp of that record in milliseconds.
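import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo04EventTime {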
    public static void main(String[] args) throws Exception{
        /**
         * Requirement: count the words in each 5-second interval, using a tumbling event-time window
         * A window fires once event time (the watermark) reaches the end of its 5-second interval
         */
        //Build the flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set parallelism to 1 so the watermark is driven by a single subtask and the demo output is easy to follow
        env.setParallelism(1);
        //Use socket to simulate real-time environment
        DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
        /**
         *java,1699035731000
         *java,1699035732000
         *java,1699035735000
         *java,1699035733000
         *java,1699035736000
         *java,1699035737000
         *java,1699035740000
         */
        //Each line is "word,timestamp"; parse it so Flink can later be told which field is the event time
        SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            //trim() guards against trailing whitespace such as a carriage return
            long time = Long.parseLong(split[1].trim());
            return Tuple2.of(word, time);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        //Tell Flink which field is the event time and how to generate the watermark
        SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        //The watermark trails the largest timestamp seen so far by 5 seconds
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //Use the second tuple field as the event timestamp (milliseconds)
                        .withTimestampAssigner((kv, ts) -> kv.f1)
        );

        //Map each record to (word, 1) so that it can be counted
        DataStream<Tuple2<String, Integer>> oneDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));
        //Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = oneDS.keyBy(kv -> kv.f0);
        //Assign a 5-second tumbling event-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        //Count the number of words
        SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);
        //Print data
        countDS.print();
        //Start the Flink job
        env.execute();

    }
}
Conditions for triggering a window based on event time:

    1. The watermark must reach the end time of the window. The window end is exclusive, so strictly this means watermark >= end - 1 ms.

    2. The window must contain data.

    3. Window boundaries are aligned to 1970-01-01 00:00:00 UTC (the Unix epoch) and repeat at intervals of the window size, so they do not depend on when the job starts or when the first record arrives.
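
The epoch alignment and the trigger conditions listed above can be sketched in plain Java (no Flink dependency). The class and method names are hypothetical. The comparison against end - 1 is an assumption that reflects my understanding of Flink's event-time trigger, where the window end is exclusive.

```java
class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains timestampMs.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // A non-empty window [start, end) fires once the watermark reaches end - 1.
    static boolean shouldFire(long watermarkMs, long windowEndMs, int recordsInWindow) {
        return recordsInWindow > 0 && watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long[] timestamps = {1699035731000L, 1699035735000L, 1699035740000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
        // prints:
        // 1699035731000 -> [1699035730000, 1699035735000)
        // 1699035735000 -> [1699035735000, 1699035740000)
        // 1699035740000 -> [1699035740000, 1699035745000)
    }
}
```

Note that a record whose timestamp equals a window boundary, such as 1699035735000, opens the next window rather than closing the previous one.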
4. Watermark: when no delay is configured, the watermark follows the largest timestamp seen so far, that is, the timestamp of the latest record when data arrives in order.

5. Out-of-order data: when using event time, records should arrive in timestamp order; a record that arrives after the watermark has passed the end of its window is dropped. Out-of-order input can still be handled, as described next.

Solution: delay the watermark.

Suppose the window is 5 seconds long, covering [0, 5), and the watermark is delayed by 5 seconds. When a record with timestamp 4 arrives, the watermark becomes 4 - 5 = -1, which does not satisfy the trigger condition, so the window stays open. If the missing record with timestamp 3 then arrives, the watermark is still less than the window end, the window is still not triggered, and the record is counted. The window fires only once a record with timestamp 10 or later pushes the watermark to 5.
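
The walk-through above can be replayed with a small plain-Java simulation (no Flink dependency). Timestamps are in seconds, as in the example, and the class and method names are hypothetical. It uses the simplified rule "watermark = largest timestamp - delay, window closes when watermark >= window end". As far as I know, Flink's built-in generator subtracts one extra millisecond and its trigger compares against end - 1, so the two offsets cancel and the simplified rule gives the same outcome.

```java
class DelayedWatermarkSketch {

    private final long delay;
    private long watermark = Long.MIN_VALUE;

    DelayedWatermarkSketch(long delay) {
        this.delay = delay;
    }

    // Advances the watermark for an incoming record; it never moves backwards.
    long onRecord(long timestamp) {
        watermark = Math.max(watermark, timestamp - delay);
        return watermark;
    }

    // True once the watermark has reached the end of the window.
    boolean windowClosed(long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        DelayedWatermarkSketch wm = new DelayedWatermarkSketch(5);
        long windowEnd = 5; // window [0, 5)
        for (long ts : new long[] {4, 3, 10}) {
            long current = wm.onRecord(ts);
            System.out.println("ts=" + ts + " watermark=" + current
                    + " windowClosed=" + wm.windowClosed(windowEnd));
        }
        // prints:
        // ts=4 watermark=-1 windowClosed=false
        // ts=3 watermark=-1 windowClosed=false
        // ts=10 watermark=5 windowClosed=true
    }
}
```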

However, this does not fully guarantee that no data is lost: a record that is later than the configured delay is still dropped. The larger the delay, the later Flink emits each window's result.

1. Monotonically increasing timestamps (forMonotonousTimestamps): for streams with no out-of-order data, the watermark equals the timestamp of the latest record
        //Tell Flink which field is the time field and how to generate the watermark
        DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        //The watermark equals the timestamp of the latest record. Use only when data arrives in order; out-of-order records may be dropped.
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        //Specify the time field
                        .withTimestampAssigner((kv, ts) -> kv.f1)
        );
2. Bounded out-of-orderness (forBoundedOutOfOrderness): for out-of-order streams with a fixed maximum delay, the watermark trails the largest timestamp seen so far by a fixed amount of time
        //Tell Flink which field is the time field and how to generate the watermark
        DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        //The watermark is the largest timestamp seen so far minus 5 seconds, so window results are emitted 5 seconds later
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //Specify the time field
                        .withTimestampAssigner((kv, ts) -> kv.f1)
        );
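
To compare the two strategies, the plain-Java sketch below (no Flink dependency) replays the sample records from this article against 5-second tumbling windows and counts how many arrive after their window has closed, first with no delay and then with a 5-second delay. The class and method names are hypothetical, and the watermark rule is a simplified model (largest timestamp minus delay) rather than Flink's actual code.

```java
class LatenessSketch {

    // Counts records whose tumbling window was already closed when they arrived.
    static int countLate(long[] timestampsMs, long windowSizeMs, long delayMs) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestampsMs) {
            long windowEnd = ts - Math.floorMod(ts, windowSizeMs) + windowSizeMs;
            if (windowEnd <= watermark) {
                late++; // the window has already fired, so the record would be dropped
            }
            watermark = Math.max(watermark, ts - delayMs);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] sample = {
            1699035731000L, 1699035732000L, 1699035735000L, 1699035733000L,
            1699035736000L, 1699035737000L, 1699035740000L
        };
        System.out.println("no delay:  " + countLate(sample, 5_000L, 0L));     // prints: no delay:  1
        System.out.println("5 s delay: " + countLate(sample, 5_000L, 5_000L)); // prints: 5 s delay: 0
    }
}
```

With no delay, the record with timestamp 1699035733000 arrives after 1699035735000 has already closed its window and is lost; with a 5-second delay it is still counted.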
6. Generation of the watermark:

The above figure shows the data flow of a Flink job with a parallelism of two. Each task keeps its own task time (its current watermark) and passes it downstream. When the upstream task map(1) passes its time to the downstream task window(1), the other upstream task map(2) also passes its time to window(1), because the upstream tasks run in parallel. window(1) therefore holds two task times and selects the smaller one as its watermark. If the larger one were selected, records with smaller timestamps that are still on their way from the slower upstream task could be lost.

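1. Watermark alignment:

Because upstream tasks run in parallel, a downstream task's watermark can only advance when the smallest of its upstream watermarks advances. In other words, the watermarks of all upstream tasks have to move forward before the downstream watermark does.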
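
The minimum rule described above can be sketched in plain Java (no Flink dependency). The class and method names are hypothetical, and the sketch only models a downstream task that tracks one watermark per upstream input; it is not Flink's implementation.

```java
import java.util.Arrays;

class WatermarkMinSketch {

    private final long[] inputWatermarks;
    private long current = Long.MIN_VALUE;

    WatermarkMinSketch(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one upstream input and returns the task's own watermark,
    // which is the minimum over all inputs and never moves backwards.
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        WatermarkMinSketch window1 = new WatermarkMinSketch(2);
        System.out.println(window1.onWatermark(0, 10)); // input 1 has sent nothing yet: stays at Long.MIN_VALUE
        System.out.println(window1.onWatermark(1, 4));  // prints 4
        System.out.println(window1.onWatermark(0, 12)); // prints 4, the slower input holds it back
        System.out.println(window1.onWatermark(1, 9));  // prints 9
    }
}
```

The downstream watermark moves only when the slowest input moves, which is why all upstream tasks have to make progress before a downstream window can fire.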