Starting with version 1.13, Flink supports restoring jobs from a savepoint in the SQL Client.
Next, we will build an example in the Flink SQL Client that ingests MySQL CDC data into a Hudi data lake via Kafka. The overall process is as follows:
In the middle of this process, we manually stop the Kafka→Hudi Flink job and then restore it from a savepoint in the Flink SQL Client.
The workflow below mirrors a real CDC ingestion pipeline built with the Flink SQL Client. Note that this article uses Flink 1.13.1; you can follow along to reproduce the verification.
Environment dependencies
hadoop 3.2.0
zookeeper 3.6.3
kafka 2.8.0
mysql 5.7.35
flink 1.13.1-scala_2.12
flinkcdc 1.4
hudi 0.10.0-SNAPSHOT
datafaker 0.7.6
Operation Guide
Use datafaker to import test data into mysql
Create a new stu8 table in the database
mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT 'auto-increment id',
  name varchar(20) not null comment 'student name',
  school varchar(20) not null comment 'school name',
  nickname varchar(20) not null comment 'student nickname',
  age int not null comment 'student age',
  score decimal(4,2) not null comment 'score',
  class_num int not null comment 'class number',
  phone bigint not null comment 'phone number',
  email varchar(64) comment 'home email address',
  ip varchar(32) comment 'IP address'
) engine=InnoDB default charset=utf8;
Create a new meta.txt file with the following contents:
id||int||increment id[:inc(id,1)]
name||varchar(20)||student name
school||varchar(20)||school name[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||student nickname[:enum(tom,tony,mick,rich,jasper)]
age||int||student age[:age]
score||decimal(4,2)||score[:decimal(4,2,1)]
class_num||int||class number[:int(10, 100)]
phone||bigint||phone number[:phone_number]
email||varchar(64)||home email[:email]
ip||varchar(32)||IP address[:ipv4]
Generate 1,000,000 rows and write them to the test.stu8 table in MySQL (use a large row count so that the job writing to Hudi keeps running long enough):
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt
Download the hudi, flink-mysql-cdc, and flink-kafka connector jars
This article provides a pre-compiled hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar. If you want to compile Hudi yourself, clone the master branch and build it (remember to specify the Hadoop version).
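If you do build the bundle yourself, the steps look roughly like the following sketch. The Maven property name for the Hadoop version is an assumption based on common Hudi build conventions; check the Hudi README for the exact profile and flags for your Hudi version.

```shell
# Clone Hudi and build it from source (flags are illustrative; verify against the Hudi docs)
git clone https://github.com/apache/hudi.git
cd hudi
# -Dhadoop.version pins the Hadoop version the bundle is compiled against
# (assumed property name; confirm in the project's pom.xml)
mvn clean package -DskipTests -Dhadoop.version=3.2.0
# The Flink bundle jar is then typically found under packaging/hudi-flink-bundle/target/
```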
Download the jar packages into Flink's lib directory:
cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar
Start the flink session cluster on yarn
First, make sure that HADOOP_CLASSPATH is configured. For open-source Hadoop 3.2.0, it can be set as follows:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*
Flink needs checkpointing enabled and a savepoint directory configured; modify the flink-conf.yaml configuration file:
execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints
Start the flink session cluster
cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d
Start flink sql client
cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shell
flink reads mysql binlog and writes to kafka
Create mysql source table
create table stu8_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'test',
  'table-name' = 'stu8'
);
Create kafka target table
create table stu8_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.zookeeper.connect' = 'hadoop1:2181',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json'
);
Create a task to write the mysql binlog log to kafka
insert into stu8_binlog_sink_kafka select * from stu8_binlog;
flink reads kafka data and writes it to hudi data lake
Create kafka source table
create table stu8_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
);
Create hudi target table
create table stu8_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) partitioned by (`school`)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
);
Create a task to write kafka data to hudi
insert into stu8_binlog_sink_hudi select * from stu8_binlog_source_kafka;
After the job has run for a while, we manually trigger a savepoint for the Kafka→Hudi job and stop it:
bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
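To find the job ID and YARN application ID used in the stop command above, you can list the jobs running on the YARN session first; a sketch using this article's cluster identifiers:

```shell
# List running Flink jobs on the YARN session; the output includes each job's ID,
# which is then passed to `bin/flink stop`
cd flink-1.13.1
bin/flink list -yid application_1633660054258_0001
```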
Restore the job from the savepoint (executed in the Flink SQL Client):
SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb;
insert into stu8_binlog_sink_hudi select * from stu8_binlog_source_kafka;
In the Flink web UI you can see the job resuming from the savepoint above:
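As an additional check, you can list the savepoint directory on HDFS and confirm that the savepoint the job resumed from was written out (the path below is the one used in the stop command in this article):

```shell
# List completed savepoints under the directory passed to `flink stop --savepointPath`;
# each savepoint appears as a savepoint-<job-id-prefix>-<random> subdirectory
hdfs dfs -ls hdfs://hadoop:9000/flink-1.13-savepoint/
```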
Original link: https://blog.csdn.net/weixin_39636364/article/details/120652618