Real-time data warehouse based on Apache Paimon: full + incremental ingestion into the lake in real time

Use case introduction

Apache Paimon (hereinafter referred to as Paimon) is a high-performance lake storage format that supports real-time updates. This use case demonstrates full + incremental integrated synchronization of a MySQL order table with tens of millions of rows into a Paimon detail table, with downstream jobs computing aggregations and continuously consuming the updates. The overall process is shown in the figure below. MySQL needs to be prepared in advance, and the local machine needs the Flink distribution, the Paimon-related dependencies, and the TPC-H data generator.

The source data is generated by the TPC-H toolkit and imported into MySQL. When writing to Paimon, the l_shipdate field is used as the business time to define the partition keys l_year and l_month. The time span runs from 1992-01 to 1998-12, so data is dynamically written to 84 partitions. The detailed configuration is shown in the table below.

Configuration                      | Value      | Description
Number of records                  | 59,986,052 | Amount of data synchronized in the full phase
Number of dynamic write partitions | 84         | `l_year` is the first-level partition key, `l_month` is the second-level partition key
Checkpoint interval                | 1 min      | Data update frequency
Parallelism                        | 2          | 2 parallel tasks on a single machine
FTS bucket number                  | 2          | 2 buckets are generated under each partition

In testing, with a single-machine parallelism of 2 and a checkpoint interval of 1 minute (i.e. results updated every minute), the 59.9 million rows of full data were written within 66 minutes. Write progress sampled every 10 minutes is shown in the table below; the average write rate is about 1 million rows/min.

Duration (min) | Records in (million)
10             | 10
20             | 20
30             | 31
40             | 42
50             | 50
60             | 57

From the job's running metrics it can be observed that roughly 10 million rows are written every 10 minutes; the full data is synchronized at the 66-minute mark, after which the job continuously monitors and consumes incremental data.

About data generation

As a classic ad-hoc query performance benchmark, TPC-H contains data models that closely resemble real business scenarios. This use case selects the order detail table lineitem and its single-table query Q1 (described in detail below).

The lineitem schema is shown in the table below; each record is about 128 bytes.

Field           | Type                    | Description
l_orderkey      | INT NOT NULL            | Main order key (main order id), first field of the joint primary key
l_partkey       | INT NOT NULL            | Part key, i.e. product id
l_suppkey       | INT NOT NULL            | Supplier key, i.e. seller id
l_linenumber    | INT NOT NULL            | Sub-order key (sub-order id), second field of the joint primary key
l_quantity      | DECIMAL(15, 2) NOT NULL | Product quantity
l_extendedprice | DECIMAL(15, 2) NOT NULL | Product price
l_discount      | DECIMAL(15, 2) NOT NULL | Product discount
l_tax           | DECIMAL(15, 2) NOT NULL | Product tax
l_returnflag    | CHAR(1) NOT NULL        | Return flag: A for accepted, R for returned (rejected), N for none/unknown
l_linestatus    | CHAR(1) NOT NULL        | Line status: orders with a ship date later than 1995-06-17 are marked O, otherwise F
l_shipdate      | DATE NOT NULL           | Ship date
l_commitdate    | DATE NOT NULL           | Commit date
l_receiptdate   | DATE NOT NULL           | Receipt date
l_shipinstruct  | CHAR(25) NOT NULL       | Shipping instruction, e.g. DELIVER IN PERSON, TAKE BACK RETURN, COLLECT COD (cash on delivery)
l_shipmode      | CHAR(10) NOT NULL       | Shipping mode, e.g. SHIP (sea), AIR (air), TRUCK (land), MAIL (post)
l_comment       | VARCHAR(44) NOT NULL    | Order comment

Business requirements (TPC-H Q1)

For orders shipped within a certain date range, compute, grouped by return flag and line status, the total quantity, total base price, total discounted price (revenue), total charge including tax, average quantity, average price, average discount, and the number of orders.
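
For reference, the standard TPC-H Q1 has the following shape (a sketch using the lineitem columns above; the streaming job later in this use case adapts the same aggregation):

SELECT
  l_returnflag,
  l_linestatus,
  SUM(l_quantity) AS sum_qty,
  SUM(l_extendedprice) AS sum_base_price,
  SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
  SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
  AVG(l_quantity) AS avg_qty,
  AVG(l_extendedprice) AS avg_price,
  AVG(l_discount) AS avg_disc,
  COUNT(*) AS count_order
FROM lineitem
WHERE l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY
GROUP BY l_returnflag, l_linestatus
ORDER BY l_returnflag, l_linestatus;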

Quick start

Introduction to steps

MySQL environment preparation:

DROP DATABASE IF EXISTS flink;
CREATE DATABASE IF NOT EXISTS flink;
USE flink;
CREATE USER 'flink' IDENTIFIED WITH mysql_native_password BY 'flink';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink';
FLUSH PRIVILEGES;
-- Create base table
CREATE TABLE lineitem (
        l_orderkey INTEGER NOT NULL,
        l_partkey INTEGER NOT NULL,
        l_suppkey INTEGER NOT NULL,
        l_linenumber INTEGER NOT NULL,
        l_quantity DECIMAL(15,2) NOT NULL,
        l_extendedprice DECIMAL(15,2) NOT NULL,
        l_discount DECIMAL(15,2) NOT NULL,
        l_tax DECIMAL(15,2) NOT NULL,
        l_returnflag CHAR(1) NOT NULL,
        l_linestatus CHAR(1) NOT NULL,
        l_shipdate DATE NOT NULL,
        l_commitdate DATE NOT NULL,
        l_receiptdate DATE NOT NULL,
        l_shipinstruct CHAR(25) NOT NULL,
        l_shipmode CHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Add PK constraint
ALTER TABLE lineitem ADD PRIMARY KEY (l_orderkey, l_linenumber);

-- Create delta tables
CREATE TABLE update_lineitem LIKE lineitem;

CREATE TABLE delete_lineitem (
        l_orderkey INTEGER NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE delete_lineitem ADD PRIMARY KEY (l_orderkey);
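
The Flink MySQL CDC connector reads the binlog, which must be enabled in ROW format. A quick sanity check, assuming you have sufficient privileges (not part of the original steps):

-- log_bin should be ON and binlog_format should be ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';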

Prepare the TPC-H data files: upload tpch_dbgen_linux64-2.14.0.zip to the server, unzip it, and run the following command to generate 10 GB of data, of which lineitem is about 7.3 GB.

./dbgen -q -s 10 -T L

Load the data into MySQL

Create a mycreds.cnf file containing the basic MySQL connection information, then run:

mysql --defaults-extra-file=mycreds.cnf -D flink --local-infile=1 -e "
  LOAD DATA LOCAL INFILE 'lineitem.tbl' INTO TABLE lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
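
Once the load completes, you can verify the row count against the expected full volume (a simple check; the count for scale factor 10 is 59,986,052, as listed in the configuration table above):

-- Should return 59,986,052 for the scale factor 10 lineitem table
SELECT COUNT(*) FROM lineitem;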

This use case first imports the full order data (about 59.9 million rows) into MySQL, which takes roughly 15 minutes. During this period you can prepare the Flink and Paimon environments, wait for the import to finish, and then start the Flink jobs. A script then repeatedly triggers the TPC-H refresh functions RF1 (insert new orders) and RF2 (delete existing orders) to simulate incremental updates, with a 20 s interval between each pair of insert and delete batches. Taking 10 refresh pairs as an example, about 6 million new order rows are inserted and 1.5 million orders are deleted. (Note: the delete files generated by TPC-H contain main order ids; since lineitem has a joint primary key, the actual number of deleted rows is somewhat larger than 1.5 million, as illustrated below.)
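Because each deleted order key removes every line item of that order, you can estimate the real number of rows an RF2 batch will remove with a quick count before running the delete (a simple check, not part of the original flow; it assumes delete_lineitem has already been loaded with one batch of keys):

-- Rows in lineitem belonging to the order keys staged in delete_lineitem,
-- i.e. the actual number of rows the current RF2 batch will delete
SELECT COUNT(*) AS rows_to_delete
FROM lineitem
WHERE l_orderkey IN (SELECT l_orderkey FROM delete_lineitem);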

Step 2: Download Flink, Paimon and other required dependencies

The demo runs on Flink 1.17.0 (flink-1.17.0); the other required dependencies are as follows:

  • Flink MySQL CDC connector
  • Paimon (FTS) bundle jar compiled for Flink 1.17
  • Hadoop Bundle Jar

For convenience, you can download all of these dependencies from the flink-table-store-101/flink/lib directory of this project and place them in the local flink-1.17.0/lib directory, or download and compile them yourself:

  • flink-sql-connector-mysql-cdc-2.2.1.jar
  • Hadoop Bundle Jar

After the above steps are completed, the lib directory should look like this:

lib
-rw-r--r-- 1 hadoop hadoop 196487 Mar 17 20:07 flink-cep-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 542616 Mar 17 20:10 flink-connector-files-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 102468 Mar 17 20:14 flink-csv-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 135969953 Mar 17 20:22 flink-dist-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 180243 Mar 17 20:13 flink-json-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 21043313 Mar 17 20:20 flink-scala_2.12-1.17.0.jar
-rw-rw-r-- 1 hadoop hadoop 36327707 May 19 15:12 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
-rw-rw-r-- 1 hadoop hadoop 22096298 May 22 10:30 flink-sql-connector-mysql-cdc-2.2.1.jar
-rw-r--r-- 1 hadoop hadoop 15407474 Mar 17 20:21 flink-table-api-java-uber-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 37975208 Mar 17 20:15 flink-table-planner-loader-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 3146205 Mar 17 20:07 flink-table-runtime-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 208006 Mar 17 17:31 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 hadoop hadoop 301872 Mar 17 17:31 log4j-api-2.17.1.jar
-rw-r--r-- 1 hadoop hadoop 1790452 Mar 17 17:31 log4j-core-2.17.1.jar
-rw-r--r-- 1 hadoop hadoop 24279 Mar 17 17:31 log4j-slf4j-impl-2.17.1.jar
-rw-rw-r-- 1 hadoop hadoop 26745559 May 19 15:14 paimon-flink-1.17-0.5-20230512.001824-7.jar

Step 3: Modify the flink-conf configuration file and start the cluster

Edit the flink-1.17.0/conf/flink-conf.yaml file and modify the following settings:

jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 2
execution.checkpointing.interval: 1min
state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
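
Alternatively, the job-level options among these (not the JobManager/TaskManager memory sizes) can also be set per session in the Flink SQL client; a sketch using the option keys as named in Flink 1.17:

SET 'parallelism.default' = '2';
SET 'execution.checkpointing.interval' = '1min';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';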

If you want to observe Paimon's asynchronous compaction, snapshot commit, and streaming read behavior, you can modify the log4j.properties file in the flink-1.17.0/conf directory and add the following configuration as needed:

# Log FTS
logger.commit.name = org.apache.flink.table.store.file.operation.FileStoreCommitImpl
logger.commit.level = DEBUG

logger.compaction.name = org.apache.flink.table.store.file.mergetree.compact
logger.compaction.level = DEBUG

logger.enumerator.name = org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumerator
logger.enumerator.level=DEBUG

Here we only enable the commit DEBUG logger. Then execute ./bin/start-cluster.sh in the flink-1.17.0 directory to start the cluster.

Step 4: Initialize the table schema and start Flink SQL CLI

./bin/sql-client.sh
Then execute the following SQL:
-- set to use streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- Create and use the Paimon (FTS) catalog
CREATE CATALOG `table_store` WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://tmp/table-store-101'
);

USE CATALOG `table_store`;

-- ODS table schema

-- Note that under FTS Catalog, when creating a table using other connectors, you need to declare the table as a temporary table
CREATE TEMPORARY TABLE `ods_lineitem` (
  `l_orderkey` INT NOT NULL,
  `l_partkey` INT NOT NULL,
  `l_suppkey` INT NOT NULL,
  `l_linenumber` INT NOT NULL,
  `l_quantity` DECIMAL(15, 2) NOT NULL,
  `l_extendedprice` DECIMAL(15, 2) NOT NULL,
  `l_discount` DECIMAL(15, 2) NOT NULL,
  `l_tax` DECIMAL(15, 2) NOT NULL,
  `l_returnflag` CHAR(1) NOT NULL,
  `l_linestatus` CHAR(1) NOT NULL,
  `l_shipdate` DATE NOT NULL,
  `l_commitdate` DATE NOT NULL,
  `l_receiptdate` DATE NOT NULL,
  `l_shipinstruct` CHAR(25) NOT NULL,
  `l_shipmode` CHAR(10) NOT NULL,
  `l_comment` VARCHAR(44) NOT NULL,
  PRIMARY KEY (`l_orderkey`, `l_linenumber`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1', -- to use a hostname instead, add '127.0.0.1 mysql.docker.internal' to /etc/hosts
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flink',
  'database-name' = 'flink',
  'table-name' = 'lineitem'
);


-- DWD table schema
-- Use `l_shipdate` as the business date and create a two-level partitioned table with `l_year` + `l_month`; note that all partition keys must be included in the primary key
CREATE TABLE IF NOT EXISTS `dwd_lineitem` (
  `l_orderkey` INT NOT NULL,
  `l_partkey` INT NOT NULL,
  `l_suppkey` INT NOT NULL,
  `l_linenumber` INT NOT NULL,
  `l_quantity` DECIMAL(15, 2) NOT NULL,
  `l_extendedprice` DECIMAL(15, 2) NOT NULL,
  `l_discount` DECIMAL(15, 2) NOT NULL,
  `l_tax` DECIMAL(15, 2) NOT NULL,
  `l_returnflag` CHAR(1) NOT NULL,
  `l_linestatus` CHAR(1) NOT NULL,
  `l_shipdate` DATE NOT NULL,
  `l_commitdate` DATE NOT NULL,
  `l_receiptdate` DATE NOT NULL,
  `l_shipinstruct` CHAR(25) NOT NULL,
  `l_shipmode` CHAR(10) NOT NULL,
  `l_comment` VARCHAR(44) NOT NULL,
  `l_year` BIGINT NOT NULL,
  `l_month` BIGINT NOT NULL,
  PRIMARY KEY (`l_orderkey`, `l_linenumber`, `l_year`, `l_month`) NOT ENFORCED
) PARTITIONED BY (`l_year`, `l_month`) WITH (
  -- Set 2 buckets under each partition
  'bucket' = '2',
  -- Set the changelog-producer to 'input' so that update_before records from the upstream CDC source are not discarded and no changelog-normalize node is needed when downstream jobs consume dwd_lineitem
  'changelog-producer' = 'input'
);
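
To double-check that the partition keys, bucket number, and changelog-producer took effect, you can inspect the table definition from the SQL client (a quick sanity check, not part of the original walkthrough):

-- Shows the full DDL, including the WITH options, of the Paimon table just created
SHOW CREATE TABLE `dwd_lineitem`;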

-- ADS table schema
-- Based on TPC-H Q1: for shipped orders, compute the order count, total quantity, total turnover, total charge including tax, average price, average discount and other indicators, grouped by return flag and line status
CREATE TABLE IF NOT EXISTS `ads_pricing_summary` (
  `l_returnflag` CHAR(1) NOT NULL,
  `l_linestatus` CHAR(1) NOT NULL,
  `sum_quantity` DOUBLE NOT NULL,
  `sum_base_price` DOUBLE NOT NULL,
  `sum_discount_price` DOUBLE NOT NULL,
  `sum_charge_vat_inclusive` DOUBLE NOT NULL,
  `avg_quantity` DOUBLE NOT NULL,
  `avg_base_price` DOUBLE NOT NULL,
  `avg_discount` DOUBLE NOT NULL,
  `count_order` BIGINT NOT NULL,
   PRIMARY KEY (`l_returnflag`, `l_linestatus`) NOT ENFORCED
) WITH (
  'bucket' = '2',
  'merge-engine'='partial-update',
  'changelog-producer'='full-compaction'
);

Step 5: Submit the synchronization task

After the full data has been imported into the MySQL lineitem table, we start the full synchronization job. Here the name of the result table is used as the job name for easy identification.

Task 1: Synchronize ods_lineitem to dwd_lineitem via Flink MySQL CDC

-- set job name
SET 'pipeline.name' = 'dwd_lineitem';
INSERT INTO dwd_lineitem
SELECT
  `l_orderkey`,
  `l_partkey`,
  `l_suppkey`,
  `l_linenumber`,
  `l_quantity`,
  `l_extendedprice`,
  `l_discount`,
  `l_tax`,
  `l_returnflag`,
  `l_linestatus`,
  `l_shipdate`,
  `l_commitdate`,
  `l_receiptdate`,
  `l_shipinstruct`,
  `l_shipmode`,
  `l_comment`,
  YEAR(`l_shipdate`) AS `l_year`,
  MONTH(`l_shipdate`) AS `l_month`
FROM `ods_lineitem`;

You can observe the RPS, checkpoint and other information of the full synchronization phase in the Flink Web UI, or switch to the /tmp/table-store-101/default.db/dwd_lineitem directory on HDFS to view the generated snapshot, manifest and sst files.
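
Instead of browsing HDFS, the snapshot metadata can also be inspected with SQL, assuming the snapshots system table is available in this Paimon version (a sketch, not part of the original steps; run it in batch mode for a one-shot result):

-- Lists snapshot id, commit kind, commit time, etc. for dwd_lineitem
SELECT * FROM `dwd_lineitem$snapshots`;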

Step 6: Calculate the aggregation index and query the result

After the full synchronization completes, we start the aggregation job that writes to the ADS table in real time. (Note: if it is acceptable to show aggregation results computed over incomplete historical data, there is no need to wait for the full synchronization to finish.)

Task 2: Write the results table ads_pricing_summary

-- set job name
SET 'pipeline.name' = 'ads_pricing_summary';
INSERT INTO `ads_pricing_summary`
SELECT
  `l_returnflag`,
  `l_linestatus`,
  SUM(`l_quantity`) AS `sum_quantity`,
  SUM(`l_extendedprice`) AS `sum_base_price`,
  SUM(`l_extendedprice` * (1-`l_discount`)) AS `sum_discount_price`, -- aka revenue
  SUM(`l_extendedprice` * (1-`l_discount`) * (1 + `l_tax`)) AS `sum_charge_vat_inclusive`,
  AVG(`l_quantity`) AS `avg_quantity`,
  AVG(`l_extendedprice`) AS `avg_base_price`,
  AVG(`l_discount`) AS `avg_discount`,
  COUNT(*) AS `count_order`
FROM `dwd_lineitem`
WHERE (`l_year` < 1998 OR (`l_year` = 1998 AND `l_month`<= 9))
AND `l_shipdate` <= DATE '1998-12-01' - INTERVAL '90' DAY
GROUP BY
  `l_returnflag`,
  `l_linestatus`;

Next, switch to batch mode and set the result display to tableau mode:

SET 'execution.runtime-mode' = 'batch';

SET 'sql-client.execution.result-mode' = 'tableau';

Then query the aggregation results just written; you can run the query several times to observe how the indicators change (the interval between runs should be greater than the checkpoint interval of the upstream job).

SET 'pipeline.name' = 'Pricing Summary';

SELECT * FROM ads_pricing_summary;

In addition to querying aggregated indicators, Paimon also supports querying detailed data. Suppose we find a problem with the indicators for returned sub-orders in December 1998; to investigate further through the order details, we can run the following query in batch mode:

SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate` FROM `dwd_lineitem` WHERE `l_year` = 1998 AND `l_month` = 12 AND `l_linenumber` = 2 AND `l_shipinstruct` = 'TAKE BACK RETURN';

Use Spark to query data

Download spark-3.1.2-bin-hadoop2.7.

Put paimon-spark-3.1-0.5-SNAPSHOT.jar into the jars directory, then start spark-sql with the Paimon catalog:

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.warehouse=hdfs://tmp/table-store-101 
USE paimon.default;
SELECT COUNT(*) FROM dwd_lineitem;
SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate` FROM `dwd_lineitem` WHERE `l_year` = 1998 AND `l_month` = 12 AND `l_linenumber` = 2 AND `l_shipinstruct` = 'TAKE BACK RETURN';
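
You can also list the tables exposed through the Paimon catalog and inspect a table's schema before querying (a quick check, not part of the original steps):

-- Confirm the Paimon tables are visible and show the dwd_lineitem schema
SHOW TABLES;
DESCRIBE dwd_lineitem;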

Step 7: Observe the incremental updates

Generate the update data

Write the following script:

start_pair=1
total_pair=10
MYSQL_DATABASE=flink

for i in `seq ${start_pair} ${total_pair}`; do
    if [[ `expr ${i} % 10` -eq 0 ]]; then
      echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply Old Sales Refresh Function (RF2) for pair ${i}"
    fi
    # This refresh function removes old sales information from the database.
    mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "TRUNCATE delete_lineitem"
    mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} --local-infile=1 -e "SET UNIQUE_CHECKS = 0;" -e "
    LOAD DATA LOCAL INFILE 'delete.${i}' INTO TABLE delete_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
    mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "
        BEGIN;
        DELETE FROM lineitem WHERE l_orderkey IN (SELECT l_orderkey FROM delete_lineitem);
        COMMIT;"

    sleep 20s

    if [[ `expr ${i} % 10` -eq 0 ]]; then
      echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply New Sales Refresh Function (RF1) for pair ${i}"
    fi
    # This refresh function adds new sales information to the database.
    mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "TRUNCATE update_lineitem"
    mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} --local-infile=1 -e "SET UNIQUE_CHECKS = 0;" -e "
    LOAD DATA LOCAL INFILE 'lineitem.tbl.u${i}' INTO TABLE update_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\n';"
    mysql --defaults-extra-file=mycreds.cnf -D ${MYSQL_DATABASE} -e "
        BEGIN;
        INSERT INTO lineitem
        SELECT * FROM update_lineitem;
        COMMIT;"
done

# Advance the pair range for the next batch of refresh files
start_pair=`expr ${total_pair} + 1`
total_pair=`expr ${total_pair} \* 2`

sleep 2m
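
While the script runs, you can watch the incremental changes land in Paimon, for example by re-running a simple batch count in the Flink SQL client at an interval larger than the checkpoint interval (a simple check, not part of the original steps):

-- The count reflects the latest committed snapshot and changes as RF1/RF2 pairs are applied
SET 'execution.runtime-mode' = 'batch';
SELECT COUNT(*) FROM dwd_lineitem;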
