This article teaches you to use Apache SeaTunnel Zeta to synchronize data from MySQL to StarRocks offline

Click on the blue word to follow us

18a4304e64cad22f22d5fda666e58845.jpeg

In the previous article, we introduced how to download, install and deploy the SeaTunnel Zeta service (deploy the SeaTunnel Zeta single-node Standalone mode environment in 3 minutes), and then we will introduce the first synchronization scenario supported by SeaTunnel: offline batch synchronization. As the name implies, offline batch synchronization requires the user to define the SeaTunnel JobConfig, select the batch mode, and start synchronizing data after the job starts, and exit when the data synchronization is completed.

The following takes MySQL offline synchronization to StarRocks as an example to introduce how to use SeaTunnel to define and run offline synchronization jobs.

01

Define job profiles

SeaTunnel uses configuration files to define jobs. In this example, the job configuration files are as follows, and the file save path is ~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config

#Define the running parameters of some jobs, for details, please refer to https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
    job.mode="BATCH" #The operation mode of the job, BATCH=offline batch synchronization, STREAMING=real-time synchronization
    job.name="SeaTunnel_Job"
    checkpoint.interval=10000 #Checkpoint is performed every 10000ms, and the impact of checkpoint on the two connectors JDBC Source and StarRocks Sink will be introduced in detail later
}
source {
    Jdbc {
        parallelism=5 # Parallelism, here is to start 5 Source Tasks to read data in parallel
        partition_column="id" #Use the id field to split the split. Currently, only digital primary key columns are supported, and the value of this column is best offline, and the auto-increment id is the best
        partition_num="20" # split into 20 splits, and these 20 splits will be assigned to 5 Source Tasks for processing
        result_table_name="Table9210050164000"
        query = "select` ID`, `f_binary`,` f_blob`, `f_long_varbinary`,` f_longblob`, `f_tinyblob`,` f_varbinary`, `f_smallint_unsignign ED`, `f_mediumint`,` f_mediumImint_unsigned`, `f_int `, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f _mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
        password="root@123"
        driver="com.mysql.cj.jdbc.Driver"
        user=root
        url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2 &rewriteBatchedStatements=true"
    }
}
transform {
# In this example, we don't need to do the Transform operation of the task, so it is empty here, and the entire element of transform can also be deleted
}
sink {
    StarRocks {
        batch_max_rows=10240#
        source_table_name="Table9210050164000"
        table="test2"
        database="sr_test"
        base-url="jdbc:mysql://datasource01:9030"
        password="root"
        username="root"
        nodeUrls = [
            "datasource01:8030" #write data through Http interface of StarRocks
        ]
    }
}

02

Job Configuration Instructions

In this job definition file, we define the operation mode of the job as BATCH offline batch processing mode through env, and define the name of the job as “SeaTunnel_Job”. The checkpoint.interval parameter is used to define how often a checkpoint is performed during the job. What is a checkpoint and what is the role of the checkpoint in Apache SeaTunnel?

Checkpoint

Check out the introduction to Apache SeaTunnel Zeta engine checkpoint in the official document: https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage#introduction and found that checkpoint is used to run in Apache SeaTunnel Zeta Jobs can periodically save their status in the form of snapshots. When a task fails unexpectedly, the job can be restored from the latest saved snapshot to achieve task failure recovery, breakpoint resume and other functions. In fact, the core of checkpoint is the distributed snapshot algorithm: Chandy-Lamport algorithm, which is widely used in distributed systems, and is more of a theoretical basis for fault-tolerant processing in distributed computing systems. The Chandy-Lamport algorithm will not be introduced in detail here. Next, we will focus on the impact of checkpoint on this synchronization task in this example.

The Apache SeaTunnel Zeta engine will start a thread called CheckpointManager when the job starts to manage the checkpoint of the job. SeaTunnel Connector API provides a set of checkpoint APIs, which are used to notify specific Connectors to perform corresponding processing when the engine triggers a checkpoint. Both the Source and Sink connectors of SeaTunnel are developed based on the SeaTunnel Connector API, but different connectors have different implementation details of the checkpoint API, so the functions they can realize are also different.

  • The impact of Checkpoint on JDBC Source

In this example, we can find the following content through the official documentation of the JDBC Source connector https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc:

233c417b17893358ce145fb8dced7269.png

This shows that the JDBC Source connector implements checkpoint-related interfaces. We can know from the source code that when a checkpoint occurs, JDBC Source will send its unprocessed split as a snapshot of the state to CheckpointManager for persistent storage. In this way, when the job fails and resumes, JDBC Source will read which splits have not been processed from the latest saved snapshot, and then process these splits.

