[Apache Flink] Implementing stateful functions

Article directory

  • Declare keyed state in RuntimeContext
  • Implement operator list state through the ListCheckpointed interface
  • Using the CheckpointedFunction interface
  • Receive checkpoint completion notification
  • Reference documentation

Declare keyed state in RuntimeContext

Flink provides several primitives (state types) for keyed state, because different algorithms and operations need to manage different kinds of state. They include:

  1. ValueState: This state type is used to store a single, potentially updateable value. Common uses include storing counters or aggregates.

  2. ListState: This state type stores a list of elements (often a long one). Elements can be appended, and all stored elements can be iterated over.

  3. ReducingState and AggregatingState: These two state types merge the added elements into a single aggregate and are commonly used in window operations.

    • ReducingState: Each added element is merged with the existing value through a reduce function, so only one value, the result of the merge, is retained.

    • AggregatingState: Similar to ReducingState, but the stored aggregate may have a different type from the input elements.

  4. MapState: This state type stores a key-value mapping.
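The difference between ReducingState and AggregatingState is easier to see in code. The following is a minimal plain-Java sketch of the two merging behaviors, not Flink API: the classes ReducingCell, AggregatingCell and MergingStateDemo are hypothetical stand-ins written only for illustration.

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical stand-in for a reducing state: inputs and stored value share one type.
final class ReducingCell<T> {
    private final BinaryOperator<T> reducer;
    private T value; // null until the first element is added

    ReducingCell(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    void add(T element) {
        value = (value == null) ? element : reducer.apply(value, element);
    }

    T get() {
        return value;
    }
}

// Hypothetical stand-in for an aggregating state: the accumulator type (ACC)
// may differ from the input type (IN) and from the result type (OUT).
final class AggregatingCell<IN, ACC, OUT> {
    private final BiFunction<ACC, IN, ACC> adder;
    private final Function<ACC, OUT> finisher;
    private ACC accumulator;

    AggregatingCell(ACC initial, BiFunction<ACC, IN, ACC> adder, Function<ACC, OUT> finisher) {
        this.accumulator = initial;
        this.adder = adder;
        this.finisher = finisher;
    }

    void add(IN element) {
        accumulator = adder.apply(accumulator, element);
    }

    OUT get() {
        return finisher.apply(accumulator);
    }
}

final class MergingStateDemo {
    // Reducing: the stored value is a Long, just like the inputs.
    static Long sum(long... inputs) {
        ReducingCell<Long> cell = new ReducingCell<>(Long::sum);
        for (long in : inputs) {
            cell.add(in);
        }
        return cell.get();
    }

    // Aggregating: the accumulator is a {sum, count} pair and the result is a Double.
    static Double average(long... inputs) {
        AggregatingCell<Long, long[], Double> cell = new AggregatingCell<>(
            new long[] {0L, 0L},
            (acc, in) -> new long[] {acc[0] + in, acc[1] + 1},
            acc -> acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]);
        for (long in : inputs) {
            cell.add(in);
        }
        return cell.get();
    }

    public static void main(String[] args) {
        System.out.println(sum(3, 4, 5));
        System.out.println(average(3, 4, 5));
    }
}
```

An average cannot be kept as a single value of the input type, which is why the aggregating variant carries a separate accumulator.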

To use a keyed state type, you provide a StateDescriptor that declares the name and type of the state. The state handle can then be obtained via RuntimeContext.

These state types are all interfaces that hide the implementation details of the state backend (Flink provides two kinds of backend for storing state: in memory on the JVM heap, and RocksDB), so users do not need to care how the state is stored and accessed.

Flink’s keyed state lets us handle keyed data streams naturally through simple API calls. We only need to care about the current event and the state of its key; the Flink framework automatically handles distributed storage of the state, fault recovery, and so on.

In Flink, RuntimeContext gives a function (such as a Map, Reduce or Filter function) access to contextual information about the task it runs in, such as the task parallelism, the task name, and the subtask index. In addition, RuntimeContext gives user code the means to create and maintain distributed accumulators and keyed state.

In Apache Flink, keyed state is state that is scoped to a key: each key has its own state value. We can obtain and initialize the state handle through RuntimeContext in the open() method of a rich function.

For example, suppose we are building a real-time analysis system for an online game. We might track each player’s real-time score based on the actions they perform in the game (such as completing a task or defeating an enemy). In this scenario, each player’s ID is a "key" and their game score is the "state" associated with that key. As players perform actions in the game, we need to update their score state.

Our Flink code can then define a RichMapFunction to maintain each player’s score state:

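import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// GameEvent is the event type of this example; it is assumed to expose getPlayerId() and getScore().
public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {

  // Keyed state handle; Flink scopes it to the key of the record being processed
  private transient ValueState<Long> scoreState;

  @Override
  public void open(Configuration params) throws Exception {
    ValueStateDescriptor<Long> descriptor =
        new ValueStateDescriptor<>(
          "playerScore", // The name of the state
          TypeInformation.of(new TypeHint<Long>() {}),
          0L); // Default value
    scoreState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {
    // Read the current score of this player and add the increment
    long currentScore = scoreState.value();
    currentScore += gameEvent.getScore();
    scoreState.update(currentScore);
    // Return the updated score
    return new Tuple2<>(gameEvent.getPlayerId(), currentScore);
  }
}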

In this example, PlayerScoreFunction receives a stream of GameEvent objects, which are events generated by the players’ actions in the game. We process this stream keyed by player ID. The state handle is obtained through getRuntimeContext().getState(descriptor). Every time a new GameEvent arrives, we add the score increment in the event to the current score and write it back with scoreState.update(currentScore). The updated score and the player ID are then emitted together to the next operator, for example a real-time score dashboard that shows the audience each player’s latest score.
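To make the key scoping in PlayerScoreFunction concrete, here is a small plain-Java model of what the runtime does for us. It is only a sketch and not how Flink is implemented: KeyedValueModel and PlayerScoreModel are hypothetical classes, and the explicit setCurrentKey call stands in for work that Flink performs internally before each record reaches the function.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a keyed value state: one value per key, with the
// "current key" selected before each record is processed.
final class KeyedValueModel<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    KeyedValueModel(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return valuesByKey.getOrDefault(currentKey, defaultValue);
    }

    void update(V newValue) {
        valuesByKey.put(currentKey, newValue);
    }
}

final class PlayerScoreModel {
    private final KeyedValueModel<String, Long> scoreState = new KeyedValueModel<>(0L);

    // Mirrors the map() method of the example: read, add, write back, emit.
    long process(String playerId, long scoreDelta) {
        scoreState.setCurrentKey(playerId); // done by the runtime in a real job
        long currentScore = scoreState.value() + scoreDelta;
        scoreState.update(currentScore);
        return currentScore;
    }

    public static void main(String[] args) {
        PlayerScoreModel model = new PlayerScoreModel();
        System.out.println(model.process("alice", 10));
        System.out.println(model.process("bob", 5));
        System.out.println(model.process("alice", 7));
    }
}
```

The function body never mentions a key: it reads and writes "the" score, and the key in context decides which player’s score that is.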


Implement operator list state through the ListCheckpointed interface

Operator state is a kind of state in stream processing systems (such as Apache Flink) that is scoped to an operator (in Flink, to each parallel instance of the operator) rather than to individual keys. It holds information that applies to all records processed by that operator instance.

Maintaining operator state mainly involves the following steps:

  1. Define operator states: First, we need to define one or more operator states in the processing function. We can specify the name of the operator state and define the data type it stores.

  2. Reading and writing operator state: Once the operator state is defined, we can read and write it in the stream processing function. Reading is typically done when a processing decision depends on the state. Writing is done when the state needs to be updated.

  3. Maintain state consistency: In order to maintain state consistency, we need to regularly snapshot the operator state and save it to a remote storage system. After a system outage, we can restore the operator state from the latest snapshot.

  4. State recovery: After a system interruption, we can use the saved snapshot to restore the operator state and resume the execution of stream processing.

The way operator state is maintained may vary between stream processing systems, but the basic principles are the same. These four steps are the basic process of maintaining operator state.
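The four steps above can be walked through with a small plain-Java model. This is a sketch under simplifying assumptions, not Flink code: BufferingOperatorModel and SnapshotRestoreDemo are hypothetical names, and the snapshot is kept in a local variable where a real system would write it to durable remote storage.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator that buffers every record it sees.
final class BufferingOperatorModel {
    // Step 1: define the operator state.
    private final List<Integer> buffer = new ArrayList<>();

    // Step 2: write the state while processing.
    void process(int record) {
        buffer.add(record);
    }

    // Step 2: read the state.
    int size() {
        return buffer.size();
    }

    // Step 3: take a snapshot. A copy is returned so that later writes
    // cannot change a snapshot that has already been taken.
    List<Integer> snapshot() {
        return new ArrayList<>(buffer);
    }

    // Step 4: replace the in-memory state with a snapshot.
    void restore(List<Integer> snapshot) {
        buffer.clear();
        buffer.addAll(snapshot);
    }
}

final class SnapshotRestoreDemo {
    static int run() {
        BufferingOperatorModel operator = new BufferingOperatorModel();
        operator.process(1);
        operator.process(2);
        List<Integer> checkpoint = operator.snapshot();
        operator.process(3); // arrives after the snapshot

        // Simulated failure: a fresh instance starts from the last snapshot.
        BufferingOperatorModel recovered = new BufferingOperatorModel();
        recovered.restore(checkpoint);
        return recovered.size();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recovered instance only knows the two records that were in the snapshot. For the result to stay consistent, the record that arrived afterwards has to be processed again after recovery, which is why snapshots are taken regularly and paired with replayable input.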

In Flink, list state is a form of operator state that is included in checkpoints. It holds a list of elements rather than a single value: every element we add becomes part of the state. On failure recovery, the elements stored in the last checkpoint are handed back to the operator. To use it, we implement the CheckpointedFunction interface or the ListCheckpointed interface; with ListCheckpointed, we implement the snapshotState and restoreState methods to save and restore the state.

Specifically, to implement operator list state with the ListCheckpointed interface, we can write a function like the one below.

Every time the function receives a String value, it parses it into an Integer and stores it in a list (List). At each checkpoint, the snapshotState method returns the list so that it can be stored in the snapshot. When a failure occurs, Flink calls the restoreState method to restore the state.

If the operator is parallel, Flink calls the restoreState method on each subtask, and each subtask of the operator holds its own list. When restoring state after a failure, Flink takes the list elements from the snapshot and distributes them across the subtasks.
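How the elements of a snapshot end up on the subtasks can be pictured with the following plain-Java sketch. It assumes an even split of the concatenated list over the subtasks; the round-robin assignment used here is an assumption of the sketch, not a statement about which element Flink gives to which subtask, and ListStateRedistribution is a hypothetical helper class.

```java
import java.util.ArrayList;
import java.util.List;

final class ListStateRedistribution {
    // Splits the concatenated list state across `parallelism` subtasks,
    // assigning elements round-robin so the sublists differ in size by at most one.
    static <T> List<List<T>> redistribute(List<T> allElements, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < allElements.size(); i++) {
            perSubtask.get(i % parallelism).add(allElements.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Five snapshotted elements restored onto two subtasks.
        System.out.println(redistribute(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

Each inner list is what one subtask would receive in its restoreState call. A subtask may receive an empty list when there are fewer elements than subtasks, so restoreState should not assume it gets any particular element back.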

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {

  // Elements buffered by this parallel instance (operator state)
  private final List<Integer> bufferElements;

  public ListStateFunction() {
    this.bufferElements = new ArrayList<>();
  }

  @Override
  public Integer map(String value) throws Exception {
    int parsedValue = Integer.parseInt(value);
    bufferElements.add(parsedValue);
    return bufferElements.size();
  }

  // On each checkpoint, return a copy of the buffered elements as the snapshot
  @Override
  public List<Integer> snapshotState(long checkpointId, long timestamp) {
    return new ArrayList<>(this.bufferElements);
  }

  // Restore the buffer from the elements assigned to this subtask
  @Override
  public void restoreState(List<Integer> state) {
    this.bufferElements.clear();
    this.bufferElements.addAll(state);
  }
}

Whether to use ListCheckpointed or CheckpointedFunction depends on the specific needs and context. Both are functionally similar, but CheckpointedFunction provides more flexibility: it lets you decide which state primitives to use and how the state is stored and restored.

Using the CheckpointedFunction interface

Apache Flink provides the CheckpointedFunction interface, which custom functions can implement to manage their state explicitly. Its methods are called when the function’s state is initialized and whenever a checkpoint is taken, which gives the function access to its state at those points.

Example of using CheckpointedFunction:

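import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Uses keyed state, so the function must be applied to a keyed stream.
public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {

  private transient ValueState<Long> counter;

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    ValueStateDescriptor<Long> descriptor =
      new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));
    // A plain MapFunction has no getRuntimeContext(); the state store comes from the context
    counter = context.getKeyedStateStore().getState(descriptor);
  }

  @Override
  public Long map(Long value) throws Exception {
    Long currentCount = counter.value();
    // value() returns null until the counter has been set for the current key
    Long newCount = currentCount == null ? 1L : currentCount + 1;
    counter.update(newCount);
    return newCount;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Kept to match the description below: clear the counter when a checkpoint is taken.
    // Keyed state is only meaningful in the context of a key, so real code would rarely touch it here.
    counter.clear();
  }
}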

This example creates a function that counts and clears its counter at each checkpoint. The initializeState() method is called when the function instance is created, both on a fresh start and on recovery from a checkpoint, and is where the state variables are initialized. The map() method then updates the state. snapshotState() is called whenever a checkpoint is taken. Here we only clear the state, without any additional persistence logic.

When operating and maintaining operator state, we need to consider state consistency and recovery to handle possible failures and interruptions. In practice, the snapshotState() method may have more complex logic, such as storing the state remotely.

Receive checkpoint completion notification

In Apache Flink, once all tasks have successfully completed their part of a checkpoint, the JobManager, which coordinates checkpoints, marks the checkpoint as complete and notifies all tasks.

If you want to receive and react to such notifications, you can have your RichFunction implement the CheckpointListener interface. Here’s a basic example:

The function uses ListState for state management, and each received element is added to the state. It also implements the notifyCheckpointComplete(long checkpointId) method to be notified after each checkpoint completes successfully. In this method you can perform operations such as clearing state or updating external systems.

The notifyCheckpointComplete method is called after a checkpoint has completed, normally before the task takes its next snapshot. What you do in it should be planned according to your checkpoint configuration and fault recovery requirements.

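import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {

    // Keyed list state, so the function must be applied to a keyed stream
    private transient ListState<Long> checkpointedState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("state", Long.class);
        checkpointedState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public Long map(Long value) throws Exception {
        // Add every received element to the state
        checkpointedState.add(value);
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Called after the checkpoint with this ID has completed successfully;
        // follow-up logic (for example, updating an external system) goes here
    }
}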
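One common use of the completion notification is to publish results to an external system only after the checkpoint that covers them has succeeded. The following plain-Java sketch models that idea without any Flink dependency. CommitOnCheckpointModel is a hypothetical class, the committed list stands in for the external system, and the sketch assumes that a notification can occasionally be missed, so a later notification also publishes any earlier staged batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: stages records per checkpoint and publishes them on completion.
final class CommitOnCheckpointModel {
    // Records received since the last snapshot
    private List<String> current = new ArrayList<>();
    // Batches staged at snapshot time, keyed by checkpoint ID
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stands in for the external system
    private final List<String> committed = new ArrayList<>();

    void process(String record) {
        current.add(record);
    }

    // Called when a checkpoint is taken: stage everything collected so far.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // Called when a checkpoint is confirmed: publish its batch and all earlier ones.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear(); // headMap is a view, so this removes the entries from pending
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        CommitOnCheckpointModel model = new CommitOnCheckpointModel();
        model.process("a");
        model.snapshotState(1L);
        model.process("b");
        model.snapshotState(2L);
        model.notifyCheckpointComplete(2L); // the notification for checkpoint 1 never arrived
        System.out.println(model.committed());
    }
}
```

Nothing is published at snapshot time. Records only become visible once a checkpoint that contains them is confirmed, so a failure before the confirmation cannot leave unconfirmed records in the external system.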

Reference documentation

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/