Flink State Programming
- 1. Keyed State
- 1.1. Value State (ValueState)
- 1.1.1. Definition
- 1.1.2. Use cases
- 1.2. List State (ListState)
- 1.2.1. Definition
- 1.2.2. Use cases
- 1.3. Map State (MapState)
- 1.3.1. Definition
- 1.3.2. Use cases
- 1.4. Reducing State (ReducingState)
- 1.4.1. Definition
- 1.4.2. Use cases
- 1.5. Aggregating State (AggregatingState)
- 1.5.1. Definition
- 1.5.2. Use cases
- 2. Broadcast State (BroadcastState)
- 2.1. Definition
- 2.2. Use case
1. Keyed State
1.1. Value State (ValueState)
1.1.1, Definition
A ValueState stores a single value for each key. ValueState itself is an interface, defined in the Flink source code as follows:
/**
 * Keyed state that holds a single value of type {@code T} for the current key
 * (quoted from Flink's API).
 */
public interface ValueState<T> extends State {
    /**
     * Returns the value stored for the current key. The examples in this article treat a
     * {@code null} result as "nothing stored yet for this key".
     */
    T value() throws IOException;

    /** Stores {@code value} as the state's value for the current key. */
    void update(T value) throws IOException;
}
1.1.2, use cases
Use a ValueState and an event-time timer to output each user's page-view (PV) count every 10 seconds
package com.hpsk.flink.state; import com.hpsk.flink.beans.Event; import com.hpsk.flink.source.EventWithWatermarkSource; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import java.sql.Timestamp; public class ValueStateDS {<!-- --> public static void main(String[] args) throws Exception {<!-- --> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env. setParallelism(1); DataStreamSource<Event> stream = env. addSource(new EventWithWatermarkSource()); SingleOutputStreamOperator<String> result = stream.keyBy(t -> t.user) .process(new KeyedProcessFunction<String, Event, String>() {<!-- --> // Define two states, save the current pv value, and timer timestamp private ValueState<Long> valueState; private ValueState<Long> timerTsState; @Override public void open(Configuration parameters) throws Exception {<!-- --> valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("value-state", Long.class)); timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class)); } @Override public void processElement(Event value, Context ctx, Collector<String> collector) throws Exception {<!-- --> Long count = valueState. value(); if (count == null) {<!-- --> valueState. update(1L); } else {<!-- --> valueState. update(count + 1); } // register timer if (timerTsState. 
value() == null) {<!-- --> ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L); timerTsState.update(value.timestamp + 10 * 1000L); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {<!-- --> out.collect("Time: " + new Timestamp(timestamp) + " -> User: " + ctx.getCurrentKey() + " pv value: " + valueState.value()); timerTsState. clear(); } }); result. print(">>>>"); env. execute(); } }
1.2. List State (ListState)
1.2.1, Definition
ListState stores its data as a list. Like the other state interfaces, ListState has a type parameter T that specifies the type of the list elements. ListState provides a set of methods to manipulate the state, and using them is very similar to using an ordinary Java List.
1.2.2, use cases
Use ListState to implement an SQL-style inner join of two keyed streams
package com.hpsk.flink.state; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.util.Collector; public class ListStateDS {<!-- --> public static void main(String[] args) throws Exception {<!-- --> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env. setParallelism(1); SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env. fromElements( Tuple3.of("a", "stream-1", 1000L), Tuple3.of("b", "stream-1", 2000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>for MonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {<!-- --> @Override public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {<!-- --> return stringStringLongTuple3.f2; } })); SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env. 
fromElements( Tuple3.of("a", "stream-2", 3000L), Tuple3.of("b", "stream-2", 4000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>for MonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {<!-- --> @Override public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {<!-- --> return stringStringLongTuple3.f2; } })); stream1.keyBy(r -> r.f0) .connect(stream2.keyBy(r -> r.f0)) .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {<!-- --> private ListState<Tuple3<String, String, Long>> listState1; private ListState<Tuple3<String, String, Long>> listState2; @Override public void open(Configuration parameters) throws Exception {<!-- --> listState1 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("listState1", Types. TUPLE(Types. STRING, Types. STRING, Types. LONG))); listState2 = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("listState2", Types. TUPLE(Types. STRING, Types. STRING, Types. LONG))); } @Override public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> out) throws Exception {<!-- --> listState1. add(left); for (Tuple3<String, String, Long> right : listState2.get()) {<!-- --> out. collect(left + " => " + right); } } @Override public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> out) throws Exception {<!-- --> listState2. add(right); for (Tuple3<String, String, Long> left : listState1.get()) {<!-- --> out. collect(left + " => " + right); } } }) .print(); env. execute(); } }
1.3. Map State (MapState)
1.3.1, Definition
MapState is similar to Java's HashMap: it stores a set of key-value pairs, and a value can be fetched or updated by its key.
1.3.2, use cases
Simulate a tumbling window using a KeyedProcessFunction
package com.hpsk.flink.state; import com.hpsk.flink.beans.Event; import com.hpsk.flink.source.EventWithWatermarkSource; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import java.sql.Timestamp; // Simulate a scrolling window using KeyedProcessFunction public class MapStateDS {<!-- --> public static void main(String[] args) throws Exception {<!-- --> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env. setParallelism(1); DataStreamSource<Event> inputDS = env. addSource(new EventWithWatermarkSource()); SingleOutputStreamOperator<String> result = inputDS .keyBy(t -> t.url) .process(new FakeWindowResult(10000L)); result. print(">>>"); env. execute(); } private static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{<!-- --> // Define properties, window length private Long windowSize; // Declare state, use map to save pv value (window start, count) private MapState<Long, Long> windowPvMapState; public FakeWindowResult(Long windowSize) {<!-- --> this. 
windowSize = windowSize; } @Override public void open(Configuration parameters) throws Exception {<!-- --> windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Types.LONG, Types.LONG)); } @Override public void processElement(Event event, Context ctx, Collector<String> collector) throws Exception {<!-- --> // Each time a piece of data comes, judge which window it belongs to according to the timestamp long windowStart = event.timestamp / windowSize * windowSize; long windowEnd = windowStart + windowSize; // Register the timer of end -1, the window triggers the calculation ctx.timerService().registerEventTimeTimer(windowEnd - 1); // Update the pv value in the state if (windowPvMapState. contains(windowStart)) {<!-- --> Long pv = windowPvMapState.get(windowStart); windowPvMapState.put(windowStart, pv + 1); } else {<!-- --> windowPvMapState.put(windowStart, 1L); } } // Timer triggers, directly output statistical pv results @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {<!-- --> long windowEnd = timestamp + 1; long windowStart = windowEnd - windowSize; long pv = windowPvMapState.get(windowStart); out.collect("url:[" + ctx.getCurrentKey() + "]'s traffic is " + pv + ", the corresponding window is: [" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd) + "]"); // Simulate the destruction of the window, clear the key in the map windowPvMapState. remove(windowStart); } } }
1.4, Reducing State (ReducingState)
1.4.1, Definition
ReducingState holds a single value. Each newly added element is combined with the current state value using the ReduceFunction supplied in the state descriptor, and the result becomes the new state value.
1.4.2, use cases
For each user in the click-event stream, calculate the average timestamp of every 5 events
package com.hpsk.flink.state; import com.hpsk.flink.beans.Event; import com.hpsk.flink.source.EventWithWatermarkSource; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.sql.Timestamp; public class ReducingStateDS {<!-- --> public static void main(String[] args) throws Exception {<!-- --> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env. setParallelism(1); DataStreamSource<Event> inputDS = env. addSource(new EventWithWatermarkSource()); inputDS. 
print("input"); SingleOutputStreamOperator<String> result = inputDS .keyBy(t -> t.user) .flatMap(new RichFlatMapFunction<Event, String>() {<!-- --> // Define the ReducingState calculation and store the result private ReducingState<Tuple2<Event, Long>> reducingState; // Define a value state to save the current user access frequency private ValueState<Long> countState; @Override public void open(Configuration parameters) throws Exception {<!-- --> // initialization countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class)); reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Tuple2<Event, Long>>( "reducing-state", new ReduceFunction<Tuple2<Event, Long>>() {<!-- --> @Override public Tuple2<Event, Long> reduce(Tuple2<Event, Long> t1, Tuple2<Event, Long> t2) throws Exception {<!-- --> // Accumulate access time return Tuple2.of(t1.f0, t2.f1 + t1.f1); } }, Types.TUPLE(Types.LONG, Types.LONG) )); } @Override public void flatMap(Event event, Collector<String> collector) throws Exception {<!-- --> Long count = countState. value(); if (count == null) {<!-- --> count = 1L; } else {<!-- --> count + + ; } // update access count countState. update(count); // accumulatively calculate the access time reducingState.add(Tuple2.of(event, event.timestamp)); if (count == 5) {<!-- --> // Visit 5 times, output the average access time collector.collect(event.user + " " + new Timestamp(reducingState.get().f1 / count)); // clear state countState. clear(); reducingState. clear(); } } }); result. print("output "); env. execute(); } }
1.5. Aggregating State (AggregatingState)
1.5.1, Definition
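AggregatingState is similar to ReducingState, but it aggregates elements with an AggregateFunction that keeps a separate accumulator. As a result, the input type, the accumulator type and the result type can all differ (for example Event, Tuple2<Long, Long> and Long in the example below).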
1.5.2, use cases
For each user in the click-event stream, calculate the average timestamp of every 5 events
package com.hpsk.flink.state; import com.hpsk.flink.beans.Event; import com.hpsk.flink.source.EventWithWatermarkSource; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.sql.Timestamp; public class AggregatingStateDS {<!-- --> public static void main(String[] args) throws Exception {<!-- --> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env. setParallelism(1); DataStreamSource<Event> inputDS = env. addSource(new EventWithWatermarkSource()); inputDS. 
print("input"); SingleOutputStreamOperator<String> result = inputDS .keyBy(t -> t.user) .flatMap(new RichFlatMapFunction<Event, String>() {<!-- --> // Define the aggregation state used to calculate the average timestamp private AggregatingState<Event, Long> aggregatingState; // Define a value state to save the current user access frequency private ValueState<Long> countState; @Override public void open(Configuration parameters) throws Exception {<!-- --> countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class)); aggregatingState = getRuntimeContext().getAggregatingState( new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>( "avg-ts", new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {<!-- --> @Override public Tuple2<Long, Long> createAccumulator() {<!-- --> return Tuple2.of(0L, 0L); } @Override public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> accumulator) {<!-- --> return Tuple2.of(accumulator.f0 + event.timestamp, accumulator.f1 + 1); } @Override public Long getResult(Tuple2<Long, Long> accumulator) {<!-- --> return accumulator.f0 / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> longLongTuple2, Tuple2<Long, Long> acc1) {<!-- --> return null; } }, Types. TUPLE(Types. LONG, Types. LONG))); } @Override public void flatMap(Event event, Collector<String> out) throws Exception {<!-- --> Long count = countState. value(); if (count == null) {<!-- --> count = 1L; } else {<!-- --> count + + ; } countState. update(count); aggregatingState.add(event); if (count == 5) {<!-- --> out.collect((event.user + " Average Timestamp: " + new Timestamp(aggregatingState.get()))); countState. clear(); } } }); result. print("output "); env. execute(); } }
2. Broadcast State (BroadcastState)
2.1, Definition
Sometimes we want all parallel subtasks of an operator to maintain the same “global” state, for example to apply a unified configuration or set of rules. In that case every element in every partition accesses the same state, as if the state had been “broadcast” to all partitions. This special kind of operator state is called broadcast state (BroadcastState).
2.2. Use case
Simulate dynamically configuring which dimension tables to create in a real-time data warehouse
package com.hpsk.flink.function; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; // Simulate the creation of dimension tables in the dynamic configuration real-time data warehouse public class BroadcastProcessFunctionDS {<!-- --> public static void main(String[] args) throws Exception {<!-- --> // 1. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env. setParallelism(1); // 2. Configuration flow: configuration table of dimension table SingleOutputStreamOperator<String> tableConfigStream = env.fromElements( "table1,createTable", "table2,createTable", "table3,createTable"); // 3. Mainstream: real-time data flow of business library SingleOutputStreamOperator<Tuple2<String, String>> MySqlTableStream = env.socketTextStream("hadoop102", 8888) .map(line -> Tuple2.of(line.split(",")[0], line.split(",")[1])) .returns(Types. TUPLE(Types. STRING, Types. 
STRING)); // Process the configuration stream into a broadcast stream MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, String.class); BroadcastStream<String> broadcast = tableConfigStream.broadcast(mapStateDescriptor); // Connect the mainstream and the broadcast stream into a connection stream, process the connection stream, and process the mainstream data according to the configuration information BroadcastConnectedStream<Tuple2<String, String>, String> connectedStream = MySqlTableStream.connect(broadcast); SingleOutputStreamOperator<String> result = connectedStream. process(new MyBroadcastProcessFunction(mapStateDescriptor)); // 5. Output the result result. print("output "); // 6. Execute env. execute(); } public static class MyBroadcastProcessFunction extends BroadcastProcessFunction<Tuple2<String, String>, String, String>{<!-- --> private MapStateDescriptor<String, String> mapStateDescriptor; public MyBroadcastProcessFunction(MapStateDescriptor<String, String> mapStateDescriptor) {<!-- --> this. mapStateDescriptor = mapStateDescriptor; } @Override public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {<!-- --> BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor); String[] split = value. split(","); broadcastState.put(split[0].trim(), split[1].trim()); } @Override public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {<!-- --> ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor); String table = value.f0; String create = broadcastState. 
get(table); if (create != null) {<!-- --> out.collect(value.f0 + " is the configuration table, you need to create a table in phoenix -> table creation statement: " + create + ", the data is: " + value.f1); } else {<!-- --> out.collect(value.f0 + "business table, skip table creation"); } } } }