Install Flink offline and use Flink SQL to synchronize data.

Overview

Synchronize a table in mysqlA (here test_db.orders) to a table in mysqlB (here test_dw.orders). Reading uses the mysql-cdc connector and writing uses the JDBC connector.

Resource preparation

  • Download the required jar files

    • flink-sql-connector-mysql-cdc-2.3.0.jar
    • flink-connector-jdbc_2.12-1.14.6.jar
    • mysql-connector-java-8.0.30.jar
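      Copy the downloaded jars to flink-1.16.1/lib. If the cluster is already running, restart it so the new jars are loaded. Note that flink-connector-jdbc_2.12-1.14.6.jar is built for Flink 1.14 and may not be compatible with 1.16.1; the JDBC connector released for Flink 1.16.1 is flink-connector-jdbc-1.16.1.jar.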
  • Enable the binlog and GTID on mysqlA (the source database)

    Find my.cnf, add the following settings under [mysqld], then restart MySQL:

    [mysqld]
    ...
    server-id=1
    log-bin=mysql-bin
    binlog-format=row
    gtid-mode=ON
    enforce-gtid-consistency=true
    log-slave-updates=ON
    
  • Prepare the data

    In mysqlA (source), create the table and insert sample rows:

    CREATE DATABASE test_db;
    USE test_db;
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
    

    In mysqlB (target), create an empty table with the same schema:

    CREATE DATABASE test_dw;
    USE test_dw;
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    );
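Once mysqlA has been restarted with the my.cnf settings from the binlog/GTID step, the following standard MySQL statements are a quick way to confirm they took effect before starting the Flink job. The expected values in the comments assume exactly the configuration listed above.

```sql
-- Run in mysqlA after restarting MySQL
SHOW VARIABLES LIKE 'log_bin';                   -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';             -- expected: ROW
SHOW VARIABLES LIKE 'gtid_mode';                 -- expected: ON
SHOW VARIABLES LIKE 'enforce_gtid_consistency';  -- expected: ON
SHOW VARIABLES LIKE 'server_id';                 -- expected: 1
```

If binlog_format is not ROW, mysql-cdc cannot decode row changes, so it is worth checking this before creating the Flink tables.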
    

Install Flink offline

  • Install Java 11

    # install Java 11 on macOS with Homebrew
    $ brew install java11
    $ sudo ln -sfn /usr/local/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk
    $ java -version
    openjdk version "11.0.16.1" 2022-08-12
    OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
    OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)
    
  • Download release 1.16.1 and extract it

    $ tar -xzf flink-1.16.1-bin-scala_2.12.tgz
    $ cd flink-1.16.1
    
  • Start the cluster

    $ cd flink-1.16.1
    # start the cluster
    $ ./bin/start-cluster.sh
    # stop
    $ ./bin/stop-cluster.sh
    

    After the cluster starts, open http://localhost:8081/ to see the web UI.

Use Flink SQL to synchronize data with zero code

  • Start the Flink SQL client

    $ cd flink-1.16.1
    $ ./bin/sql-client.sh
    
            Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /Users/zhangkai/.flink-sql-history
    
    Flink SQL>
    
  • Enable checkpointing

    # Take a checkpoint every 3 seconds (for production, 5-10 minutes is the suggested interval)
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';
    
  • In the Flink SQL CLI, create the orders table, which reads test_db.orders from mysqlA through the mysql-cdc connector

    -- Source table: reads test_db.orders in mysqlA
    CREATE TABLE orders (
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = 'secret',
      'database-name' = 'test_db',
      'table-name' = 'orders'
    );

  • Create the dw_orders table, which writes the order data into test_dw.orders in mysqlB

    -- Sink table: writes to test_dw.orders in mysqlB
    CREATE TABLE dw_orders (
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/test_dw',
      'username' = 'root',
      'password' = 'secret',
      'table-name' = 'orders'
    );

  • Submit the job that writes to mysqlB

    INSERT INTO dw_orders SELECT * FROM orders;

  • From this point on, the existing rows and all subsequent inserts, updates, and deletes on test_db.orders are synchronized to test_dw.orders.

  • In summary

    mysql-cdc reads the initial snapshot and the binlog changes from mysqlA, and the JDBC connector applies them to mysqlB as upserts and deletes keyed on the primary key (order_id).
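To check the pipeline end to end, one can modify test_db.orders in mysqlA while the INSERT job is running and then query test_dw.orders in mysqlB. This is a sketch against the sample data above: it assumes the three sample rows received order_id 10001 to 10003 and that nothing else has been inserted, so the new row gets 10004. The new row's values are made up for illustration. Changes typically show up in mysqlB within a few seconds, because the JDBC sink flushes buffered rows periodically and on every checkpoint.

```sql
-- In mysqlA (source): one insert, one update, one delete
USE test_db;
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);  -- hypothetical row, gets order_id 10004
UPDATE orders SET order_status = true WHERE order_id = 10001;
DELETE FROM orders WHERE order_id = 10003;

-- In mysqlB (target): inspect the synchronized table
USE test_dw;
SELECT order_id, customer_name, order_status FROM orders ORDER BY order_id;
-- expected rows: 10001 (order_status = 1), 10002, 10004; 10003 has been deleted
```

The Flink web UI at http://localhost:8081/ should also show the INSERT job as RUNNING while these changes flow through.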