Use case introduction
Apache Paimon (hereinafter Paimon) is a high-performance lake storage that supports real-time updates. This use case demonstrates unified full + incremental synchronization of a MySQL order table with tens of millions of rows into a Paimon detail table, followed by downstream aggregation and continuous consumption of updates. The overall process is shown in the figure below. MySQL must be prepared in advance, and the local machine needs the Flink package, the Paimon-related dependencies, and the TPC-H data generator.
The data source is generated with the TPC-H toolkit and imported into MySQL. When writing to Paimon, the `l_shipdate` field is used as the business time to derive the partition fields `l_year` and `l_month`. The data spans January 1992 to December 1998, so the job dynamically writes to 84 partitions. The detailed configuration is shown in the table below.
Configuration | Value | Description |
---|---|---|
Number of records | 59,986,052 | Number of records synchronized in the full phase |
Number of dynamic write partitions | 84 | `l_year` is the first level partition, `l_month` is the second level partition |
Checkpoint Interval | 1m | Data update frequency |
Parallelism | 2 | Parallelism of 2 on a single machine |
Bucket number (`bucket`) | 2 | 2 buckets per partition |
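As a quick check of the numbers above, the partition count follows from the date range: one `(l_year, l_month)` partition per month from January 1992 to December 1998, each split into 2 buckets. The bash sketch below uses a hypothetical helper, `count_month_partitions`; only its inputs come from the table.

```shell
#!/usr/bin/env bash
# Hypothetical helper: number of monthly partitions between two
# (year, month) bounds, both inclusive.
count_month_partitions() {
  local start_year=$1 start_month=$2 end_year=$3 end_month=$4
  echo $(( (end_year - start_year) * 12 + (end_month - start_month) + 1 ))
}

partitions=$(count_month_partitions 1992 1 1998 12)
buckets_per_partition=2   # 'bucket' = '2' on dwd_lineitem
echo "partitions: ${partitions}"
echo "buckets across all partitions (upper bound): $(( partitions * buckets_per_partition ))"
```

With enough data every bucket receives rows, so the table ends up with up to 168 bucket directories.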
In testing, with a parallelism of 2 on a single machine and a 1-minute checkpoint interval (so data is updated every minute), all 59.9 million records of the full data set were written within 66 minutes. The table below shows the cumulative number of records written at each 10-minute mark. The average write throughput is roughly 0.9 million records per minute, close to 1 million/min.
Duration (min) | Cumulative records written (million) |
---|---|
10 | 10 |
20 | 20 |
30 | 31 |
40 | 42 |
50 | 50 |
60 | 57 |
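The average rate quoted above can be reproduced from the record count and duration, and the per-interval increments from the cumulative table. A minimal bash sketch using integer arithmetic, so the averages are rounded down:

```shell
#!/usr/bin/env bash
# Full-phase figures taken from the tables above.
records=59986052
minutes=66

# Integer division truncates, so these are slight underestimates.
per_min=$(( records / minutes ))
per_sec=$(( records / (minutes * 60) ))
echo "average: ${per_min} records/min (${per_sec} records/s)"

# Cumulative records (millions) at 10, 20, ..., 60 minutes.
cumulative=(10 20 31 42 50 57)
prev=0
deltas=()
for c in "${cumulative[@]}"; do
  deltas+=("$(( c - prev ))")   # records written in this 10-minute window
  prev=$c
done
echo "per 10 min (millions): ${deltas[*]}"
```

The increments shrink after the 40-minute mark (8 and then 7 million per window), which pulls the overall average below 1 million/min.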
The job's running graph shows the amount of data written every 10 minutes. Full synchronization completes at 66 minutes, after which the job switches to continuously monitoring incremental data.
About data generation
As a classic benchmark for ad-hoc query performance, TPC-H uses data models that closely resemble real business scenarios. This use case uses the order line-item table `lineitem` and its single-table query Q1 (described below). The `lineitem` schema is shown in the table below; each row is about 128 bytes.
Field | Type | Description |
---|---|---|
l_orderkey | INT NOT NULL | Order key (the main order ID); first column of the composite primary key |
l_partkey | INT NOT NULL | Part key (the product ID) |
l_suppkey | INT NOT NULL | Supplier key (the seller ID) |
l_linenumber | INT NOT NULL | Line number (the sub-order ID); second column of the composite primary key |
l_quantity | DECIMAL(15, 2) NOT NULL | Quantity |
l_extendedprice | DECIMAL(15, 2) NOT NULL | Extended price (quantity × unit price) |
l_discount | DECIMAL(15, 2) NOT NULL | Discount |
l_tax | DECIMAL(15, 2) NOT NULL | Tax |
l_returnflag | CHAR(1) NOT NULL | Return flag: `A` (accepted), `R` (returned), or `N` (not yet received) |
l_linestatus | CHAR(1) NOT NULL | Line status: `O` if the ship date is later than 1995-06-17, otherwise `F` |
l_shipdate | DATE NOT NULL | Ship date |
l_commitdate | DATE NOT NULL | Commit date |
l_receiptdate | DATE NOT NULL | Receipt date |
l_shipinstruct | CHAR(25) NOT NULL | Shipping instructions, such as `DELIVER IN PERSON`, `TAKE BACK RETURN`, or `COLLECT COD` (cash on delivery) |
l_shipmode | CHAR(10) NOT NULL | Shipping mode, such as `SHIP`, `AIR`, `TRUCK`, or `MAIL` |
l_comment | VARCHAR(44) NOT NULL | Comment |
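At roughly 128 bytes per row, the raw size of the full data set can be estimated with simple arithmetic. This bash sketch treats 128 bytes as an exact average, which is an assumption; real row sizes vary, mainly with the length of `l_comment`.

```shell
#!/usr/bin/env bash
rows=59986052       # records synchronized in the full phase
bytes_per_row=128   # approximate average row size, treated as exact here

total_bytes=$(( rows * bytes_per_row ))
total_mib=$(( total_bytes / 1024 / 1024 ))   # integer division: whole MiB
echo "estimated raw size: ${total_bytes} bytes (~${total_mib} MiB)"
```

That is about 7.15 GiB, roughly consistent with the size of the `lineitem` file that dbgen produces at scale factor 10.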
Business requirements (TPC-H Q1)
For line items shipped on or before a cutoff date, compute the total quantity, total base price, total discounted price, total discounted price including tax, average quantity, average base price, average discount, and the order count, grouped by return flag and line status.
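To sanity-check Q1 results on a small sample, the same aggregation can be computed directly over a pipe-delimited dbgen file with awk. This is a hypothetical helper (`q1_from_tbl`), not part of TPC-H or Paimon. It assumes the field order of the schema table above and the Q1 cutoff `DATE '1998-12-01' - INTERVAL '90' DAY`, i.e. 1998-09-02. It sums in floating point, so on large inputs the last digits may differ from `DECIMAL` results.

```shell
#!/usr/bin/env bash
# Hypothetical helper: TPC-H Q1 over a dbgen lineitem .tbl file.
# Output columns: returnflag|linestatus|sum_qty|sum_base_price|sum_disc_price|
#                 sum_charge|avg_qty|avg_price|avg_disc|count_order
q1_from_tbl() {
  local file=$1
  local cutoff=${2:-1998-09-02}   # DATE '1998-12-01' - INTERVAL '90' DAY
  awk -F'|' -v cutoff="$cutoff" '
    $11 <= cutoff {                          # l_shipdate, compared as ISO strings
      k = $9 "|" $10                         # l_returnflag|l_linestatus
      qty[k]    += $5                        # l_quantity
      base[k]   += $6                        # l_extendedprice
      disc[k]   += $6 * (1 - $7)             # discounted price
      charge[k] += $6 * (1 - $7) * (1 + $8)  # discounted price incl. tax
      dsum[k]   += $7                        # l_discount
      cnt[k]++
    }
    END {
      for (k in cnt)
        printf "%s|%.2f|%.2f|%.2f|%.2f|%.2f|%.2f|%.2f|%d\n", k,
          qty[k], base[k], disc[k], charge[k],
          qty[k] / cnt[k], base[k] / cnt[k], dsum[k] / cnt[k], cnt[k]
    }' "$file" | sort
}
```

For example, `q1_from_tbl lineitem.tbl` prints one line per (return flag, line status) group, sorted by group key.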
Quick start
Step-by-step guide
Step 1: Prepare the MySQL environment
```sql
DROP DATABASE IF EXISTS flink;
CREATE DATABASE IF NOT EXISTS flink;
USE flink;

CREATE USER 'flink' IDENTIFIED WITH mysql_native_password BY 'flink';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink';
FLUSH PRIVILEGES;

-- Create the base table
CREATE TABLE lineitem (
    l_orderkey INTEGER NOT NULL,
    l_partkey INTEGER NOT NULL,
    l_suppkey INTEGER NOT NULL,
    l_linenumber INTEGER NOT NULL,
    l_quantity DECIMAL(15,2) NOT NULL,
    l_extendedprice DECIMAL(15,2) NOT NULL,
    l_discount DECIMAL(15,2) NOT NULL,
    l_tax DECIMAL(15,2) NOT NULL,
    l_returnflag CHAR(1) NOT NULL,
    l_linestatus CHAR(1) NOT NULL,
    l_shipdate DATE NOT NULL,
    l_commitdate DATE NOT NULL,
    l_receiptdate DATE NOT NULL,
    l_shipinstruct CHAR(25) NOT NULL,
    l_shipmode CHAR(10) NOT NULL,
    l_comment VARCHAR(44) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Add the primary key constraint
ALTER TABLE lineitem ADD PRIMARY KEY (l_orderkey, l_linenumber);

-- Create the delta tables used by the refresh functions
CREATE TABLE update_lineitem LIKE lineitem;
CREATE TABLE delete_lineitem (
    l_orderkey INTEGER NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
ALTER TABLE delete_lineitem ADD PRIMARY KEY (l_orderkey);
```
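Flink MySQL CDC reads changes from the MySQL binlog, so the server needs binary logging enabled in row format (MySQL 8.0 enables row-based binlog by default). If your server is not set up this way, a config fragment like the one below can be added. The file name `mysql-cdc.cnf` and the `server-id` value are assumptions; merge the settings into your server's option file (for example a directory loaded via `!includedir`) and restart MySQL.

```shell
#!/usr/bin/env bash
# Write a MySQL server config fragment enabling row-based binlog,
# which Flink MySQL CDC requires. File name and server-id are hypothetical.
cat > mysql-cdc.cnf <<'EOF'
[mysqld]
server-id        = 1
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
EOF
```

After the restart, `SHOW VARIABLES LIKE 'binlog_format';` should report `ROW`.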
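Prepare the TPC-H data files: copy `tpch_dbgen_linux64-2.14.0.zip` to the server, unzip it, and run the following command. At scale factor 10 the full TPC-H data set is about 10 GB, of which `lineitem` is about 7.3 GB.
```shell
# -q: quiet mode, -s 10: scale factor 10, -T L: generate the lineitem table only
./dbgen -q -s 10 -T L
```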
Load the data into MySQL
Create a `mycreds.cnf` file with the MySQL connection settings, then load `lineitem.tbl`:
```shell
mysql --defaults-extra-file=mycreds.cnf -D flink --local-infile=1 \
  -e "LOAD DATA LOCAL INFILE 'lineitem.tbl' INTO TABLE lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\\n';"
```
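A minimal `mycreds.cnf` could look like the following. It uses the standard `[client]` option group read by the `mysql` command-line client; the host, port, user, and password values are placeholders. The `flink` user created in Step 1 only has read and replication privileges, so loading and deleting data needs an account with write privileges, such as an administrative user.

```shell
#!/usr/bin/env bash
# Create the client option file passed via --defaults-extra-file.
# All values are placeholders; replace them with your own.
cat > mycreds.cnf <<'EOF'
[client]
host     = 127.0.0.1
port     = 3306
user     = root
password = your_password
EOF
chmod 600 mycreds.cnf   # keep the password readable only by the owner
```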
In this first step, the full order data set (about 59.9 million rows) is imported into the MySQL container, which takes about 15 minutes. Meanwhile, you can prepare the Flink and Paimon environments; once the import has finished, start the Flink jobs. A script then continuously applies the TPC-H refresh functions RF1 (insert new orders) and RF2 (delete existing orders) to simulate incremental updates, with a 20-second interval between the deletions and insertions of each pair. Taking 10 pairs of updates as an example, 6 million new orders are inserted and 1.5 million orders are deleted. (Note: the deletions generated by TPC-H are identified by the main order ID. Because `lineitem` has a composite primary key, the actual number of deleted rows is somewhat larger than 1.5 million.)
Step 2: Download Flink, Paimon and other required dependencies
The demo uses Flink 1.17.0 (`flink-1.17.0`). The other required dependencies are:
- Flink MySQL CDC connector
- Paimon jar (Paimon was formerly Flink Table Store, FTS) built for Flink 1.17
- Hadoop Bundle Jar
For convenience, you can download all of these dependencies from the `flink-table-store-101/flink/lib` directory of this project and place them in your local `flink-1.17.0/lib` directory. You can also download or build them yourself:
- flink-sql-connector-mysql-cdc-2.2.1.jar
- Hadoop Bundle Jar
After the steps above, the `lib` directory should contain the following files:
```text
lib
-rw-r--r-- 1 hadoop hadoop 196487 Mar 17 20:07 flink-cep-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 542616 Mar 17 20:10 flink-connector-files-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 102468 Mar 17 20:14 flink-csv-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 135969953 Mar 17 20:22 flink-dist-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 180243 Mar 17 20:13 flink-json-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 21043313 Mar 17 20:20 flink-scala_2.12-1.17.0.jar
-rw-rw-r-- 1 hadoop hadoop 36327707 May 19 15:12 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
-rw-rw-r-- 1 hadoop hadoop 22096298 May 22 10:30 flink-sql-connector-mysql-cdc-2.2.1.jar
-rw-r--r-- 1 hadoop hadoop 15407474 Mar 17 20:21 flink-table-api-java-uber-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 37975208 Mar 17 20:15 flink-table-planner-loader-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 3146205 Mar 17 20:07 flink-table-runtime-1.17.0.jar
-rw-r--r-- 1 hadoop hadoop 208006 Mar 17 17:31 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 hadoop hadoop 301872 Mar 17 17:31 log4j-api-2.17.1.jar
-rw-r--r-- 1 hadoop hadoop 1790452 Mar 17 17:31 log4j-core-2.17.1.jar
-rw-r--r-- 1 hadoop hadoop 24279 Mar 17 17:31 log4j-slf4j-impl-2.17.1.jar
-rw-rw-r-- 1 hadoop hadoop 26745559 May 19 15:14 paimon-flink-1.17-0.5-20230512.001824-7.jar
```
Step 3: Modify the flink-conf configuration file and start the cluster
Edit `flink-1.17.0/conf/flink-conf.yaml` as follows:
```yaml
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 2
execution.checkpointing.interval: 1min
state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
```
To observe Paimon's asynchronous compaction, snapshot commits, and streaming-read information, add the following configuration as needed to the `log4j.properties` file in the `flink-1.17.0/conf` directory:
```properties
# Log Paimon
logger.commit.name = org.apache.paimon.operation.FileStoreCommitImpl
logger.commit.level = DEBUG
logger.compaction.name = org.apache.paimon.mergetree.compact
logger.compaction.level = DEBUG
logger.enumerator.name = org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
logger.enumerator.level = DEBUG
```
Here we enable DEBUG logging only for commits. Then run `./bin/start-cluster.sh` in the `flink-1.17.0` directory.
Step 4: Initialize the table schema and start Flink SQL CLI
```shell
./bin/sql-client.sh
```
Then execute the following SQL:
```sql
-- Use streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- Create and use the Paimon catalog
CREATE CATALOG `table_store` WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///tmp/table-store-101'
);

USE CATALOG `table_store`;

-- ODS table schema
-- Note: inside the Paimon catalog, tables that use other connectors must be declared as TEMPORARY
CREATE TEMPORARY TABLE `ods_lineitem` (
    `l_orderkey` INT NOT NULL,
    `l_partkey` INT NOT NULL,
    `l_suppkey` INT NOT NULL,
    `l_linenumber` INT NOT NULL,
    `l_quantity` DECIMAL(15, 2) NOT NULL,
    `l_extendedprice` DECIMAL(15, 2) NOT NULL,
    `l_discount` DECIMAL(15, 2) NOT NULL,
    `l_tax` DECIMAL(15, 2) NOT NULL,
    `l_returnflag` CHAR(1) NOT NULL,
    `l_linestatus` CHAR(1) NOT NULL,
    `l_shipdate` DATE NOT NULL,
    `l_commitdate` DATE NOT NULL,
    `l_receiptdate` DATE NOT NULL,
    `l_shipinstruct` CHAR(25) NOT NULL,
    `l_shipmode` CHAR(10) NOT NULL,
    `l_comment` VARCHAR(44) NOT NULL,
    PRIMARY KEY (`l_orderkey`, `l_linenumber`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    -- To connect by host name instead, add e.g. "127.0.0.1 mysql.docker.internal" to /etc/hosts
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'flink',
    'password' = 'flink',
    'database-name' = 'flink',
    'table-name' = 'lineitem'
);

-- DWD table schema
-- Use `l_shipdate` as the business date to build a two-level partition on `l_year` and `l_month`;
-- note that all partition keys must be part of the primary key
CREATE TABLE IF NOT EXISTS `dwd_lineitem` (
    `l_orderkey` INT NOT NULL,
    `l_partkey` INT NOT NULL,
    `l_suppkey` INT NOT NULL,
    `l_linenumber` INT NOT NULL,
    `l_quantity` DECIMAL(15, 2) NOT NULL,
    `l_extendedprice` DECIMAL(15, 2) NOT NULL,
    `l_discount` DECIMAL(15, 2) NOT NULL,
    `l_tax` DECIMAL(15, 2) NOT NULL,
    `l_returnflag` CHAR(1) NOT NULL,
    `l_linestatus` CHAR(1) NOT NULL,
    `l_shipdate` DATE NOT NULL,
    `l_commitdate` DATE NOT NULL,
    `l_receiptdate` DATE NOT NULL,
    `l_shipinstruct` CHAR(25) NOT NULL,
    `l_shipmode` CHAR(10) NOT NULL,
    `l_comment` VARCHAR(44) NOT NULL,
    `l_year` BIGINT NOT NULL,
    `l_month` BIGINT NOT NULL,
    PRIMARY KEY (`l_orderkey`, `l_linenumber`, `l_year`, `l_month`) NOT ENFORCED
) PARTITIONED BY (`l_year`, `l_month`) WITH (
    -- 2 buckets per partition
    'bucket' = '2',
    -- Keep the UPDATE_BEFORE records from the upstream CDC source, so that
    -- downstream consumers of dwd_lineitem need no ChangelogNormalize node
    'changelog-producer' = 'input'
);

-- ADS table schema
-- Based on TPC-H Q1: for shipped line items, aggregate quantities, prices, discounts,
-- and order counts per return flag and line status
CREATE TABLE IF NOT EXISTS `ads_pricing_summary` (
    `l_returnflag` CHAR(1) NOT NULL,
    `l_linestatus` CHAR(1) NOT NULL,
    `sum_quantity` DOUBLE NOT NULL,
    `sum_base_price` DOUBLE NOT NULL,
    `sum_discount_price` DOUBLE NOT NULL,
    `sum_charge_vat_inclusive` DOUBLE NOT NULL,
    `avg_quantity` DOUBLE NOT NULL,
    `avg_base_price` DOUBLE NOT NULL,
    `avg_discount` DOUBLE NOT NULL,
    `count_order` BIGINT NOT NULL,
    PRIMARY KEY (`l_returnflag`, `l_linestatus`) NOT ENFORCED
) WITH (
    'bucket' = '2',
    'merge-engine' = 'partial-update',
    'changelog-producer' = 'full-compaction'
);
```
Step 5: Submit the synchronization task
After the full data set has been imported into the MySQL `lineitem` table, start the full synchronization job. Each job is named after its result table for easy identification.
Task 1: Synchronize `ods_lineitem` to `dwd_lineitem` via Flink MySQL CDC
```sql
-- Set the job name
SET 'pipeline.name' = 'dwd_lineitem';

INSERT INTO dwd_lineitem
SELECT
    `l_orderkey`,
    `l_partkey`,
    `l_suppkey`,
    `l_linenumber`,
    `l_quantity`,
    `l_extendedprice`,
    `l_discount`,
    `l_tax`,
    `l_returnflag`,
    `l_linestatus`,
    `l_shipdate`,
    `l_commitdate`,
    `l_receiptdate`,
    `l_shipinstruct`,
    `l_shipmode`,
    `l_comment`,
    YEAR(`l_shipdate`) AS `l_year`,
    MONTH(`l_shipdate`) AS `l_month`
FROM `ods_lineitem`;
```
You can watch the records per second, checkpoints, and other metrics of the full synchronization phase in the Flink Web UI, or browse the `/tmp/table-store-101/default.db/dwd_lineitem` directory on HDFS to view the generated snapshot, manifest, and data files.
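Paimon lays out each partition as a Hive-style `key=value` directory under the table path, so the directory holding a given ship date can be derived the same way the insert job derives `l_year` and `l_month`. A small bash sketch; the helper name `partition_path` is hypothetical:

```shell
#!/usr/bin/env bash
# Map an l_shipdate (YYYY-MM-DD) to its partition directory, mirroring
# YEAR(l_shipdate) and MONTH(l_shipdate) in the insert job.
partition_path() {
  local d=$1
  local year=${d:0:4}
  local month=$(( 10#${d:5:2} ))   # force base 10 so "08" and "09" parse
  echo "l_year=${year}/l_month=${month}"
}

partition_path 1998-12-01
```

For example, `hdfs dfs -ls /tmp/table-store-101/default.db/dwd_lineitem/$(partition_path 1998-12-01)` should list the `bucket-*` directories of that partition.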
Step 6: Calculate the aggregation index and query the result
After full synchronization completes, start the aggregation job, which writes to the ADS table in real time. (Note: if you want to see aggregation results before the historical data has been fully synchronized, you do not need to wait for full synchronization to finish.)
Task 2: Write to the result table `ads_pricing_summary`
```sql
-- Set the job name
SET 'pipeline.name' = 'ads_pricing_summary';

INSERT INTO `ads_pricing_summary`
SELECT
    `l_returnflag`,
    `l_linestatus`,
    SUM(`l_quantity`) AS `sum_quantity`,
    SUM(`l_extendedprice`) AS `sum_base_price`,
    SUM(`l_extendedprice` * (1 - `l_discount`)) AS `sum_discount_price`, -- aka revenue
    SUM(`l_extendedprice` * (1 - `l_discount`) * (1 + `l_tax`)) AS `sum_charge_vat_inclusive`,
    AVG(`l_quantity`) AS `avg_quantity`,
    AVG(`l_extendedprice`) AS `avg_base_price`,
    AVG(`l_discount`) AS `avg_discount`,
    COUNT(*) AS `count_order`
FROM `dwd_lineitem`
-- Partition filter (enables partition pruning) plus the exact Q1 date condition
WHERE (`l_year` < 1998 OR (`l_year` = 1998 AND `l_month` <= 9))
  AND `l_shipdate` <= DATE '1998-12-01' - INTERVAL '90' DAY
GROUP BY `l_returnflag`, `l_linestatus`;
```
Switch to batch mode and set the result display to tableau mode:
```sql
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
```
Then query the aggregated results. You can run the query several times to watch the metrics change; leave more time between queries than the checkpoint interval of the upstream job, since new data only becomes visible once a checkpoint commits it.
```sql
SET 'pipeline.name' = 'Pricing Summary';
SELECT * FROM ads_pricing_summary;
```
Besides aggregated metrics, Paimon also supports querying detailed data. Suppose the metrics for returned sub-orders shipped in December 1998 look wrong. To investigate further through the order details, run the following query in batch mode:
```sql
SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate`
FROM `dwd_lineitem`
WHERE `l_year` = 1998
  AND `l_month` = 12
  AND `l_linenumber` = 2
  AND `l_shipinstruct` = 'TAKE BACK RETURN';
```
Query data with Spark
Download `spark-3.1.2-bin-hadoop2.7` and put `paimon-spark-3.1-0.5-SNAPSHOT.jar` into its `jars` directory. Then start `spark-sql` with a Paimon catalog:
```shell
spark-sql \
  --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
  --conf spark.sql.catalog.paimon.warehouse=hdfs:///tmp/table-store-101
```
```sql
USE paimon.default;

SELECT COUNT(*) FROM dwd_lineitem;

SELECT `l_orderkey`, `l_returnflag`, `l_linestatus`, `l_shipdate`
FROM `dwd_lineitem`
WHERE `l_year` = 1998
  AND `l_month` = 12
  AND `l_linenumber` = 2
  AND `l_shipinstruct` = 'TAKE BACK RETURN';
```
Step 7: Observe the updates
Generate update data
Write the following script and run it:
```shell
#!/usr/bin/env bash
# Simulate incremental updates by applying TPC-H refresh functions in pairs:
# RF2 deletes old orders, then RF1 inserts new ones 20 seconds later.
start_pair=1
total_pair=10
MYSQL_DATABASE=flink   # the database that holds lineitem and the delta tables

for i in $(seq "${start_pair}" "${total_pair}"); do
  if (( i % 10 == 0 )); then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply Old Sales Refresh Function (RF2) for pair ${i}"
  fi
  # This refresh function removes old sales information from the database.
  mysql --defaults-extra-file=mycreds.cnf -D "${MYSQL_DATABASE}" -e "TRUNCATE delete_lineitem"
  mysql --defaults-extra-file=mycreds.cnf -D "${MYSQL_DATABASE}" --local-infile=1 \
    -e "SET UNIQUE_CHECKS = 0; LOAD DATA LOCAL INFILE 'delete.${i}' INTO TABLE delete_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\\n';"
  mysql --defaults-extra-file=mycreds.cnf -D "${MYSQL_DATABASE}" \
    -e "BEGIN; DELETE FROM lineitem WHERE l_orderkey IN (SELECT l_orderkey FROM delete_lineitem); COMMIT;"

  sleep 20s

  if (( i % 10 == 0 )); then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Start to apply New Sales Refresh Function (RF1) for pair ${i}"
  fi
  # This refresh function adds new sales information to the database.
  mysql --defaults-extra-file=mycreds.cnf -D "${MYSQL_DATABASE}" -e "TRUNCATE update_lineitem"
  mysql --defaults-extra-file=mycreds.cnf -D "${MYSQL_DATABASE}" --local-infile=1 \
    -e "SET UNIQUE_CHECKS = 0; LOAD DATA LOCAL INFILE 'lineitem.tbl.u${i}' INTO TABLE update_lineitem FIELDS TERMINATED BY '|' LINES TERMINATED BY '|\\n';"
  mysql --defaults-extra-file=mycreds.cnf -D "${MYSQL_DATABASE}" \
    -e "BEGIN; INSERT INTO lineitem SELECT * FROM update_lineitem; COMMIT;"
done

# Advance the pair range for a subsequent round
start_pair=$(( total_pair + 1 ))
total_pair=$(( total_pair * 2 ))
sleep 2m
```
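The update loop expects dbgen to have already produced the refresh files `delete.N` and `lineitem.tbl.uN` for every pair in the range; a missing file makes `LOAD DATA LOCAL INFILE` fail for that pair. A hypothetical pre-flight check (the helper `missing_refresh_files` is not part of TPC-H) could list what is missing before the loop starts:

```shell
#!/usr/bin/env bash
# Print every refresh file expected for pairs start..end (inclusive)
# that does not exist in the current directory.
missing_refresh_files() {
  local start=$1 end=$2 i f
  for (( i = start; i <= end; i++ )); do
    for f in "delete.${i}" "lineitem.tbl.u${i}"; do
      [[ -f $f ]] || echo "$f"
    done
  done
}
```

For example, `missing_refresh_files 1 10` prints nothing when all twenty files for pairs 1 to 10 are present.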