Pass partition_num=20 in this job, the result of the sql statement specified in the query parameter will be divided into 20 splits for processing, each split will generate sql for reading the data it is responsible for, this sql is the sql specified in the query Plus some where filter conditions. These 20 splits will be assigned to 5 Source Tasks for processing. Ideally, each Source Task will be assigned to 4 splits. Assuming that there is only one split left unprocessed in each Source Task during a checkpoint, the information of this split will be saved. If the job hangs up after that, the job will be automatically restored, and each Source Task will get the The split that has not been processed, and then processed. If the job no longer reports an error, after these splits are processed, the job is finished. If the job still reports an error (for example, the target StarRocks is down and data cannot be written), the job will end in a failure state.

Breakpoint resume:

If after the job fails, we fix the problem and want the job to run as it was before, and only process the splits that have not been processed before, we can use sh seatunnel.sh -r jobId to make the job whose job ID is jobId start from Resume from breakpoint.

Back to the topic, checkpoint.interval=10000 means that for reading data from Mysql, the SeaTunnel Zeta engine will trigger a checkpoint operation every 10s, and then the JDBC Source Task will be required to save the unprocessed split information. It should be noted here that the JDBC Source Task reads data in units of splits. If the data in a split is being read when the checkpoint is triggered and has not been completely sent to the downstream StarRocks, it will wait until the data processing of the split is completed. Only then will the checkpoint operation be responded to. It must be noted here that if the amount of data in MySQL is relatively large, it takes a long time for the data of a split to be processed, which may cause checkpoint timeout. The timeout period of checkpoint can be parameterized in https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage, and the default is 1 minute.

  • 2.1.2 Impact of checkpoint on StarRocks Sink

On the documentation of the Sink connector, we can also see the logo in the following figure:

2679739db6ca256f3e64465368a28163.png

This flag represents whether the Sink connector has realized the semantics of exactly one processing. If this flag is selected, it means that the Sink connector can guarantee that the data sent to it will only be written to the target end once, and will not miss the target. The end data is lost, and it will not be repeatedly written to the target end. The common way to implement this function is two-phase commit. Connectors that support transactions generally start transactions first to write data. When checkpoint occurs, the transaction ID is returned to CheckManager for persistence. When all tasks in the job respond to the checkpoint request of CheckManager, the first phase is completed. Then the Apache SeaTunnel Zeta engine will call the AggregateCommit method to let the Sink commit its transactions. This process is called the second phase. After the second phase is completed, the checkpoint is completed. If the second phase commit fails, the job will fail and then automatically recover. After recovery, it will start from the second phase again, requiring the transaction to be committed until the transaction is committed. If the transaction fails all the time, the job will also fail.

Not only the Sink connector that implements the exactly-once feature can ensure that the data at the target end is not lost or repeated. If the database at the target end supports deduplication by primary key, then as long as the Sink connector guarantees that the data sent to it is at least written to the target end Once, no matter how many times the write is repeated, it will not eventually cause data loss or duplication on the destination side. In this example, the StarRocks Sink connector uses this method. The StarRocks Sink connector will first cache the received data in memory, and when the number of cached rows reaches 10240 rows set by batch_max_rows, it will initiate a write Request to write data into StarRocks. If the amount of data in MySQL is small and does not reach 10240 rows, StarRocks will be written when the checkpoint is triggered.

03

run job

We use the Apache SeaTunnel Zeta engine to run the job

cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config

After the job is completed, you can see the following information, indicating that the job status is FINISHED, 20w rows of data are read, and 20w rows of data are written to StarRocks, which takes 6s.

e37545a34c0f01fc1a28c891e2f10c67.png

Apache SeaTunnel

c7fe127297f268f1e506fbf378974bd7.png

Apache SeaTunnel(Incubating) is a distributed, high-performance, easy-to-expand data integration platform for massive data (offline & real-time) synchronization and transformation

Warehouse address:

https://github.com/apache/incubator-seatunnel

URL:

https://seatunnel.apache.org/

Proposal:

https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

Apache SeaTunnel(Incubating) download link:

https://seatunnel.apache.org/download

Sincerely welcome more people to join!

We believe that in 「Community Over Code」 (community is greater than code), 「Open and Cooperation」 (open collaboration), 「Meritocracy」 ( Under the guidance of The Apache Way such as “meritocracy management”) and “Diversity and Consensus Decision-Making”, we will usher in a more diverse and inclusive community ecology, and jointly build technological progress brought about by the spirit of open source!

We sincerely invite all partners who are interested in making local open source global to join the family of SeaTunnel contributors and build open source together!

Submit questions and suggestions:

https://github.com/apache/incubator-seatunnel/issues

Contribute code:

https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list:

[email protected]

Development mailing list:

[email protected]

Join Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

Follow on Twitter:

Activity recommendation

The SeaTunnel Open Source Summer student project application is open, and the maximum bonus for a single project is 12,000 yuan!

Click on the picture for details and to sign up

ca62affa0927564c9f212893297cbb76.png

Click to read the original text and view all projects of SeaTunnel!

16fb7ea657630d2354179bac4e977175.png