[Development] 1. Processing functions: timers and the timer service

Table of contents

  • 1. Basic processing functions
  • 2. Timers and the timer service
  • 3. Timer demonstration with KeyedProcessFunction
  • 4. Getting the current watermark in process

The previous chapters covered the DataStream API. Transformations, aggregations, windows and so on are all built on DataStream. Flink's API layers are shown in the figure:

At the lowest level, Flink does not require a specific operator. Instead it offers one generic process operation in which you define your own logic. This is the processing-function layer at the bottom of the figure.

From bottom to top, each layer is more heavily wrapped and easier to use. The map and other operators used earlier are wrappers that Flink provides, with process underneath. When the existing operators cannot meet a requirement, use process: it is the lowest-level and most flexible API, and you write the processing logic yourself.

1. Basic processing functions

A processing function is used the same way as the transformation operators covered earlier: call it on a DataStream object.

stream.process(new MyProcessFunction())
  • ProcessFunction is not an interface but an abstract class that extends AbstractRichFunction. It therefore also has the rich-function lifecycle methods (open(), close()) and access to the runtime context.

  • ProcessFunction has two type parameters. I is the input data type, and O is the type of the data emitted after processing.

  • The ProcessFunction abstract class declares two methods. processElement is abstract and must be overridden. onTimer is non-abstract and may be overridden.

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...

}

Abstract method processElement:

  • Defines the logic for processing elements.
  • Is called once for each element in the stream.
  • Takes three parameters:
      • the element itself;
      • the context object ctx, which gives access to related information such as the timestamp and the timer service;
      • the collector out, which sends processed data downstream.
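The per-element contract can be illustrated without Flink at all. The sketch below is a simplified, stdlib-only Java model, not Flink's API:

  • the names ProcessFn and ProcessSketch are hypothetical;
  • a java.util.function.Consumer stands in for Flink's Collector.

Because processElement may emit zero, one or many records per input, map-, filter- and flatMap-style behaviour are all special cases of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the processElement contract; NOT Flink's ProcessFunction.
// A Consumer<O> plays the role of Flink's Collector<O>.
interface ProcessFn<I, O> {
    // Called once per element; may emit zero, one or many outputs through out.
    void processElement(I value, Consumer<O> out);
}

class ProcessSketch {
    // map-like: exactly one output per input
    static ProcessFn<Integer, Integer> doubler() {
        return (value, out) -> out.accept(value * 2);
    }

    // filter-like: zero or one output per input
    static ProcessFn<Integer, Integer> evensOnly() {
        return (value, out) -> {
            if (value % 2 == 0) {
                out.accept(value);
            }
        };
    }

    // flatMap-like: many outputs per input
    static ProcessFn<String, String> splitter() {
        return (value, out) -> {
            for (String part : value.split(",")) {
                out.accept(part);
            }
        };
    }

    // Drives a ProcessFn over a list, the way a parallelism-1 operator would see elements one by one.
    static <I, O> List<O> run(ProcessFn<I, O> fn, List<I> input) {
        List<O> collected = new ArrayList<>();
        for (I value : input) {
            fn.processElement(value, collected::add);
        }
        return collected;
    }
}
```

The design choice is the same as Flink's: emitting through a collector, rather than returning a value, is what lets one method express all three behaviours.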

Non-abstract method onTimer:

  • Is called when a timer fires.
  • Registering a timer is like setting an alarm clock; onTimer defines what happens when the alarm goes off.
  • Is a callback driven by the timeline.
  • Takes three parameters: timestamp (the trigger time of the timer), the context ctx, and the collector out.

The original DataStream turns into different stream types after different operations, for example a KeyedStream after keyBy and a WindowedStream after window. Each of these stream types also has a .process() method for custom processing. However, the parameter each one accepts is a different kind of processing function.

How processing functions are classified:

  • The kind of processing function you pass depends on the type of stream on which process is called.
  • To check the exact type, put the cursor inside the parentheses of process and press Ctrl + P (parameter info in IntelliJ IDEA) to see the expected parameter.
  • For example, DataStream.process() takes a ProcessFunction, while KeyedStream.process() after keyBy takes a KeyedProcessFunction.

2. Timers and the timer service

The Context object of ProcessFunction has a timerService() method that returns a TimerService object. TimerService is the core of Flink's timer functionality. Its common methods are:

  • Get the current processing time:
    long currentProcessingTime();
  • Get the current watermark (event time):
    long currentWatermark();
  • Register a processing-time timer that fires when the processing time reaches time:
    void registerProcessingTimeTimer(long time);
  • Register an event-time timer that fires when the watermark reaches time:
    void registerEventTimeTimer(long time);
  • Delete the processing-time timer whose trigger time is time:
    void deleteProcessingTimeTimer(long time);
  • Delete the event-time timer whose trigger time is time:
    void deleteEventTimeTimer(long time);
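The event-time firing rule above can be made concrete with a minimal single-key model in plain Java. This is a sketch under simplifying assumptions, not Flink's implementation:

  • the class EventTimerSketch is hypothetical;
  • timers are kept in a sorted set;
  • "firing" just records the timestamp at the point where Flink would call onTimer.

A timer fires once the watermark is greater than or equal to its timestamp. A timer deleted before that never fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key model of event-time timers; not Flink's TimerService.
class EventTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>(); // pending timer timestamps, sorted ascending
    private long currentWatermark = Long.MIN_VALUE;        // initial value before any watermark arrives
    private final List<Long> fired = new ArrayList<>();     // timestamps passed to the simulated onTimer

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    void deleteEventTimeTimer(long time) {
        timers.remove(time);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Advancing the watermark fires every pending timer whose time <= the new watermark, in time order.
    void advanceWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // watermarks never move backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            fired.add(t); // stand-in for calling onTimer(t, ctx, out)
        }
    }

    List<Long> fired() {
        return fired;
    }
}
```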

Note:

  • Timers can be registered through TimerService only on a KeyedStream (for example, in a KeyedProcessFunction).
  • TimerService deduplicates timers by key and timestamp. For the same key and timestamp only one timer is kept, so onTimer is called only once when it fires.
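The deduplication rule can be modelled as a map from timestamp to a set of keys. The sketch below uses a hypothetical class, KeyedTimerDedupSketch, written in plain Java rather than taken from Flink's internals. It shows two things:

  • registering the same (key, timestamp) pair repeatedly leaves a single timer;
  • different keys with the same timestamp each keep their own timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of timer deduplication: at most one timer per (key, timestamp).
class KeyedTimerDedupSketch {
    // timestamp -> keys that have a timer at that timestamp; the Set collapses duplicates
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    void register(String key, long time) {
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(key);
    }

    int pendingCount() {
        int n = 0;
        for (TreeSet<String> keys : timers.values()) {
            n += keys.size();
        }
        return n;
    }

    // Fires all timers with time <= watermark and returns one "key@time" entry per fired timer.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> e = timers.pollFirstEntry();
            for (String key : e.getValue()) {
                fired.add(key + "@" + e.getKey());
            }
        }
        return fired;
    }
}
```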

3. Timer demonstration with KeyedProcessFunction

Event-time timer demonstration: register a timer at 5 s, which fires once the watermark reaches 5 s.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// WaterSensor (with getId()/getTs()) and WaterSensorMapFunction come from the earlier
// chapters and are not shown here; import them from wherever they live in your project.
public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // built-in watermark generator for out-of-order data
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) // ts is in seconds; Flink expects milliseconds
                );

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // process on the keyed stream
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * Called once for each incoming record.
                     * @param value the current record
                     * @param ctx   context object
                     * @param out   collector
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // Key of the current record
                        String currentKey = ctx.getCurrentKey();
                        // Timer service object
                        TimerService timerService = ctx.timerService();
                        // Event time extracted from the record by the timestamp assigner
                        Long currentEventTime = ctx.timestamp();
                        // Register a timer at event time 5000 ms; it fires once the watermark reaches 5 s
                        timerService.registerEventTimeTimer(5000L);
                        System.out.println("Current key=" + currentKey + ", current time=" + currentEventTime + ", registered a 5s timer");
                    }

                    /**
                     * Called when time advances to the timestamp of a registered timer.
                     * @param timestamp the trigger time of the timer that fired
                     * @param ctx       context
                     * @param out       collector
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        String currentKey = ctx.getCurrentKey();
                        System.out.println("key=" + currentKey + ", current time=" + timestamp + ", timer fired");
                    }
                }
        );

        process.print();

        env.execute();
    }
}


Run: when a record with timestamp 8 s arrives, the watermark is 8 s - 3 s - 1 ms = 4999 ms < 5 s. This is the current maximum event time minus the out-of-orderness bound minus 1 ms, so the timer does not fire. Also note that for the same key and the same timer timestamp, only one timer takes effect:

Now look at different keys. Note that the watermark is independent of the key. When the record s1,9,9 arrives, the watermark jumps to 9 s - 3 s - 1 ms = 5999 ms > 5 s, and all three timers (one per key) fire.
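The arithmetic in both runs follows the bounded-out-of-orderness formula:

    watermark = maximum event time seen so far - out-of-orderness bound - 1 ms

The plain-Java sketch below is modelled on that formula. The class name BoundedWatermarkSketch and the keys used in the usage are hypothetical. The key argument is deliberately ignored: the maximum is taken over the whole stream, which is why a record for any key can advance the watermark for all keys.

```java
// Hypothetical model of forBoundedOutOfOrderness(Duration.ofSeconds(3)); not Flink's class.
class BoundedWatermarkSketch {
    private final long outOfOrdernessMs;
    private long maxTimestamp; // largest event time seen so far, across all keys

    BoundedWatermarkSketch(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Chosen so the initial watermark is exactly Long.MIN_VALUE without overflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    // Called per record; the key is intentionally unused because the watermark is not per key.
    void onEvent(String key, long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }
}
```

With a 3 s bound:

  • a record at 8 s gives 4999 ms, still below the 5000 ms timer;
  • a record at 9 s gives 5999 ms, which reaches it;
  • a later out-of-order record at 2 s does not move the watermark back.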

Timer under processing time (reusing the code above):

public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
        // ... environment, socket source and watermark strategy omitted, same as above

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // process on the keyed stream
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // Key of the current record
                        String currentKey = ctx.getCurrentKey();

                        TimerService timerService = ctx.timerService();

                        // Processing time (machine clock) at which this record is handled
                        long currentTs = timerService.currentProcessingTime();
                        // Processing-time timers ignore the watermark: register one 5 s after the current processing time
                        timerService.registerProcessingTimeTimer(currentTs + 5000L);

                        System.out.println("Current key=" + currentKey + ", current time=" + currentTs + ", registered a timer after 5s");
                    }

                    // ... onTimer omitted, same as above
                }
        );

        // ... process.print() and env.execute() omitted, same as above
    }
}

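Processing-time timers are driven by the machine clock rather than by the watermark. The sketch below replaces the wall clock with a manually advanced one so the behaviour is deterministic. ProcessingTimerSketch is a hypothetical single-key model, not Flink's processing-time service. Registering currentProcessingTime() + 5000 makes the timer fire 5 s after the record was handled, regardless of the event timestamps in the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical manual-clock model of processing-time timers (single key); Flink uses the real clock.
class ProcessingTimerSketch {
    private long now; // simulated processing time in ms
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();

    ProcessingTimerSketch(long startMs) {
        this.now = startMs;
    }

    long currentProcessingTime() {
        return now;
    }

    void registerProcessingTimeTimer(long time) {
        if (!timers.contains(time)) { // one timer per timestamp, as with the key/timestamp dedup
            timers.add(time);
        }
    }

    // Moves the simulated clock forward and fires every timer whose time has been reached.
    void advanceTo(long newNow) {
        now = Math.max(now, newNow);
        while (!timers.isEmpty() && timers.peek() <= now) {
            fired.add(timers.poll()); // stand-in for calling onTimer
        }
    }

    List<Long> fired() {
        return fired;
    }
}
```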

Run:

4. Getting the current watermark in process

This uses the same socket stream as above, but the process logic no longer registers timers. It only prints the watermark to check it:

// ... repeated code omitted, same as above

@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
    // Current watermark as seen by this process operator
    long currentWatermark = ctx.timerService().currentWatermark();
    System.out.println("current data=" + value + ", current watermark=" + currentWatermark);
}


When s1,1,1 arrives, the watermark should be 1000 ms - 3000 ms - 1 ms = -2001 ms. However, timerService returns the initial value Long.MIN_VALUE. Only when s1,5 arrives does it return -2001. Continuing in order, the value that process sees always lags one record behind.

process shows the previous watermark because, when a record is processed, the new watermark generated from that record has not yet reached process. With forBoundedOutOfOrderness the watermark is emitted periodically (every 200 ms by default), after the record itself has been sent on. Key point: a watermark is itself an element of the stream, and it flows downstream behind the record it was generated from.


The figure above shows why s5,5 reads a watermark of -2001. At that moment process has not yet received the new watermark generated from this record: 1999 is still outside the process box, and only -2001 is inside it.
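The one-record lag can be reproduced with a plain-Java simulation. It assumes, as in this hand-typed socket demo, that records arrive slowly enough for exactly one watermark to be emitted after each record. WatermarkLagSketch and its methods are hypothetical names, not Flink API. Upstream emits each record followed by the watermark it produced, and the downstream "process" reads whatever watermark it received last.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of why process() sees the previous record's watermark.
class WatermarkLagSketch {
    // A stream element is either a data record or a watermark.
    static final class Element {
        final boolean isWatermark;
        final long value; // event time in ms for a record, watermark in ms otherwise

        Element(boolean isWatermark, long value) {
            this.isWatermark = isWatermark;
            this.value = value;
        }
    }

    // Upstream: each record is sent first, then the bounded-out-of-orderness watermark it produced.
    static List<Element> upstream(long[] eventTimesMs, long outOfOrdernessMs) {
        List<Element> stream = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + outOfOrdernessMs + 1;
        for (long ts : eventTimesMs) {
            stream.add(new Element(false, ts));
            maxTs = Math.max(maxTs, ts);
            stream.add(new Element(true, maxTs - outOfOrdernessMs - 1));
        }
        return stream;
    }

    // Downstream process(): records the watermark it has received at the moment each record arrives.
    static List<Long> watermarksSeenByProcess(List<Element> stream) {
        long currentWatermark = Long.MIN_VALUE; // nothing received yet
        List<Long> seen = new ArrayList<>();
        for (Element e : stream) {
            if (e.isWatermark) {
                currentWatermark = Math.max(currentWatermark, e.value);
            } else {
                seen.add(currentWatermark);
            }
        }
        return seen;
    }
}
```

For event times 1 s, 5 s and 6 s with a 3 s bound, the process sees Long.MIN_VALUE, -2001 and 1999, matching the lag described above.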