Hudi Series 25: Using Checkpoints to Recover Flink SQL Jobs from Failures

Table of Contents

  • 1. Write MySQL data to Hudi through Flink SQL
  • 2. Simulate a Flink task failure
    • 2.1 Manually stop the job
    • 2.2 Restore data from a specified checkpoint
    • 2.3 Recovering the entire yarn-session
  • 3. Simulate a source-side failure
    • 3.1 Manually shut down the source MySQL service
    • 3.2 Flink task behavior
  • FAQ:
    • 1. Checkpoints not writing any data
    • 2. Checkpoint failed
    • 3. Checkpoint files are automatically deleted after canceling the Flink job

1. Write MySQL data to Hudi through Flink SQL

Start the yarn-session:

$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d 2>&1 &

/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
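
A quick sanity check that the session is actually up on YARN (assuming the YARN CLI is on the PATH):

# list running YARN applications and look for the Flink session cluster
yarn application -list 2>/dev/null | grep -i flink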

Flink SQL code:

-- Set the checkpoint interval
set execution.checkpointing.interval=60sec;
-- Retain checkpoint files after the job is cancelled, so they can be used for recovery
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- Allow at most one in-flight checkpoint at a time
set execution.checkpointing.max-concurrent-checkpoints=1;
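
The checkpoint paths that appear later (hdfs://hp5:8020/vmcluster/flink-checkpoints/...) imply that a checkpoint directory was configured for the cluster. A sketch of the relevant flink-conf.yaml entries; the state backend choice is an assumption, not stated in this post:

state.backend: rocksdb                                          # assumption; hashmap also works
state.checkpoints.dir: hdfs://hp5:8020/vmcluster/flink-checkpoints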

CREATE TABLE flink_mysql_cdc1 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hp8',
    'port' = '3306',
    'username' = 'root',
    'password' = 'abc123',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc',
    'server-id' = '5409-5415',
    'scan.incremental.snapshot.enabled'='true'
);
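
For reference, a minimal matching table on the MySQL side, derived from the Flink DDL above (the mysql-cdc connector additionally requires the MySQL binlog to be enabled in ROW format):

-- hypothetical source table in database `test`, mirroring the Flink schema
CREATE TABLE test.mysql_cdc (
  id BIGINT NOT NULL PRIMARY KEY,
  name VARCHAR(100)
);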

CREATE TABLE flink_hudi_mysql_cdc1(
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc1',
   'table.type' = 'MERGE_ON_READ',
   'changelog.enabled' = 'true',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'write.precombine.field' = 'name',
   'compaction.async.enabled' = 'true',
   'hive_sync.enable' = 'true',
   'hive_sync.table' = 'flink_hudi_mysql_cdc1',
   'hive_sync.db' = 'test',
   'hive_sync.mode' = 'hms',
   'hive_sync.metastore.uris' = 'thrift://hp5:9083',
   'hive_sync.conf.dir'='/home/apache-hive-3.1.2-bin/conf'
);


set table.exec.resource.default-parallelism=4;

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
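
Once the INSERT is submitted, the sink can be spot-checked from the same SQL client. This is only a rough sanity check: the count reflects data that has gone through a successful checkpoint/commit, so it lags the source.

select count(*) from flink_hudi_mysql_cdc1;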

Flink web UI:
The initial load of 20 million rows has completed.
image.png

The checkpoints turn out to be quite large:
image.png

Checking the checkpoint size on HDFS:
image.png

2. Simulate a Flink task failure

2.1 Manually stop the job

Cancel the Flink SQL job manually from the Flink web UI:
image.png

2.2 Restore data from a specified checkpoint

Find the most recent checkpoint:
image.png
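
One way to locate the latest retained checkpoint is to list the job's checkpoint directory on HDFS. The job ID below is the one from this run; substitute your own:

hdfs dfs -ls hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/
# pick the chk-<n> directory with the highest <n> that contains a _metadata file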

Code:

set 'execution.savepoint.path'='hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/chk-23';

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
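
Once the restored job is running, it helps to unset the restore path so that later INSERT statements in the same session do not restore from the same checkpoint again (RESET has been available in the SQL client since Flink 1.13):

reset 'execution.savepoint.path';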

image.png

image.png

2.3 Recovering the entire yarn-session

To be tested:
If the entire yarn-session goes down, it should also be possible to point the new session at a checkpoint when starting it:

$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d -s hdfs://hp5:8020/vmcluster/flink-checkpoints/c12eb538c2e8965d2d94c170b67641f2/chk-1/_metadata

(Caveat: in yarn-session.sh the -s flag actually sets the number of slots per TaskManager, not a restore path; the documented route is to start a fresh session and restore each job via execution.savepoint.path as in section 2.2.)

/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session

3. Simulate a source-side failure

3.1 Manually shut down the source MySQL service

service mysqld stop

3.2 Flink task behavior

Flink retries on its own, which is convenient: no manual intervention is needed.

Once MySQL is back up, the task reconnects and continues.
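
The retry behavior is governed by Flink's restart strategy. A sketch of tuning it from the SQL client; the values here are illustrative, not what this job used:

-- retry up to 10 times, waiting 30s between attempts
set 'restart-strategy'='fixed-delay';
set 'restart-strategy.fixed-delay.attempts'='10';
set 'restart-strategy.fixed-delay.delay'='30s';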

image.png

FAQ:

1. Checkpoints not writing any data

Someone else's checkpoint history, for comparison:
image.png

My checkpoints:
It looks like my checkpoints are not completing successfully.
image.png

Increase the checkpoint interval:

-- Before:
set execution.checkpointing.interval=10sec;

-- After:
set execution.checkpointing.interval=60sec;
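
If checkpoints still cannot complete under heavy load, the checkpoint timeout can also be raised (a sketch; execution.checkpointing.timeout defaults to 10 minutes):

set execution.checkpointing.timeout=30min;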

2. Checkpoint failed

Error message:

Checkpoint Coordinator is suspending.

image.png

Solution:
Increasing the yarn-session memory from 8 GB to 16 GB resolved the problem.
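
Concretely, that means restarting the session with larger JobManager/TaskManager memory, as in the command from section 1:

$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d 2>&1 &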

For large tables, it is best to bulk-load the initial data with Spark first and then let Flink pick up the incremental changes.

3. Checkpoint files are automatically deleted after canceling the Flink job

https://developer.aliyun.com/ask/435979
http://events.jianshu.io/p/032396543ceb

The resources found online all address this at the code (DataStream API) level, not the Flink SQL level.
The answer is in the official Flink configuration docs:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/

set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
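
With RETAIN_ON_CANCELLATION, cancelled jobs leave their chk-* directories on HDFS, so they eventually need manual cleanup once a checkpoint is no longer useful (a sketch; substitute the real job ID):

hdfs dfs -rm -r hdfs://hp5:8020/vmcluster/flink-checkpoints/<job-id>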
