Background:
Continuing from the previous article: combining ProcessWindowFunction with a custom trigger leads to excessively large state. This article implements the same logic with AggregateFunction combined with a trigger, which avoids that problem.
AggregateFunction combined with custom triggers
With AggregateFunction, Flink only needs to maintain a single state value (the accumulator) for each window. ProcessWindowFunction, by contrast, has to store every message received in the window as state.
The complete code is as follows:
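The difference in state size can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: the class names WindowStateSketch, CountAccumulator and BufferedWindow are hypothetical and only model the two storage strategies.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink dependency) of two ways a window can hold state. */
final class WindowStateSketch {

    /** Incremental aggregation: the only state kept is a running count. */
    static final class CountAccumulator {
        private long count;

        void add(int value) {
            count += value;
        }

        long getResult() {
            return count;
        }
    }

    /** Buffering: every element is retained until the window is evaluated. */
    static final class BufferedWindow {
        private final List<Integer> elements = new ArrayList<>();

        void add(int value) {
            elements.add(value);
        }

        long getResult() {
            long sum = 0;
            for (int v : elements) {
                sum += v;
            }
            return sum;
        }

        int storedElements() {
            return elements.size();
        }
    }

    public static void main(String[] args) {
        CountAccumulator acc = new CountAccumulator();
        BufferedWindow buf = new BufferedWindow();
        for (int i = 0; i < 100_000; i++) {
            acc.add(1);
            buf.add(1);
        }
        // Both strategies produce the same result, but only one stores every element.
        System.out.println("accumulator result: " + acc.getResult());
        System.out.println("buffered result: " + buf.getResult()
                + ", elements stored: " + buf.storedElements());
    }
}
```

Both strategies return the same count; the buffered variant simply holds one entry per element, which is what makes the state grow with traffic.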
package wikiedits.func;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class AggregateFunctionAndTiggerDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/aggregatetrigger"));

        // Parallelism is 1
        env.setParallelism(1);

        // Set up the data source: an endless stream that alternates between two keys
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

            // Cleared by cancel() so that the loop in run() can exit
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; running && i < Integer.MAX_VALUE; i++) {
                    // There are only two names: XXX and YYY
                    String name = (0 == i % 2) ? "XXX" : "YYY";

                    // Update the running totals of XXX and YYY elements
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }

                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();

                    // Print the data and timestamp periodically so the results can be verified
                    if (xxxNum % 2000 == 0) {
                        System.out.println(String.format("source,%s, %s, XXX total : %d, YYY total : %d\n",
                                name, time(timeStamp), xxxNum, yyyNum));
                    }

                    // Emit an element with its timestamp attached
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);

                    // Pause for 1 millisecond between elements
                    Thread.sleep(1);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Divide the data into 5-minute tumbling windows, then aggregate with AggregateFunction
        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = dataStream
                // Use the f0 field of Tuple2 as the key; in this example there are only two keys: XXX and YYY
                .keyBy(value -> value.f0)
                // Tumbling window of 5 minutes
                .timeWindow(Time.minutes(5))
                // Trigger a calculation every 10 seconds and update the statistical results
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // Count the elements in the current window for each key and send (key, count) downstream
                .aggregate(
                        new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                            // 1. Initial value: define the initial value of the accumulator
                            @Override
                            public Tuple2<String, Integer> createAccumulator() {
                                return new Tuple2<String, Integer>("", 0);
                            }

                            // 2. Accumulation: define how the accumulator is updated for each input element
                            @Override
                            public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
                                accumulator.f0 = value.f0;
                                accumulator.f1 += value.f1;
                                return accumulator;
                            }

                            // 3. Merge: define how two accumulators are combined
                            @Override
                            public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc1, Tuple2<String, Integer> acc2) {
                                acc1.f1 += acc2.f1;
                                return acc1;
                            }

                            // 4. Output: define how the result is produced from the accumulator
                            @Override
                            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
                                return accumulator;
                            }
                        });

        // Print the results; compare them with the source output to check the per-key counts
        mainDataStream.print();

        env.execute("aggregatefunction demo : aggregatefunction with trigger");
    }

    public static String time(long timeStamp) {
        // HH is the 24-hour clock, so the printed time is unambiguous
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
    }
}
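To make the effect of the 10-second trigger more concrete, here is a simplified plain-Java model of it. It rests on two assumptions about ContinuousProcessingTimeTrigger that are worth checking against the Flink version in use: firings are aligned to multiples of the interval, and a firing emits the accumulator without clearing it. The class ContinuousFiringSketch and its methods are hypothetical, and the model deliberately ignores what happens exactly at the window end.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of periodic firing over a window whose accumulator is never purged. */
final class ContinuousFiringSketch {

    /** Next interval-aligned firing time strictly after now (both in milliseconds). */
    static long nextFireTime(long now, long intervalMs) {
        return now - (now % intervalMs) + intervalMs;
    }

    /**
     * Replays sorted arrival times that all belong to one window and returns the
     * running count emitted at each firing before windowEndMs.
     */
    static List<Long> emittedCounts(long[] arrivalTimesMs, long intervalMs, long windowEndMs) {
        List<Long> emitted = new ArrayList<>();
        if (arrivalTimesMs.length == 0) {
            return emitted; // no element, so no timer is ever registered
        }
        long count = 0;
        int next = 0;
        long fireAt = nextFireTime(arrivalTimesMs[0], intervalMs);
        while (fireAt < windowEndMs) {
            // Fold in every element that arrived before this firing
            while (next < arrivalTimesMs.length && arrivalTimesMs[next] < fireAt) {
                count++;
                next++;
            }
            // Emit the running total; the count is kept for the next firing
            emitted.add(count);
            fireAt += intervalMs;
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {1_000, 4_000, 12_000, 15_000, 27_000};
        // Firings at 10 s, 20 s and 30 s inside a window ending at 40 s
        System.out.println(emittedCounts(arrivals, 10_000, 40_000)); // prints [2, 4, 5]
    }
}
```

Each emitted value is the total for the window so far, not the count since the previous firing, so a downstream consumer should overwrite its earlier value for the same key and window instead of adding to it.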
In this way, we can, for example, keep a running count of the clicks on a given page for the day so far (with a one-day window) and output the updated count every 10 seconds, without the state growing with the number of events.
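One point to keep in mind for the "day so far" case is where the day boundary falls. Flink's tumbling windows are aligned to the epoch unless an offset is configured, so a one-day window runs from 00:00 to 24:00 UTC. The following plain-Java sketch shows the alignment arithmetic for non-negative timestamps; DailyWindowSketch and windowStart are hypothetical names used only for illustration.

```java
import java.time.Instant;

/** Shows which epoch-aligned one-day window a timestamp falls into. */
final class DailyWindowSketch {

    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the epoch-aligned window (no offset) containing a non-negative timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long click = Instant.parse("2020-09-14T15:30:00Z").toEpochMilli();
        long start = windowStart(click, DAY_MS);
        // The window covers [start, start + DAY_MS)
        System.out.println("window start: " + Instant.ofEpochMilli(start));
        System.out.println("window end:   " + Instant.ofEpochMilli(start + DAY_MS));
    }
}
```

If the count should reset at local midnight instead, the window assigner needs an offset matching the time zone.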
References:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html