Flink’s state programming

Flink’s state programming

  • 1. Keyed State
    • 1.1. Value State (ValueState)
      • 1.1.1. Definition
      • 1.1.2. Use cases
    • 1.2. List State (ListState)
      • 1.2.1. Definition
      • 1.2.2. Use cases
    • 1.3. Map State (MapState)
      • 1.3.1. Definition
      • 1.3.2. Use cases
    • 1.4. Reducing State (ReducingState)
      • 1.4.1. Definition
      • 1.4.2. Use cases
    • 1.5. Aggregating State (AggregatingState)
      • 1.5.1. Definition
      • 1.5.2. Use cases
  • 2. Broadcast State (BroadcastState)
    • 2.1. Definition
    • 2.2. Use case

1. Keyed State

1.1. Value State (ValueState)

1.1.1. Definition

A value state stores exactly one value per key. ValueState itself is an interface, defined in the Flink source code as follows:

/**
 * State that holds a single value of type {@code T}.
 *
 * <p>The examples in this article treat a {@code null} result from {@link #value()} as
 * "nothing has been stored yet".
 */
public interface ValueState<T> extends State {
// Reads the value currently held by this state.
T value() throws IOException;
// Replaces the value held by this state with the given one.
void update(T value) throws IOException;
}

1.1.2. Use cases

Use ValueState together with an event-time timer to emit each user’s running page-view (PV) count every 10 seconds.

package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;

public class ValueStateDS {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env. setParallelism(1);

        DataStreamSource<Event> stream = env. addSource(new EventWithWatermarkSource());

        SingleOutputStreamOperator<String> result = stream.keyBy(t -> t.user)
                .process(new KeyedProcessFunction<String, Event, String>() {<!-- -->
                    // Define two states, save the current pv value, and timer timestamp
                    private ValueState<Long> valueState;
                    private ValueState<Long> timerTsState;

                    @Override
                    public void open(Configuration parameters) throws Exception {<!-- -->
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state", Long.class));
                        timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));

                    }

                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> collector) throws Exception {<!-- -->
                        Long count = valueState. value();
                        if (count == null) {<!-- -->
                            valueState. update(1L);
                        } else {<!-- -->
                            valueState. update(count + 1);
                        }

                        // register timer
                        if (timerTsState. value() == null) {<!-- -->
                            ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
                            timerTsState.update(value.timestamp + 10 * 1000L);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {<!-- -->
                        out.collect("Time: " + new Timestamp(timestamp) + " -> User: " + ctx.getCurrentKey() + " pv value: " + valueState.value());
                        timerTsState. clear();
                    }
                });
        result. print(">>>>");

        env. execute();
    }
}

1.2. List State (ListState)

1.2.1. Definition

List state stores the data as a list. The ListState interface has a type parameter T, which is the type of the elements in the list. ListState also provides a set of methods for manipulating the state, and they are used much like those of an ordinary List.

1.2.2. Use cases

Use ListState to implement a SQL-style join of two streams.

package com.hpsk.flink.state;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class ListStateDS {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env. setParallelism(1);
        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env. fromElements(
                Tuple3.of("a", "stream-1", 1000L),
                Tuple3.of("b", "stream-1", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>for MonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {<!-- -->
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {<!-- -->
                return stringStringLongTuple3.f2;
            }
        }));

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env. fromElements(
                Tuple3.of("a", "stream-2", 3000L),
                Tuple3.of("b", "stream-2", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>for MonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {<!-- -->
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {<!-- -->
                return stringStringLongTuple3.f2;
            }
        }));

        stream1.keyBy(r -> r.f0)
                .connect(stream2.keyBy(r -> r.f0))
                .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {<!-- -->
                    private ListState<Tuple3<String, String, Long>> listState1;
                    private ListState<Tuple3<String, String, Long>> listState2;

                    @Override
                    public void open(Configuration parameters) throws Exception {<!-- -->
                        listState1 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("listState1", Types. TUPLE(Types. STRING, Types. STRING, Types. LONG)));
                        listState2 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("listState2", Types. TUPLE(Types. STRING, Types. STRING, Types. LONG)));

                    }

                    @Override
                    public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> out) throws Exception {<!-- -->
                        listState1. add(left);
                        for (Tuple3<String, String, Long> right : listState2.get()) {<!-- -->
                            out. collect(left + " => " + right);
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> out) throws Exception {<!-- -->
                        listState2. add(right);
                        for (Tuple3<String, String, Long> left : listState1.get()) {<!-- -->
                            out. collect(left + " => " + right);
                        }
                    }
                })
                .print();
        env. execute();
    }
}

1.3. Map State (MapState)

1.3.1. Definition

Map state is similar to Java’s HashMap: it stores a set of key-value pairs, and a value can be read or updated through its key.

1.3.2. Use cases

Simulate a tumbling window using KeyedProcessFunction.

package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;

// Simulate a scrolling window using KeyedProcessFunction
public class MapStateDS {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env. setParallelism(1);

        DataStreamSource<Event> inputDS = env. addSource(new EventWithWatermarkSource());

        SingleOutputStreamOperator<String> result = inputDS
                .keyBy(t -> t.url)
                .process(new FakeWindowResult(10000L));
        result. print(">>>");

        env. execute();
    }

    private static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{<!-- -->
        // Define properties, window length
        private Long windowSize;
        // Declare state, use map to save pv value (window start, count)
        private MapState<Long, Long> windowPvMapState;
        public FakeWindowResult(Long windowSize) {<!-- -->
            this. windowSize = windowSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {<!-- -->
            windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Types.LONG, Types.LONG));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<String> collector) throws Exception {<!-- -->
            // Each time a piece of data comes, judge which window it belongs to according to the timestamp
            long windowStart = event.timestamp / windowSize * windowSize;
            long windowEnd = windowStart + windowSize;
            // Register the timer of end -1, the window triggers the calculation
            ctx.timerService().registerEventTimeTimer(windowEnd - 1);
            // Update the pv value in the state
            if (windowPvMapState. contains(windowStart)) {<!-- -->
                Long pv = windowPvMapState.get(windowStart);
                windowPvMapState.put(windowStart, pv + 1);
            } else {<!-- -->
                windowPvMapState.put(windowStart, 1L);
            }
        }

        // Timer triggers, directly output statistical pv results
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {<!-- -->
            long windowEnd = timestamp + 1;
            long windowStart = windowEnd - windowSize;
            long pv = windowPvMapState.get(windowStart);
            out.collect("url:[" + ctx.getCurrentKey()
                         + "]'s traffic is " + pv
                         + ", the corresponding window is: [" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd) + "]");
            // Simulate the destruction of the window, clear the key in the map
            windowPvMapState. remove(windowStart);
        }
    }
}

1.4. Reducing State (ReducingState)

1.4.1. Definition

Reducing state keeps a single value: each newly added element is combined with the value already in the state by a user-supplied ReduceFunction, and the reduced result is stored back.

1.4.2. Use cases

For each user in the click-event stream, compute the average timestamp of every 5 events.

package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class ReducingStateDS {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env. setParallelism(1);

        DataStreamSource<Event> inputDS = env. addSource(new EventWithWatermarkSource());
        inputDS. print("input");

        SingleOutputStreamOperator<String> result = inputDS
                .keyBy(t -> t.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {<!-- -->
                    // Define the ReducingState calculation and store the result
                    private ReducingState<Tuple2<Event, Long>> reducingState;
                    // Define a value state to save the current user access frequency
                    private ValueState<Long> countState;
                    @Override
                    public void open(Configuration parameters) throws Exception {<!-- -->
                        // initialization
                        countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));
                        reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Tuple2<Event, Long>>(
                                "reducing-state",
                                new ReduceFunction<Tuple2<Event, Long>>() {<!-- -->
                                    @Override
                                    public Tuple2<Event, Long> reduce(Tuple2<Event, Long> t1, Tuple2<Event, Long> t2) throws Exception {<!-- -->
                                        // Accumulate access time
                                        return Tuple2.of(t1.f0, t2.f1 + t1.f1);
                                    }
                                },
                                Types.TUPLE(Types.LONG, Types.LONG)
                        ));
                    }

                    @Override
                    public void flatMap(Event event, Collector<String> collector) throws Exception {<!-- -->
                        Long count = countState. value();
                        if (count == null) {<!-- -->
                            count = 1L;
                        } else {<!-- -->
                            count + + ;
                        }
                        // update access count
                        countState. update(count);
                        // accumulatively calculate the access time
                        reducingState.add(Tuple2.of(event, event.timestamp));

                        if (count == 5) {<!-- -->
                            // Visit 5 times, output the average access time
                            collector.collect(event.user + " " + new Timestamp(reducingState.get().f1 / count));
                            // clear state
                            countState. clear();
                            reducingState. clear();
                        }
                    }
                });

        result. print("output ");

        env. execute();
    }
}

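1.5. Aggregating State (AggregatingState)

1.5.1. Definition

Aggregating state is similar to reducing state, but the aggregation is defined by an AggregateFunction: added elements are folded into an accumulator, and the result is computed from that accumulator, so the input, accumulator, and output types may all differ.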

1.5.2. Use cases

For each user in the click-event stream, compute the average timestamp of every 5 events.

package com.hpsk.flink.state;

import com.hpsk.flink.beans.Event;
import com.hpsk.flink.source.EventWithWatermarkSource;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class AggregatingStateDS {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env. setParallelism(1);

        DataStreamSource<Event> inputDS = env. addSource(new EventWithWatermarkSource());
        inputDS. print("input");
        SingleOutputStreamOperator<String> result = inputDS
                .keyBy(t -> t.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {<!-- -->
            // Define the aggregation state used to calculate the average timestamp
            private AggregatingState<Event, Long> aggregatingState;

            // Define a value state to save the current user access frequency
            private ValueState<Long> countState;

            @Override
            public void open(Configuration parameters) throws Exception {<!-- -->
                countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));
                aggregatingState = getRuntimeContext().getAggregatingState(
                        new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
                                "avg-ts",
                                new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {<!-- -->
                                    @Override
                                    public Tuple2<Long, Long> createAccumulator() {<!-- -->
                                        return Tuple2.of(0L, 0L);
                                    }

                                    @Override
                                    public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> accumulator) {<!-- -->
                                        return Tuple2.of(accumulator.f0 + event.timestamp, accumulator.f1 + 1);
                                    }

                                    @Override
                                    public Long getResult(Tuple2<Long, Long> accumulator) {<!-- -->
                                        return accumulator.f0 / accumulator.f1;
                                    }

                                    @Override
                                    public Tuple2<Long, Long> merge(Tuple2<Long, Long> longLongTuple2, Tuple2<Long, Long> acc1) {<!-- -->
                                        return null;
                                    }
                                },
                                Types. TUPLE(Types. LONG, Types. LONG)));
            }

            @Override
            public void flatMap(Event event, Collector<String> out) throws Exception {<!-- -->
                Long count = countState. value();
                if (count == null) {<!-- -->
                    count = 1L;
                } else {<!-- -->
                    count + + ;
                }
                countState. update(count);
                aggregatingState.add(event);


                if (count == 5) {<!-- -->
                    out.collect((event.user + " Average Timestamp: " + new Timestamp(aggregatingState.get())));
                    countState. clear();
                }
            }
        });
        result. print("output ");

        env. execute();
    }
}

2. Broadcast State (BroadcastState)

2.1. Definition

Sometimes we want all parallel subtasks of an operator to maintain the same “global” state, for example for unified configuration or rule settings. In that case, the data in every partition accesses the same state, as if the state had been “broadcast” to all partitions. This special kind of operator state is called broadcast state (BroadcastState).

2.2. Use case

Simulate dynamically configured dimension-table creation in a real-time data warehouse.

package com.hpsk.flink.function;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;


// Simulate the creation of dimension tables in the dynamic configuration real-time data warehouse
public class BroadcastProcessFunctionDS {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        // 1. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env. setParallelism(1);

        // 2. Configuration flow: configuration table of dimension table
        SingleOutputStreamOperator<String> tableConfigStream = env.fromElements(
                "table1,createTable",
                "table2,createTable",
                "table3,createTable");

        // 3. Mainstream: real-time data flow of business library
        SingleOutputStreamOperator<Tuple2<String, String>> MySqlTableStream = env.socketTextStream("hadoop102", 8888)
                .map(line -> Tuple2.of(line.split(",")[0], line.split(",")[1]))
                .returns(Types. TUPLE(Types. STRING, Types. STRING));
        // Process the configuration stream into a broadcast stream
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, String.class);
        BroadcastStream<String> broadcast = tableConfigStream.broadcast(mapStateDescriptor);
        // Connect the mainstream and the broadcast stream into a connection stream, process the connection stream, and process the mainstream data according to the configuration information
        BroadcastConnectedStream<Tuple2<String, String>, String> connectedStream = MySqlTableStream.connect(broadcast);

        SingleOutputStreamOperator<String> result = connectedStream. process(new MyBroadcastProcessFunction(mapStateDescriptor));

        // 5. Output the result
        result. print("output ");
        // 6. Execute
        env. execute();
    }

    public static class MyBroadcastProcessFunction extends BroadcastProcessFunction<Tuple2<String, String>, String, String>{<!-- -->
        private MapStateDescriptor<String, String> mapStateDescriptor;

        public MyBroadcastProcessFunction(MapStateDescriptor<String, String> mapStateDescriptor) {<!-- -->
            this. mapStateDescriptor = mapStateDescriptor;
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {<!-- -->
            BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
            String[] split = value. split(",");
            broadcastState.put(split[0].trim(), split[1].trim());
        }

        @Override
        public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {<!-- -->
            ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
            String table = value.f0;
            String create = broadcastState. get(table);
            if (create != null) {<!-- -->
                out.collect(value.f0 + " is the configuration table, you need to create a table in phoenix -> table creation statement: " + create + ", the data is: " + value.f1);
            } else {<!-- -->
                out.collect(value.f0 + "business table, skip table creation");
            }
        }
    }
}