Preface: Flink CDC keeps getting more powerful and supports a growing number of data sources. This article walks through a complete pipeline built with Flink CDC:
SQL Server -> (Flink CDC) -> Flink -> (Flink StarRocks connector) -> StarRocks
1. Prepare the SQL Server environment (use a SQL Server version below 16, that is, SQL Server 2019 or earlier; Flink CDC currently supports only versions below 16)
I run SQL Server in Docker:
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker images
REPOSITORY                                          TAG                            IMAGE ID       CREATED         SIZE
starrocks.docker.scarf.sh/starrocks/allin1-ubuntu   latest                         4d3c0066a012   3 days ago      4.71GB
mcr.microsoft.com/mssql/server                      2019-latest                    e7fc0b49be3c   4 weeks ago     1.47GB
mcr.microsoft.com/mssql/server                      2022-latest                    683d523cd395   5 weeks ago     2.9GB
federatedai/standalone_fate                         latest                         6019ec787699   9 months ago    5.29GB
milvusdb/milvus                                     v2.1.4                         d9a5c977c414   11 months ago   711MB
starrocks/dev-env                                   main                           8f4edba3b115   16 months ago   7.65GB
minio/minio                                         RELEASE.2022-03-17T06-34-49Z   239acc52a73a   17 months ago   228MB
kicbase/stable                                      v0.0.29                        64d09634c60d   20 months ago   1.14GB
quay.io/coreos/etcd                                 v3.5.0                         a7908fd5fb88   2 years ago     110MB
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
docker exec -it --user root sql_server_2019 bash
Enable SQL Server Agent, restart the container, and connect:
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
root@99e196828047:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
SQL Server needs to be restarted in order to apply this setting. Please run
'systemctl restart mssql-server.service'.
root@99e196828047:/# exit
exit
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker restart sql_server_2019
sql_server_2019
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
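The CDC setup that follows runs `use cdc_test`, so that database has to exist first; the transcript does not show how it was created. A minimal sketch of one way to create it, assuming the container name and SA password from the `docker run` command above (the variable names are my own):

```shell
#!/usr/bin/env bash
# Assumed: container name and SA password as used in the docker run command.
CONTAINER=sql_server_2019
SA_PASSWORD='abc@123456'

# Idempotent: creates cdc_test only if it does not exist yet.
CREATE_DB_SQL="IF DB_ID('cdc_test') IS NULL CREATE DATABASE cdc_test;"

# Run only when the container is up, so the script is harmless elsewhere.
if docker ps --format '{{.Names}}' 2>/dev/null | grep -qx "$CONTAINER"; then
    docker exec "$CONTAINER" /opt/mssql-tools/bin/sqlcmd \
        -S localhost -U SA -P "$SA_PASSWORD" -Q "$CREATE_DB_SQL"
else
    echo "container $CONTAINER is not running; skipped"
fi
```

The `-Q` flag makes `sqlcmd` run the statement and exit, so no interactive `go` is needed.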
Enable CDC on the database and on the table:
root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
1> use cdc_test;
2> go
Changed database context to 'cdc_test'.
1> EXEC sys.sp_cdc_enable_db;
2> go
1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
2> go
is_cdc_enabled
1> CREATE TABLE orders (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id]))
2> go
1>
2>
3> EXEC sys.sp_cdc_enable_table
4> @source_schema = 'dbo',
5> @source_name = 'orders',
6> @role_name = 'cdc_role';
7> go
Job 'cdc.cdc_test_capture' started successfully.
Job 'cdc.cdc_test_cleanup' started successfully.
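Before moving on, it can be worth confirming that CDC is really active at both levels. The sketch below queries the `is_cdc_enabled` column of `sys.databases` and the `is_tracked_by_cdc` column of `sys.tables`; a value of 1 in each means CDC is on. The container name and password are the ones assumed throughout this article, and the variable names are mine:

```shell
#!/usr/bin/env bash
CONTAINER=sql_server_2019
SA_PASSWORD='abc@123456'

# Database-level flag, then table-level flag for the orders table.
CHECK_CDC_SQL="SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';"

if docker ps --format '{{.Names}}' 2>/dev/null | grep -qx "$CONTAINER"; then
    # -d selects the database, -Q runs the batch and exits.
    docker exec "$CONTAINER" /opt/mssql-tools/bin/sqlcmd \
        -S localhost -U SA -P "$SA_PASSWORD" -d cdc_test -Q "$CHECK_CDC_SQL"
else
    echo "container $CONTAINER is not running; skipped"
fi
```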
After inserting some rows, query and update them:
1> select * from orders;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------
1           2023-07-07       1           1           1
2           2023-07-07       2           2           2
3           2023-07-07       3           3           3
4           2023-07-07       4           4           4
45          2023-07-07       5           5           5
(5 rows affected)
1> update orders set quantity = 100 where id =1;
2> go
(1 rows affected)
1> select * from orders;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------
1           2023-07-07       1           100         1
2           2023-07-07       2           2           2
3           2023-07-07       3           3           3
4           2023-07-07       4           4           4
45          2023-07-07       5           5           5
(5 rows affected)
1> update orders set quantity = 200 where id = 2;
2> go
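The transcript above starts from a table that already holds five rows; the INSERT statements themselves are not shown. A sketch of statements that would produce exactly those rows, with the values read off the first `select` output (the script wrapper and variable names are my own assumptions):

```shell
#!/usr/bin/env bash
CONTAINER=sql_server_2019
SA_PASSWORD='abc@123456'

# One multi-row INSERT matching the five rows shown in the query result.
INSERT_SQL="INSERT INTO orders (id, order_date, purchaser, quantity, product_id) VALUES
(1, '2023-07-07', 1, 1, 1),
(2, '2023-07-07', 2, 2, 2),
(3, '2023-07-07', 3, 3, 3),
(4, '2023-07-07', 4, 4, 4),
(45, '2023-07-07', 5, 5, 5);"

if docker ps --format '{{.Names}}' 2>/dev/null | grep -qx "$CONTAINER"; then
    docker exec "$CONTAINER" /opt/mssql-tools/bin/sqlcmd \
        -S localhost -U SA -P "$SA_PASSWORD" -d cdc_test -Q "$INSERT_SQL"
else
    echo "container $CONTAINER is not running; skipped"
fi
```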
2. Prepare the Flink environment:
- Download Flink 1.16.2 (from the official website)
- Download the Flink SQL Server CDC connector 2.2.0 (Central Repository: com/ververica/flink-cdc-connectors)
- Download the Flink StarRocks connector built for Flink 1.15. Ideally you would use a build that matches Flink 1.16.2, but no official one had been released at the time of writing; I tested with the 1.15 build and it worked. Download link: Release 1.2.6 · StarRocks/starrocks-connector-for-apache-flink · GitHub
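Flink's SQL Client only picks up connector jars that sit in `{FLINK_HOME}/lib` when the cluster starts. The sketch below reports which of the two connector jars are still missing there. The jar file names are my assumption based on the versions listed above, and the default `FLINK_HOME` is a guess; adjust both to what you actually downloaded:

```shell
#!/usr/bin/env bash
# Assumed install location; override by exporting FLINK_HOME.
FLINK_HOME="${FLINK_HOME:-$HOME/flink/flink-1.16.2}"

# Print every expected connector jar that is absent from the given lib directory.
missing_jars() {
    lib_dir=$1
    for jar in flink-sql-connector-sqlserver-cdc-2.2.0.jar \
               flink-connector-starrocks-1.2.6_flink-1.15.jar; do
        [ -f "$lib_dir/$jar" ] || echo "$jar"
    done
}

missing_jars "$FLINK_HOME/lib"
```

If the script prints nothing, both jars are in place. Copy any reported jar into `lib` and restart the cluster before opening the SQL Client.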
3. Prepare the StarRocks Docker environment:
See: Deploying StarRocks with Docker (StarRocks Docs)
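The Flink sink defined later writes to a table `orders` in a StarRocks database `test`, and that table has to exist before the job starts. The article does not show its definition, so the following is only a sketch. It assumes a Primary Key table (so that updates and deletes can be applied), a single bucket and `replication_num` of 1 for a single-node Docker deployment, a `mysql` client on the host, and the FE query port published on 9030:

```shell
#!/usr/bin/env bash
# Target table for the sink; the column list mirrors the SQL Server orders table.
STARROCKS_DDL=$(cat <<'SQL'
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.orders (
    id INT NOT NULL,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id) BUCKETS 1
PROPERTIES ("replication_num" = "1");
SQL
)

# Run only if a mysql client exists and the StarRocks FE answers on port 9030.
if command -v mysql >/dev/null 2>&1 &&
   mysql --connect-timeout=3 -h 127.0.0.1 -P 9030 -u root -e 'SELECT 1' >/dev/null 2>&1; then
    mysql --connect-timeout=3 -h 127.0.0.1 -P 9030 -u root -e "$STARROCKS_DDL"
else
    echo "StarRocks FE not reachable on 127.0.0.1:9030; skipped"
fi
```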
4. Start the Flink environment (cd {FLINK_HOME}/bin):
xiuchenggong@xiuchengdeMacBook-Pro bin % ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host xiuchengdeMacBook-Pro.local.
Starting taskexecutor daemon on host xiuchengdeMacBook-Pro.local.
xiuchenggong@xiuchengdeMacBook-Pro bin % ./sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/flink-1.16.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[Flink SQL Client ASCII-art banner]
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /Users/xiuchenggong/.flink-sql-history
Flink SQL>
Create the Flink source table that reads from SQL Server:
Flink SQL> CREATE TABLE t_source_sqlserver (
>   id INT,
>   order_date DATE,
>   purchaser INT,
>   quantity INT,
>   product_id INT,
>   PRIMARY KEY (id) NOT ENFORCED -- primary key definition (optional)
> ) WITH (
>   'connector' = 'sqlserver-cdc', -- Use the SQL Server CDC connector
>   'hostname' = 'localhost', -- SQL Server host name
>   'port' = '30027', -- SQL Server port
>   'username' = 'sa', -- SQL Server username
>   'password' = 'abc@123456', -- SQL Server password
>   'database-name' = 'cdc_test', -- database name
>   'schema-name' = 'dbo', -- schema name
>   'table-name' = 'orders' -- the name of the table to capture changes
> );
Then create the Flink sink table that writes to StarRocks:
Flink SQL> CREATE TABLE IF NOT EXISTS `orders_sink` (
>   id int,
>   order_date date,
>   purchaser int,
>   quantity int,
>   product_id int,
>   PRIMARY KEY(`id`) NOT ENFORCED
> ) with (
>   'load-url' = 'localhost:8030',
>   'sink.buffer-flush.interval-ms' = '15000',
>   'sink.properties.row_delimiter' = '\x02',
>   'sink.properties.column_separator' = '\x01',
>   'connector' = 'starrocks',
>   'database-name' = 'test',
>   'table-name' = 'orders',
>   'jdbc-url' = 'jdbc:mysql://localhost:9030',
>   'password' = '',
>   'username' = 'root'
> )
> ;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+--------------------+
|         table name |
+--------------------+
|        orders_sink |
| t_source_sqlserver |
+--------------------+
2 rows in set
Submit the job:
Flink SQL> insert into orders_sink select * from t_source_sqlserver;
[INFO] Submitting SQL update statement to the cluster...
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/xiuchenggong/flink/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 746cc173cd71133e96d080f25327e9bc
The Flink web UI now shows the job as a long-running streaming job.
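The job state can also be checked from the command line with `flink list -r`, which lists running jobs. The sketch below keeps only the lines marked `(RUNNING)`; the helper name `running_jobs` and the default `FLINK_HOME` are my own assumptions:

```shell
#!/usr/bin/env bash
# Assumed install location; override by exporting FLINK_HOME.
FLINK_HOME="${FLINK_HOME:-$HOME/flink/flink-1.16.2}"

# Filter `flink list` output (read from stdin) down to running jobs.
running_jobs() {
    grep '(RUNNING)'
}

if [ -x "$FLINK_HOME/bin/flink" ]; then
    "$FLINK_HOME/bin/flink" list -r | running_jobs || echo "no running jobs found"
else
    echo "flink CLI not found under $FLINK_HOME; skipped"
fi
```

The job ID printed by the SQL Client when the statement was submitted should appear in one of the remaining lines.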
5. Verify that inserts, updates, and deletes in SQL Server are synchronized to StarRocks:
StarRocks > select * from orders;
+------+------------+-----------+----------+------------+
| id   | order_date | purchaser | quantity | product_id |
+------+------------+-----------+----------+------------+
|    1 | 2023-07-07 |         1 |      100 |          1 |
|    3 | 2023-07-07 |         3 |        3 |          3 |
|    4 | 2023-07-07 |         4 |        4 |          4 |
|   45 | 2023-07-07 |         5 |        5 |          5 |
|    2 | 2023-07-07 |         2 |      200 |          2 |
+------+------------+-----------+----------+------------+
5 rows in set (0.016 sec)
StarRocks >
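The result above reflects the two updates. A delete can be checked the same way; the sketch below removes the row with id 45 in SQL Server and then counts it in StarRocks. It assumes the job is running, a `mysql` client is installed on the host, and the StarRocks table is a Primary Key table; the 20-second wait is my choice to cover the 15000 ms flush interval set on the sink:

```shell
#!/usr/bin/env bash
CONTAINER=sql_server_2019
SA_PASSWORD='abc@123456'

DELETE_SQL="DELETE FROM orders WHERE id = 45;"
COUNT_SQL="SELECT COUNT(*) AS remaining FROM test.orders WHERE id = 45;"

if docker ps --format '{{.Names}}' 2>/dev/null | grep -qx "$CONTAINER" &&
   command -v mysql >/dev/null 2>&1; then
    # Delete on the source side.
    docker exec "$CONTAINER" /opt/mssql-tools/bin/sqlcmd \
        -S localhost -U SA -P "$SA_PASSWORD" -d cdc_test -Q "$DELETE_SQL"
    # Give the sink time to flush its buffer before checking the target.
    sleep 20
    mysql --connect-timeout=3 -h 127.0.0.1 -P 9030 -u root -e "$COUNT_SQL"
else
    echo "SQL Server container or mysql client not available; skipped"
fi
```

Once the change has been flushed, `remaining` should be 0.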
All inserts, updates, and deletes made in SQL Server are synchronized to StarRocks.