Article directory
- 1. Write MySQL data to Hudi through Flink SQL
- 2. Simulate a Flink task exception
  - 2.1 Manually stop the job
  - 2.2 Restore data from a specified checkpoint
  - 2.3 Task recovery when the entire yarn-session fails
- 3. Simulate a source-side exception
  - 3.1 Manually shut down the source MySQL service
  - 3.2 Flink task view
- FAQ:
  - 1. Checkpoint has not written data
  - 2. Checkpoint failed
  - 3. After manually canceling the Flink job, the checkpoint file is automatically deleted
- Reference
1. Write MySQL data to Hudi through Flink SQL
Start the Yarn session, then the SQL client:

```bash
$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d 2>&1 &
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
```
Flink SQL code:
```sql
-- Set the checkpoint interval
set execution.checkpointing.interval=60sec;
-- Retain the checkpoint files after the job is cancelled, to facilitate later recovery
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- Allow only one checkpoint to be in progress at a time
set execution.checkpointing.max-concurrent-checkpoints=1;

CREATE TABLE flink_mysql_cdc1 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hp8',
    'port' = '3306',
    'username' = 'root',
    'password' = 'abc123',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc',
    'server-id' = '5409-5415',
    'scan.incremental.snapshot.enabled' = 'true'
);

CREATE TABLE flink_hudi_mysql_cdc1 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc1',
    'table.type' = 'MERGE_ON_READ',
    'changelog.enabled' = 'true',
    'hoodie.datasource.write.recordkey.field' = 'id',
    'write.precombine.field' = 'name',
    'compaction.async.enabled' = 'true',
    'hive_sync.enable' = 'true',
    'hive_sync.table' = 'flink_hudi_mysql_cdc1',
    'hive_sync.db' = 'test',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://hp5:9083',
    'hive_sync.conf.dir' = '/home/apache-hive-3.1.2-bin/conf'
);

set table.exec.resource.default-parallelism=4;

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
```
Flink web interface:
The initial load of 20 million rows has completed.
The checkpoint files turn out to be quite large.
Checking the checkpoint size on HDFS:
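The size of each job's checkpoint data can be summarized with a standard `hdfs dfs -du` call; the checkpoint root directory below is an assumption based on the checkpoint path used later in this article:

```shell
# Show the per-job checkpoint size under the checkpoint root (path is an assumption)
hdfs dfs -du -h hdfs://hp5:8020/vmcluster/flink-checkpoints
```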
2. Simulate Flink task exception
2.1 Manually stop the job
Manually end the Flink SQL task on the Flink web interface
2.2 Restore data from a specified checkpoint
Find the nearest checkpoint:
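The most recent retained checkpoint can be located by listing the job's checkpoint directory on HDFS; the highest-numbered `chk-N` subdirectory is the latest. The job ID here is the one from this article's example:

```shell
# List retained checkpoints for the job; pick the highest chk-N
hdfs dfs -ls hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99
```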
Code:

```sql
set 'execution.savepoint.path'='hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/chk-23';

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
```
2.3 Task recovery when the entire yarn-session fails
To be tested:
If the entire yarn-session fails, a checkpoint can also be specified when restarting the yarn session.
```bash
$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d -s hdfs://hp5:8020/vmcluster/flink-checkpoints/c12eb538c2e8965d2d94c170b67641f2/chk-1/_metadata
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
```
3. Simulate a source-side exception
3.1 Manually shut down the source MySQL service
```bash
service mysqld stop
```
3.2 Flink task view
Flink retries on its own, which is quite convenient since no manual intervention is needed.
Once MySQL is back up, the task reconnects and continues.
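This retry behavior is governed by Flink's restart strategy, which can also be tuned from the SQL client. A sketch using the fixed-delay strategy; the values below are illustrative, not taken from the original setup:

```sql
-- Illustrative values: retry up to 10 times, waiting 30 s between attempts
set 'restart-strategy' = 'fixed-delay';
set 'restart-strategy.fixed-delay.attempts' = '10';
set 'restart-strategy.fixed-delay.delay' = '30s';
```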
FAQ:
1. Checkpoint has not written data
A working checkpoint from another article:
My checkpoint:
It appears my checkpoints were not completing successfully.
Modify the checkpoint interval:
```sql
-- Before modification:
set execution.checkpointing.interval=10sec;
-- After modification:
set execution.checkpointing.interval=60sec;
```
2. Checkpoint failed
Error message:
```
Checkpoint Coordinator is suspending.
```
Solution:
Increasing the yarn-session memory from 8 GB to 16 GB resolved the problem.
For large tables, it is best to bulk-load the initial data through Spark first, and then let Flink handle the incremental changes.
3. After manually canceling the Flink job, the checkpoint file is automatically deleted
https://developer.aliyun.com/ask/435979
http://events.jianshu.io/p/032396543ceb
The resources found online all address this at the code (API) level, not at the Flink SQL level.
The answer had to come from the official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
```sql
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
```
Reference:
- https://blog.csdn.net/qq_31866793/article/details/103069646