Combining AggregateFunction with a custom trigger to implement click-through rate calculation

Background:

Continuing from the previous article: combining ProcessWindowFunction with a custom trigger makes the window state grow too large, because every element received in the window is buffered. This article combines AggregateFunction with a custom trigger instead, which avoids the oversized state.

AggregateFunction combined with custom triggers


With AggregateFunction, Flink only needs to maintain a single accumulator as state for each window, unlike ProcessWindowFunction, which must store every message received in the window as state.
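
The difference in state size can be pictured with a small plain-Java sketch. It does not use Flink at all: the class names `BufferingWindow` and `AggregatingWindow` are hypothetical stand-ins for the two styles, chosen only for illustration, and the sketch assumes that the result we want is a simple sum.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; the class names are hypothetical and nothing here is Flink API.
final class WindowStateSketch {

    // Buffering style: every element is kept until the result is requested.
    static final class BufferingWindow {
        private final List<Integer> elements = new ArrayList<>();

        void add(int value) {
            elements.add(value);
        }

        long result() {
            long sum = 0;
            for (int v : elements) {
                sum += v;
            }
            return sum;
        }

        int storedValues() {
            return elements.size();
        }
    }

    // Incremental style: each element is folded into a single accumulator on arrival.
    static final class AggregatingWindow {
        private long accumulator = 0;

        void add(int value) {
            accumulator += value;
        }

        long result() {
            return accumulator;
        }

        int storedValues() {
            return 1; // only the accumulator is kept
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        AggregatingWindow aggregating = new AggregatingWindow();
        for (int i = 0; i < 10_000; i++) {
            buffering.add(1);
            aggregating.add(1);
        }
        System.out.println("buffering:   result=" + buffering.result() + ", stored=" + buffering.storedValues());
        System.out.println("aggregating: result=" + aggregating.result() + ", stored=" + aggregating.storedValues());
    }
}
```

Both styles produce the same result, but the buffering style holds one stored value per element while the incremental style holds one value in total, which is why the state stays small however many elements arrive.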


For the complete code, see:

package wikiedits.func;


import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;



public class AggregateFunctionAndTiggerDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Use processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/aggregatetrigger"));

        // Parallelism is 1
        env.setParallelism(1);
        // Set the data source: an endless stream that alternates between two keys
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // There are only two names: XXX and YYY
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // Update the running totals of XXX and YYY elements
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // Use current time as timestamp
                    long timeStamp = System.currentTimeMillis();
                    // Print the data and timestamp to verify the data
                    if (xxxNum % 2000 == 0) {
                        System.out.println(String.format("source,%s, %s, XXX total : %d, YYY total : %d\n", name,
                                time(timeStamp), xxxNum, yyyNum));
                    }
                    // Emit an element and attach a timestamp to it
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // Pause 1 millisecond after each emission
                    Thread.sleep(1);
                }
            }

            @Override
            public void cancel() {}
        });

        // Split the data with a 5-minute tumbling window, then aggregate it incrementally with AggregateFunction
        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = dataStream
                // Use the f0 field of Tuple2 as the key. In this example, there are only two keys: XXX and YYY.
                .keyBy(value -> value.f0)
                // Tumbling window of 5 minutes
                .timeWindow(Time.minutes(5))
                // Trigger a calculation every 10 seconds and emit the updated result
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // Count the number of elements in the current window for each key and send (key, count) to the downstream operator
                .aggregate(
                        new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                            // 1. Initial value
                            //Define the initial value of the accumulator
                            @Override
                            public Tuple2<String, Integer> createAccumulator() {
                                return new Tuple2<String, Integer>("", 0);
                            }

                            // 2. Accumulation
                            // Define how the accumulator accumulates based on input data
                            @Override
                            public Tuple2<String, Integer> add(Tuple2<String, Integer> value,
                                    Tuple2<String, Integer> accumulator) {
                                accumulator.f0 = value.f0;
                                accumulator.f1 += value.f1;
                                return accumulator;
                            }

                            // 3. Merge
                            // Define how two accumulators are merged into one
                            @Override
                            public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc1,
                                    Tuple2<String, Integer> acc2) {
                                acc1.f1 += acc2.f1;
                                return acc1;
                            }

                            // 4. Output
                            // Define how to output data
                            @Override
                            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
                                return accumulator;
                            }
                        });

        // Print the results; each printed tuple is the running (key, count) of the current window for one key
        mainDataStream.print();

        env.execute("aggregatefunction demo : aggregatefunction with trigger");

    }



    public static String time(long timeStamp) {
        // HH is the 24-hour clock; hh would print 12-hour values with no AM/PM marker
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
    }



}

In this way, we can calculate the click-through rate of a page for the day so far (with the window size set to one day instead of the 5 minutes used in the demo) and output the updated result every 10 seconds, without the state growing out of control.
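
To picture the periodic output, here is a simplified plain-Java model of a single key in a single window. It is only a sketch under stated assumptions, not Flink's actual trigger logic: the method name `runningCounts` is hypothetical, event times are assumed to be sorted and measured in milliseconds from the window start, and the model simply emits the running count at every multiple of the firing interval without clearing it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of "fire periodically, keep accumulating"; not Flink API.
final class PeriodicFiringSketch {

    // Returns the running count emitted at each firing time within one window.
    // sortedEventTimesMs: event times in ascending order, relative to the window start.
    static List<Long> runningCounts(long[] sortedEventTimesMs, long windowSizeMs, long fireIntervalMs) {
        List<Long> emitted = new ArrayList<>();
        long count = 0; // the only state kept for the window
        int next = 0;   // index of the next event not yet counted
        for (long fireAt = fireIntervalMs; fireAt <= windowSizeMs; fireAt += fireIntervalMs) {
            // Fold in every event that arrived before this firing time.
            while (next < sortedEventTimesMs.length && sortedEventTimesMs[next] < fireAt) {
                count++;
                next++;
            }
            // Emit the running total; the count is not reset between firings.
            emitted.add(count);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] clicks = {1_000, 2_000, 12_000, 25_000, 29_000};
        // 30-second window, firing every 10 seconds
        System.out.println(runningCounts(clicks, 30_000, 10_000)); // prints [2, 3, 5]
    }
}
```

Each firing reports the total so far rather than only the clicks since the previous firing, which matches the "result so far, refreshed every 10 seconds" behavior described above.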

References:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html