Practical case: Using the Flink SQL Client to run FlinkCDC2Hudi, with support for restoring the hudi job from a savepoint

Starting from version 1.13, Flink supports restoring jobs from a savepoint in the SQL Client; see the introduction to Flink savepoints for background.
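
In short: the running job is stopped with a savepoint from the Flink CLI, and the SQL Client is then pointed at that savepoint before the same INSERT statement is resubmitted. A minimal sketch of the pattern used later in this article (the job id and savepoint path are placeholders):

# from the Flink CLI: stop the job and take a savepoint
bin/flink stop --savepointPath <savepoint-dir> <job-id>

-- from the SQL Client: point the next DML at that savepoint and resubmit it
SET execution.savepoint.path=<path-to-savepoint>;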

Next, we build an example in the Flink SQL Client in which mysql CDC data enters the hudi data lake through kafka. The overall process is: mysql → flink-mysql-cdc → kafka → flink → hudi.

In the second step (kafka → hudi), we manually stop the Flink job with a savepoint and then restore it from that savepoint in the Flink SQL Client.

The steps below are much the same as a typical Flink SQL Client CDC ingestion pipeline; note that this article uses Flink 1.13.1. You can follow along to reproduce the verification in this article.

Environment dependencies

hadoop 3.2.0

zookeeper 3.6.3

kafka 2.8.0

mysql 5.7.35

flink 1.13.1-scala_2.12

flinkcdc 1.4

hudi 0.10.0-SNAPSHOT

datafaker 0.7.6

Operation Guide
Use datafaker to import test data into mysql
Create a new stu8 table in the database

mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT 'auto-increment id',
  name varchar(20) not null comment 'student name',
  school varchar(20) not null comment 'school name',
  nickname varchar(20) not null comment 'student nickname',
  age int not null comment 'student age',
  score decimal(4,2) not null comment 'score',
  class_num int not null comment 'class number',
  phone bigint not null comment 'phone number',
  email varchar(64) comment 'home email',
  ip varchar(32) comment 'IP address'
  ) engine=InnoDB default charset=utf8;

Create a new meta.txt file with the following contents:

id||int||auto-increment id[:inc(id,1)]
name||varchar(20)||student name
school||varchar(20)||school name[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||student nickname[:enum(tom,tony,mick,rich,jasper)]
age||int||student age[:age]
score||decimal(4,2)||score[:decimal(4,2,1)]
class_num||int||class number[:int(10, 100)]
phone||bigint||phone number[:phone_number]
email||varchar(64)||home email[:email]
ip||varchar(32)||IP address[:ipv4]

Generate 1,000,000 rows and write them into the test.stu8 table in mysql (use a large row count so that the job writing to hudi keeps running long enough for the savepoint test):

datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt
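
While the import runs, you can sanity-check that rows are actually arriving in mysql (a quick check, not part of the original walkthrough):

mysql -u root -p -e "select count(*) from test.stu8;"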

Download the hudi, flink-mysql-cdc, and flink-kafka related jar packages
This article provides the pre-compiled hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar. If you want to compile hudi yourself, clone the master branch and build it (note: specify your hadoop version).
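
If you do build it yourself, here is a minimal sketch, assuming the standard Hudi Maven build and that the hadoop.version property is overridden to match your cluster (module paths and profiles may differ between Hudi versions):

git clone https://github.com/apache/hudi.git
cd hudi
mvn clean package -DskipTests -Dhadoop.version=3.2.0
# the Flink bundle jar is produced under packaging/hudi-flink-bundle/target/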

Download the jar packages to the lib directory of flink

cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar 

Start the flink session cluster on yarn
First, make sure that HADOOP_CLASSPATH is configured. For open-source hadoop 3.2.0 it can be set as follows:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*
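
Alternatively, the shortcut recommended in the Flink documentation works as well (assuming the hadoop command is on your PATH):

export HADOOP_CLASSPATH=`hadoop classpath`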

Flink needs checkpointing enabled and a savepoint directory configured; modify the flink-conf.yaml configuration file as follows:

execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints

Start the flink session cluster

cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d
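
The YARN application id of this session cluster is needed later for the -yid flag when stopping the job. If you did not note it from the yarn-session.sh output, you can look it up with the standard YARN CLI:

yarn application -list | grep flink-hudi-test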

Start flink sql client

cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shell

flink reads mysql binlog and writes to kafka
Create mysql source table

create table stu8_binlog(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'database-name' = 'test',
'table-name' = 'stu8'
);
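
Optionally (not part of the original walkthrough), you can sanity-check the CDC source directly from the SQL Client; this submits a continuous query and streams the rows into the client until you close the result view:

select * from stu8_binlog;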

Create kafka target table

create table stu8_binlog_sink_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
) with (
'connector' = 'kafka'
,'topic' = 'cdc_mysql_test_stu8_sink'
,'properties.zookeeper.connect' = 'hadoop1:2181'
,'properties.bootstrap.servers' = 'hadoop1:9092'
,'format' = 'debezium-json'
);

Create a task to write the mysql binlog to kafka

insert into stu8_binlog_sink_kafka
select * from stu8_binlog;
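
To confirm that change records are arriving in kafka, you can tail the topic with the console consumer (a quick check, assuming a standard Kafka 2.8.0 installation under $KAFKA_HOME):

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic cdc_mysql_test_stu8_sink --from-beginning --max-messages 5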

flink reads kafka data and writes it to the hudi data lake
Create kafka source table

create table stu8_binlog_source_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_test_stu8_sink',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup'
);

Create hudi target table

create table stu8_binlog_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'school'
);

Create a task to write kafka data to hudi

insert into stu8_binlog_sink_hudi
select * from stu8_binlog_source_kafka;
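
Once the job has completed a few commits, the hudi table layout should appear on HDFS (a quick check; partition and file names will differ in your run):

hdfs dfs -ls hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi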

After the job has been running for a while, we stop the kafka → hudi job with a savepoint:

bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
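
The stop command prints the path of the savepoint it created; if you missed it, you can also list the savepoint directory (the savepoint-* directory name will differ in your run):

hdfs dfs -ls hdfs://hadoop:9000/flink-1.13-savepoint/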

Restore the task from the savepoint (executed in the Flink SQL Client):

SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb;
insert into stu8_binlog_sink_hudi
select * from stu8_binlog_source_kafka;

You can see that the task resumes from the savepoint taken above.

Original link: https://blog.csdn.net/weixin_39636364/article/details/120652618
