From 0 to 1: a StreamPark data synchronization solution, MySQL to Doris, as a jar job (multiple tables)

Server OS: CentOS 7.2

Prerequisites: JDK 1.8, Maven, MySQL and the other basic environments are already installed

1. Flink 1.16.2

Use the standalone version directly.

# Unzip
tar -zxvf flink-1.16.2-bin-scala_2.12.tgz
# Enter the bin directory and run the startup script
./start-cluster.sh
# Check with jps: if the following two processes appear, the startup succeeded
jps
StandaloneSessionClusterEntrypoint
TaskManagerRunner

2. StreamPark apache-streampark_2.12-2.1.1-incubating-bin

There are two main installation steps:

1. Download and unpack the release

2. Initialize the tables that StreamPark uses

Note: this article does not use Hadoop. If MySQL is used for the metadata tables, you need to download the MySQL JDBC driver jar yourself and place it in lib/; for the exact steps, see the official StreamPark deployment documentation.

After deployment completes, using the default port, open ip:10000 in a browser to reach the StreamPark login page; the account is admin and the password is streampark.

The UI language can be switched on the login page.

3. Configure Flink in StreamPark

The detailed steps are in the official StreamPark quick start; they mainly consist of configuring the Flink version (the flink-1.16.2 home unpacked above) and the standalone cluster started in step 1, and connecting the two, so they are not repeated here.

4. Install and deploy Doris apache-doris-2.0.1.1-bin-x64

1 machine, 1 FE and 1 BE

1. Unzip the tar package and, at the same time, create the doris-meta and storage directories; these two will be used in the FE and BE configuration respectively.

tar -zxvf apache-doris-2.0.1.1-bin-x64.tar.gz && mkdir doris-meta && mkdir storage

2. Modify fe.conf and be.conf in the fe/conf and be/conf directories respectively.

Modify fe.conf
########### Modify the first item, meta_dir ###########
meta_dir = <path of the doris-meta directory created in the previous step>

########### Modify the second item, priority_networks, to the local ip/24 ###########
priority_networks = 172.16.10.240/24

JAVA_HOME=<path of the local JDK>




Modify be.conf
########### Modify the first item, storage_root_path ###########
storage_root_path = <path of the storage directory created in the previous step>

########### Modify the second item, priority_networks, to the local ip/24 ###########
priority_networks = 172.16.10.240/24

# Some additional BE configuration
# Log configuration
sys_log_dir = ${DORIS_HOME}/log
sys_log_roll_mode = SIZE-MB-1024
sys_log_roll_num = 1

# Trash files are cleaned up after 24 h
trash_file_expire_time_sec = 86400

3. Enter the FE's bin directory and start the FE.

./start_fe.sh --daemon

4. Check the FE

mysql -uroot -P9030 -h127.0.0.1
-- Check the FE status: if the IsMaster, Join and Alive columns are all true, the node is healthy
mysql> show frontends\G
 
*************************** 1. row ***************************
             Name: 172.21.32.5_9010_1660549353220
               IP: 172.21.32.5
      EditLogPort: 9010
         HTTPPort: 8030
        QueryPort: 9030
          RpcPort: 9020
             Role: FOLLOWER
         IsMaster: true
        ClusterId: 1685821635
             Join: true
            Alive: true
ReplayedJournalId: 49292
    LastHeartbeat: 2022-08-17 13:00:45
         IsHelper: true
           ErrMsg:
          Version: 1.1.2-rc03-ca55ac2
 CurrentConnected: Yes
1 row in set (0.03 sec)

5. Enter the BE's bin directory and start the BE

./start_be.sh --daemon

Some required system settings may be reported during startup; just follow the prompts:

[root@localhost bin]# ./start_be.sh --daemon
Please set vm.max_map_count to be 2000000 under root using 'sysctl -w vm.max_map_count=2000000'.
[root@localhost bin]# sysctl -w vm.max_map_count=2000000
vm.max_map_count = 2000000
[root@localhost bin]# ./start_be.sh --daemon
Please set the maximum number of open file descriptors to be 65536 using 'ulimit -n 65536'.
[root@localhost bin]# ulimit -n 65536
[root@localhost bin]# ./start_be.sh --daemon
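
To keep these settings across reboots, you may also want to persist them; a sketch, assuming stock CentOS paths:

[root@localhost bin]# echo 'vm.max_map_count=2000000' >> /etc/sysctl.conf
[root@localhost bin]# echo '* soft nofile 65536' >> /etc/security/limits.conf
[root@localhost bin]# echo '* hard nofile 65536' >> /etc/security/limits.conf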

6. Add the BE to the cluster

mysql -uroot -P9030 -h127.0.0.1
 
-- Add the BE to the cluster
-- be_host_ip: the BE's IP address, matching priority_networks in be.conf
-- heartbeat_service_port: the BE's heartbeat port, matching heartbeat_service_port in be.conf (default 9050)
mysql> ALTER SYSTEM ADD BACKEND "be_host_ip:heartbeat_service_port";
 
-- Check the BE status: Alive: true means the node is running normally
mysql> SHOW BACKENDS\G
*************************** 1. row ***************************
            BackendId: 10003
              Cluster: default_cluster
                   IP: 172.21.32.5
        HeartbeatPort: 9050
               BePort: 9060
             HTTPPort: 8040
             BrpcPort: 8060
        LastStartTime: 2022-08-16 15:31:37
        LastHeartbeat: 2022-08-17 13:33:17
                Alive: true
 SystemDecommissioned: false
ClusterDecommissioned: false
            TabletNum: 170
     DataUsedCapacity: 985.787 KB
        AvailCapacity: 782.729 GB
        TotalCapacity: 984.180 GB
              UsedPct: 20.47%
       MaxDiskUsedPct: 20.47%
                  Tag: {"location" : "default"}
               ErrMsg:
              Version: 1.1.2-rc03-ca55ac2
               Status: {"lastSuccessReportTabletsTime":"2022-08-17 13:33:05","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false}

7. Log in to Doris

In a browser, open ip:8030 to reach the Doris web login page; the account is root with an empty password.

Here you can create the tables to be synchronized, ready for the next step; alternatively, create them with a visualization tool such as DataGrip (the Doris FE speaks the MySQL protocol on its query port, 9030).
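
One point worth making explicit: the sync job below marks deletions through Doris's hidden __DORIS_DELETE_SIGN__ column, which only takes effect on tables using the Unique Key model, so create the target tables accordingly. A minimal sketch in Java (the account.account schema and the helper class are illustrative, and a MySQL JDBC driver is assumed on the classpath):

package com.flink.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical one-off helper: creates an illustrative target table through
// the FE's MySQL-protocol port (9030). UNIQUE KEY is what makes the
// __DORIS_DELETE_SIGN__ handling in the sync job work; replication_num = 1
// matches the single-BE deployment above.
public class CreateDorisTables {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://172.18.116.211:9030/", "root", "");
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS account");
            stmt.execute("CREATE TABLE IF NOT EXISTS account.account ("
                    + "  id BIGINT,"
                    + "  name VARCHAR(64),"
                    + "  age INT"
                    + ") UNIQUE KEY(id)"
                    + " DISTRIBUTED BY HASH(id) BUCKETS 1"
                    + " PROPERTIES (\"replication_num\" = \"1\")");
        }
    }
}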

That completes the environments to deploy on the server; what follows is the code. StreamPark also supports Flink SQL synchronization, but below we synchronize through a Flink CDC jar package.

5. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flink</groupId>
    <artifactId>mysql-doris</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>flink-mysql-doris</name>
    <properties>
        <scala.version>2.12</scala.version>
        <java.version>1.8</java.version>
        <flink.version>1.16.2</flink.version>
        <fastjson.version>1.2.62</fastjson.version>
        <scope.mode>compile</scope.mode>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Add log dependencies when debugging locally -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- flink-doris-connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.16</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-shaded-guava</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-feature</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
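            <!-- (added) The pom above produces a thin jar; to submit the job
                 through StreamPark without copying the Doris/CDC connector jars
                 into the Flink lib directory, a fat jar is needed. A minimal,
                 commonly used shade setup (an assumption, not part of the
                 original article): -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>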
        </plugins>
    </build>
</project>

6. Code

package com.flink.mysql;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;


public class DatabaseFullSyncV2 {
    private static final Logger LOG = LoggerFactory.getLogger(DatabaseFullSyncV2.class);
    private final static String MYSQL_HOST = "172.18.116.98";
    private final static int MYSQL_PORT = 33061;
    private final static String MYSQL_USER = "rldbuser";
    private final static String MYSQL_PASSWD = "XT7dmBy98KdWuYCe";
    private final static String SYNC_DB = "account";
    private final static String SYNC_TBLS = "account.account,account.account_copy";
    private final static String DORIS_HOST = "172.18.116.211";
    private final static int DORIS_PORT = 8030;
    private final static String DORIS_USER = "root";
    private final static String DORIS_PWD = "";
    private final static String TARGET_DORIS_DB = "account";

    public static void main(String[] args) throws Exception {

        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("converters", "dateConverters");
        debeziumProperties.setProperty("dateConverters.type", "com.flink.mysql.DateToStringConverter"); // Date format processing

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(MYSQL_HOST)
                .port(MYSQL_PORT)
                .databaseList(SYNC_DB) // set captured database
                .tableList(SYNC_TBLS) // set captured table
                .username(MYSQL_USER)
                .password(MYSQL_PASSWD)
                .scanNewlyAddedTableEnabled(true)
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .includeSchemaChanges(true)
                .startupOptions(StartupOptions.initial())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 5 s, exactly-once mode
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        //2.2 Set checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60*1000);
        //2.3 Set restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2*1000)); // at most 2 restarts, 2 s apart
        //2.4 Set whether the checkpoint is retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // retain the checkpoint if the job is cancelled

        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC JI");

        // get the table list (strip the database prefix from each db.table entry)
        List<String> tableList = Arrays.stream(SYNC_TBLS.split(",")).map(a -> a.split("\\.")[1]).collect(Collectors.toList());
        for(String tbl : tableList){
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            DorisSink<String> dorisSink = buildDorisSink(tbl);
            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
        }
        env.execute("DORIS TABLES Sync JI");
    }

    /**
     * Extract the real row data; an incoming record looks like:
     * {
     * "before":null,
     * "after":{
     * "id":1,
     * "name":"zhangsan-1",
     * "age":18
     * },
     * "source":{
     * "db":"test",
     * "table":"test_1",
     *...
     * },
     * "op":"c",
     *...
     * }
     * */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String,String>(){
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try{
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    if (Arrays.asList("c", "r", "u").contains(op)) {
                        JSONObject after = rowJson.getJSONObject("after");
                        after.put("__DORIS_DELETE_SIGN__", 0);
                        out.collect(after.toJSONString());
                    } else if ("d".equals(op)) {
                        JSONObject before = rowJson.getJSONObject("before");
                        before.put("__DORIS_DELETE_SIGN__", 1);
                        out.collect(before.toJSONString());
                    } else {
                        LOG.info("filter other op:{}", op);
                    }
                }catch (Exception ex){
                    LOG.warn("filter other format binlog:{}",row);
                }
            }
        });
    }

    /**
     * Split the stream by table name
     * */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject sourceMeta = rowJson.getJSONObject("source"); // Debezium source metadata
                    String tbl = sourceMeta.getString("table");
                    return table.equals(tbl);
                }catch (Exception ex){
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }


    /**
     * create doris sink
     * */
    public static DorisSink<String> buildDorisSink(String table){
        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes(DORIS_HOST + ":" + DORIS_PORT)
                .setTableIdentifier(TARGET_DORIS_DB + "." + table)
                .setUsername(DORIS_USER)
                .setPassword(DORIS_PWD);

        Properties pro = new Properties();
        //json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-" + table + UUID.randomUUID()) // stream load label prefix (the UUID keeps it unique)
                .setStreamLoadProp(pro)
                .setDeletable(true)
                .build();

        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(new SimpleStringSerializer()) //serialize according to string
                .setDorisOptions(dorisBuilder.build());

        return builder.build();
    }
}
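
The debeziumProperties above register a converter class, com.flink.mysql.DateToStringConverter, that the job depends on but that is not shown in the original. Below is a minimal sketch of such a Debezium CustomConverter, assuming the common pattern of rendering DATE/DATETIME/TIMESTAMP columns as plain strings (the exact runtime value types depend on the Debezium version bundled with flink-connector-mysql-cdc, hence the defensive toString fallback):

package com.flink.mysql;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

// Registered via "converters" = "dateConverters" and
// "dateConverters.type" = "com.flink.mysql.DateToStringConverter".
// Rewrites MySQL temporal columns to "yyyy-MM-dd[ HH:mm:ss]" strings so the
// JSON payload loaded into Doris stays parseable.
public class DateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter DATETIME_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public void configure(Properties props) {
        // no options needed for this sketch
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String type = column.typeName().toUpperCase();
        if ("DATE".equals(type)) {
            registration.register(SchemaBuilder.string().optional(), value -> {
                if (value == null) return null;
                return value instanceof LocalDate ? DATE_FMT.format((LocalDate) value) : value.toString();
            });
        } else if ("DATETIME".equals(type) || "TIMESTAMP".equals(type)) {
            registration.register(SchemaBuilder.string().optional(), value -> {
                if (value == null) return null;
                return value instanceof LocalDateTime ? DATETIME_FMT.format((LocalDateTime) value) : value.toString();
            });
        }
    }
}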

7. Package the code (mvn clean package), upload the jar as a new StreamPark job, then release and start it
