A brief discussion on using FlinkKafkaProducer to implement Exactly Once semantics on the sink side

Abstract

In some important Flink data-processing scenarios it is necessary to achieve Exactly Once processing, meaning that Flink guarantees data is neither lost nor processed more than once.

The overall Flink processing pipeline is roughly divided into three stages: Source -> Transform -> Sink. Choosing Kafka, which supports message persistence and resetting consumer offsets, ensures exactly-once handling of data on the Source side. Flink's own checkpoint mechanism ensures that data in the Transform stage is processed exactly once. This article discusses how to use Kafka's transaction support to achieve exactly-once processing of data on the Sink side.

Main text

Based on Flink 1.13, we use the officially provided FlinkKafkaProducer to write to Kafka. To implement Exactly Once semantics, the Flink program needs the following configuration:

//Enable checkpointing for the Flink job
env.enableCheckpointing(interval);

//FlinkKafkaProducer configuration
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx:9092");
/*
 * transaction.timeout.ms, the transaction timeout configured by Flink's Kafka connector,
 * defaults to 1 hour.
 * transaction.max.timeout.ms, the maximum transaction timeout allowed by the Kafka cluster,
 * defaults to 15 minutes.
 * The former must be less than or equal to the latter.
 */
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600000);

//Create the FlinkKafkaProducer
FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
        "topic",
        new KafkaProducerSerializationSchema("topic"),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE //EXACTLY_ONCE semantics
);
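KafkaProducerSerializationSchema above is a user-defined serialization schema that is not shown here. A minimal sketch of such a schema, assuming string elements written as the record value without a key (names and field layout are illustrative only), could look like this:

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

//Hypothetical implementation of the user-defined schema referenced above
public class KafkaProducerSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public KafkaProducerSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        //Write each element as a UTF-8 encoded value with no key
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}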

(1) Flink program configuration and the checkpoint process

FlinkKafkaProducer indirectly implements the CheckpointedFunction and CheckpointListener interfaces by extending TwoPhaseCommitSinkFunction. This means the sink operator (FlinkKafkaProducer) takes part in the job's overall checkpoint process.

public class FlinkKafkaProducer<IN>
        extends TwoPhaseCommitSinkFunction<
                IN,
                FlinkKafkaProducer.KafkaTransactionState,
                FlinkKafkaProducer.KafkaTransactionContext> {...}
        
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {...}

We take a simple Flink pipeline from the official documentation to walk through the checkpoint of the whole job.

(Figure: checkpoint example from the official Flink documentation)

(1.1)

The JobManager coordinates the checkpoint across all operators and injects barriers at the source operators. When a source operator receives the barrier, it snapshots its own state to the state backend and forwards the barrier downstream.

(1.2)

When the sink operator receives the barrier forwarded by the window operator, it starts snapshotting its own state and persists it to the state backend.

At this time, the snapshotState method in TwoPhaseCommitSinkFunction will be called.

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    ...
    //the checkpointId of this checkpoint
    long checkpointId = context.getCheckpointId();
    ...
    //pre-commit the transaction holding the data written since the last checkpoint
    preCommit(currentTransactionHolder.handle);
    //move this transaction into the pending-commit transactions
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    ...
    //start a new transaction as currentTransaction to hold the data of the next checkpoint
    currentTransactionHolder = beginTransactionInternal();
    ...
    state.clear();
    //save currentTransaction and pendingCommitTransactions into the operator state
    state.add(
            new State<>(
                    this.currentTransactionHolder,
                    new ArrayList<>(pendingCommitTransactions.values()),
                    userContext));
}

(1.3)

After all operators have completed their checkpoints, the JobManager notifies every operator that the checkpoint has succeeded.

When the sink operator receives this notification, it calls the notifyCheckpointComplete method in TwoPhaseCommitSinkFunction to formally commit the transaction (conversely, if the checkpoint fails, the transaction is not committed).
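Abbreviated, notifyCheckpointComplete in TwoPhaseCommitSinkFunction roughly walks the pending transactions and commits every transaction whose checkpointId is not newer than the completed checkpoint; the sketch below omits logging and error handling and is not a verbatim quote of the source.

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
            pendingCommitTransactions.entrySet().iterator();
    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        //skip transactions that belong to a later checkpoint
        if (entry.getKey() > checkpointId) {
            continue;
        }
        //formally commit the pre-committed transaction
        commit(entry.getValue().handle);
        pendingTransactionIterator.remove();
    }
    ...
}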

FlinkKafkaProducer overrides the abstract methods beginTransaction, preCommit, commit, and abort of TwoPhaseCommitSinkFunction to implement the actual transaction handling.
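A simplified sketch of these overrides (EXACTLY_ONCE path only; error handling and the AT_LEAST_ONCE/NONE branches are omitted, so this is not the exact source) looks roughly like this:

@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    //take a transactional id from the pool and start a new Kafka transaction
    FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
    producer.beginTransaction();
    return new KafkaTransactionState(producer.getTransactionalId(), producer);
}

@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafkaException {
    //phase 1: make sure all buffered records reach the brokers before the checkpoint completes
    flush(transaction);
}

@Override
protected void commit(KafkaTransactionState transaction) {
    //phase 2: commit the Kafka transaction, then return the transactional id to the pool
    transaction.producer.commitTransaction();
    recycleTransactionalProducer(transaction.producer);
}

@Override
protected void abort(KafkaTransactionState transaction) {
    //abandon the transaction and return the transactional id to the pool
    transaction.producer.abortTransaction();
    recycleTransactionalProducer(transaction.producer);
}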

Note: Each transaction has a corresponding transactional id. As the code above shows, a transactional producer (and thus a transactional id) is obtained in beginTransaction via createTransactionalProducer, while recycleTransactionalProducer is called in commit and abort.

As the following code from FlinkKafkaProducer shows, the available transactional ids are kept in a queue that is used cyclically: each time a transaction is started (and a new producer is created), an id is taken from the head of the queue, and after the transaction is committed or aborted the id is appended back to the tail. By default the pool holds 5 transactional ids.

private final BlockingDeque<String> availableTransactionalIds =
        new LinkedBlockingDeque<>();

/**
 * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
 * will not clash with transactions created during previous checkpoints ({@code
 * producer.initTransactions()} assures that we obtain new producerId and epoch counters).
 */
private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer()
        throws FlinkKafkaException {
    String transactionalId = availableTransactionalIds.poll();
    ...
    FlinkKafkaInternalProducer<byte[], byte[]> producer =
            initTransactionalProducer(transactionalId, true);
    producer.initTransactions();
    return producer;
}

private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
    availableTransactionalIds.add(producer.getTransactionalId());
    producer.flush();
    producer.close(Duration.ofSeconds(0));
}
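If the default pool of 5 transactional ids is not sufficient, FlinkKafkaProducer offers a constructor overload that takes an explicit kafkaProducersPoolSize. A sketch reusing the names from the earlier configuration (the pool size of 10 is just an example value):

//Same arguments as before, plus an explicit transactional-id pool size (default is 5)
FlinkKafkaProducer<String> producerWithLargerPool = new FlinkKafkaProducer<>(
        "topic",
        new KafkaProducerSerializationSchema("topic"),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        10); //pool of 10 transactional ids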

Take a Flink job with Exactly Once enabled as an example (a checkpoint every 2 minutes): its logs show the transactional ids being reused cyclically.

Based on (1.2) & (1.3), from the sink operator's perspective the methods are invoked roughly in this order: during the checkpoint, snapshotState is called, which pre-commits the current transaction (preCommit) and starts a new one (beginTransaction); once the JobManager reports that the checkpoint has succeeded, notifyCheckpointComplete is called, which formally commits the pending transaction (commit).

(2) Related configuration of the downstream consumer groups

In addition to the configuration required in the Flink program, the consumers downstream of the topic also need to change the isolation level parameter isolation.level from the default read_uncommitted (uncommitted data can be read) to read_committed (only committed data can be read) to guarantee exactly-once processing.

Data written during the pre-commit phase has already reached the Kafka brokers but is marked as "uncommitted"; only after the transaction is formally committed are these messages marked as "committed".
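As a sketch, a downstream Flink consumer configured this way might look as follows (the group id and the use of FlinkKafkaConsumer with a plain string schema are assumptions for illustration, not from the original):

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-group"); //hypothetical group id
//Only read messages belonging to committed transactions
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
        "topic",
        new SimpleStringSchema(),
        consumerProps);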

In summary, we have discussed the Flink and consumer-group configuration needed to achieve exactly-once processing on the Flink sink side (Kafka).
