Overview
Synchronize table AA in mysqlA to table BA in mysqlB. The mysql-cdc connector reads the source table and the JDBC connector writes to the target table.
Resource preparation
-
Download the required JAR packages
- flink-sql-connector-mysql-cdc-2.3.0.jar
- flink-connector-jdbc_2.12-1.14.6.jar
- mysql-connector-java-8.0.30.jar
Copy the downloaded packages to flink-1.16.1/lib
-
Enable GTID for the database
Find my.cnf, add the following settings under the [mysqld] section, then restart MySQL so they take effect
[mysqld]
...
server-id=1
log-bin=mysql-bin
binlog-format=row
gtid-mode=ON
enforce-gtid-consistency=true
log-slave-updates=ON
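To confirm that the settings were picked up, the server variables can be inspected from any MySQL client connected to mysqlA. This is a minimal check that is not part of the original steps; the names are the standard MySQL system variables that correspond to the my.cnf options above.

```sql
-- Run against mysqlA once MySQL has been restarted.
-- If my.cnf was picked up, these should report server_id = 1,
-- log_bin = ON, binlog_format = ROW, gtid_mode = ON and
-- enforce_gtid_consistency = ON.
SHOW VARIABLES
WHERE Variable_name IN (
  'server_id',
  'log_bin',
  'binlog_format',
  'gtid_mode',
  'enforce_gtid_consistency'
);
```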
-
Prepare data
Data in mysqlA
CREATE DATABASE test_db;
USE test_db;

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
Data in mysqlB
CREATE DATABASE test_dw;
USE test_dw;

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
);
Install Flink locally
-
Install Java 11
# Install Java 11 on macOS
$ brew install java11
$ sudo ln -sfn /usr/local/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk
$ java -version
openjdk version "11.0.16.1" 2022-08-12
OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)
-
Download release 1.16.1 and extract it
$ tar -xzf flink-1.16.1-bin-scala_2.12.tgz
$ cd flink-1.16.1
-
Start the cluster
$ cd flink-1.16.1
# start the cluster
$ ./bin/start-cluster.sh
# stop the cluster
$ ./bin/stop-cluster.sh
After starting the cluster, open http://localhost:8081/ to view the web UI.
Use Flink SQL to synchronize the tables without writing code
-
Start the Flink SQL client
$ cd flink-1.16.1
$ ./bin/sql-client.sh
[Flink SQL Client startup banner]
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /Users/zhangkai/.flink-sql-history
Flink SQL>
-
Set the checkpoint interval
- Reference Document 1
- Reference Document 2
# Checkpoint every 3 seconds; for production, 5-10 minutes is suggested
Flink SQL> SET execution.checkpointing.interval = 3s;
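The Flink SQL client documentation writes SET statements with the key and value as quoted strings. A sketch of the same setting in that form follows; the commented-out production value is an assumption picked from the 5-10 minute range suggested above, not a figure from the original.

```sql
-- Same checkpoint interval as above, in the quoted key/value form
SET 'execution.checkpointing.interval' = '3s';

-- Assumed production setting (any value in the suggested 5-10 minute range):
-- SET 'execution.checkpointing.interval' = '5min';
```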
-
In the Flink SQL CLI, create tables that map to the underlying database tables to be synchronized
-- Source table: reads the data in mysqlA
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'secret',
  'database-name' = 'test_db',
  'table-name' = 'orders'
);
- Create the dw_orders table, which is used to write the order data into mysqlB
-- Sink table: writes the data to mysqlB
CREATE TABLE dw_orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test_dw',
  'username' = 'root',
  'password' = 'secret',
  'table-name' = 'orders'
);
- Run the INSERT statement that writes to mysqlB
INSERT INTO dw_orders SELECT * FROM orders;
-
From this point on, inserts, updates, and deletes on test_db.orders are synchronized to test_dw.orders.
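One way to watch the synchronization is to change a row in mysqlA and then query mysqlB. This is a sketch that assumes the INSERT job above is still running. The inserted values are made up for illustration, and the id 10004 assumes the three sample rows are the only ones inserted so far.

```sql
-- On mysqlA: make one change of each kind.
USE test_db;
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
UPDATE orders SET order_status = true WHERE order_id = 10004;  -- assumes the new row received id 10004
DELETE FROM orders WHERE order_id = 10004;

-- On mysqlB: run after each statement above to see the change arrive.
SELECT * FROM test_dw.orders ORDER BY order_id;
```

Changes may take a moment to appear in mysqlB, so it can help to pause briefly between a change and the query.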
-
Summary
The mysql-cdc connector reads the existing rows and the subsequent changes from mysqlA, and the JDBC connector writes them to mysqlB, matching rows by primary key.
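Because dw_orders declares a primary key, the Flink JDBC connector works in upsert mode rather than plain append. For MySQL this corresponds roughly to the statements below. They illustrate the semantics with made-up values and are not the exact SQL the connector generates.

```sql
USE test_dw;

-- An insert or update in the source becomes an upsert keyed on order_id
INSERT INTO orders (order_id, order_date, customer_name, price, product_id, order_status)
VALUES (10001, '2020-07-30 10:08:22', 'Jark', 50.50, 102, true)
ON DUPLICATE KEY UPDATE
  order_date    = VALUES(order_date),
  customer_name = VALUES(customer_name),
  price         = VALUES(price),
  product_id    = VALUES(product_id),
  order_status  = VALUES(order_status);

-- A delete in the source becomes a delete by primary key in the target
DELETE FROM orders WHERE order_id = 10001;
```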