A simple parser for Flink CDC SQL (table-to-table sync)

Table of Contents

    • 1. Required dependencies
    • 2. Define Flink SQL
    • 3. Define the classes
    • 4. Overall parsing logic

1. Required dependencies

<!-- Flink table planner; align the version with the Flink version your project runs on -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

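Choose the specific version to match your project's needs. Besides the Flink parser classes, the code below also uses Lombok (`@Data`) and utility classes such as `StringUtils`, `CollectionUtils`, `ObjectUtils`, `MapUtil`, `StreamEx` and `Lists`. Add whichever libraries provide them (for example commons-lang3, Hutool, StreamEx and Guava) if your project does not already have them.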

2. Define Flink SQL

-- Source: Oracle table captured through the oracle-cdc connector.
-- Note: SQL comments must use "--" (or /* */); "//" is not valid and breaks the parser.
CREATE TABLE `FLINK_CDC_DATA_SRC` (
  ID STRING NOT NULL,
  NAME STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '*.*.*.*',           -- your database address
  'port' = '1521',
  'username' = '****',              -- username
  'password' = '****',              -- password
  'database-name' = 'orcl',         -- database (instance) name
  'schema-name' = 'FLINK_CDC_TEST', -- schema name
  'table-name' = 'FLINK_TEST_1'     -- table name
);

-- Target: MySQL table written through the jdbc connector.
CREATE TABLE `FLINK_CDC_DATA_DST` (
  `id` STRING NOT NULL,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  -- replace *.*.*.* with your target database address; URL parameters are joined by '&' with no spaces
  'url' = 'jdbc:mysql://*.*.*.*:3306/flink_cdc_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&createDatabaseIfNotExist=true&useSSL=false&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true',
  'username' = '****',   -- username
  'password' = '******', -- password
  'table-name' = 'flink_test_1'
);

insert into FLINK_CDC_DATA_DST select * from FLINK_CDC_DATA_SRC;

3. Define the classes

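These classes store the parsed information. The parsing code in section 4 also uses two types that are not listed here:
- `InsertSqlOptions` needs setters for the source table, the target table, the source and target column lists, the SELECT text and the WHERE clause.
- `DataSourceTypeEnums` is an enum of database types. Its `getType()` string (e.g. mysql, oracle) is matched against the connector name or the JDBC URL, and it must contain a `MYSQL` constant.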

/**
** Store the basic connection information of the database
*/
@Data
public class CreateTableOptions {<!-- -->

    private String connector; //Used to store the flag of cdc connector or jdbc connection

    private String username; //database username

    private String password; //database password

    private String tableName; //table name

    private String url; //jdbc-url information
}
/***
** Used to store more information about the cdc connector
*/
@Data
public class CdcOptions extends CreateTableOptions {<!-- -->

    private String hostname; //database host address

    private String databaseName; //database instance name

    private String port; //database port

    private String schemaName; //database name

}
/**
** Used to store more information about jdbc
*/
@Data
public class JdbcOptions extends CreateTableOptions {<!-- -->

    private String connector;

    private String driver;

    private String url;

}
/**
** Parsed cdc table information
*/
@Data
public class CdcTable {<!-- -->

    /**
     * CDC table name
     */
    private String tableName;

    /**
     * Data source type
     */
    private String dataBaseType;

    /**
     * query field name
     */
    private List<CdcColumnInfo> cdcColumnInfos;

    /**
     * Primary key field
     */
    private String priKey;

    /**
     * Attributes
     */
    private CreateTableOptions createTableOptions;

    @NoArgsConstructor
    @Data
    public static class CdcColumnInfo {<!-- -->

        private String columnName;

        private String columnType;
    }
}
/**
** The basic analysis content information of the three parts of flink sql
*/
@Data
public class CdcSqlOptions {<!-- -->

    /**
     * CDC mapping entity, source table mapping
     */
    private CdcTable sourceTable;

    /**
     * JDBC mapping entity, target table mapping
     */
    private CdcTable targetTable;

    /**
     * Insert statement mapping
     */
    private InsertSqlOptions insertSqlOptions;
}

4. Overall parsing logic

/**
** Body resolution method
*/
public CdcSqlOptions sqlMappingToOptions(String cdcSql) {<!-- -->
        if (StringUtils. isBlank(cdcSql)) {<!-- -->
            return null;
        }
        CdcSqlOptions cdcSqlOptions = new CdcSqlOptions();
        //Create parser
        SqlParser sqlParser = SqlParser.create(cdcSql, SqlParser.config()
                .withParserFactory(FlinkSqlParserImpl.FACTORY)
                .withQuoting(Quoting.BACK_TICK)
                .withQuotedCasing(Casing.UNCHANGED)
                .withConformance(FlinkSqlConformance.DEFAULT)
        );
        try {<!-- -->
        //sql is separated by semicolons, so you can get multiple sqlNodes
            List<SqlNode> sqlNodeList = sqlParser. parseStmtList(). getList();
            if (!CollectionUtils.isEmpty(sqlNodeList)) {<!-- -->
                for (SqlNode sqlNode : sqlNodeList) {<!-- -->
                //sql type
                    SqlKind kind = sqlNode. getKind();
                    //if create table
                    if (Objects.equals(kind, SqlKind.CREATE_TABLE)) {<!-- -->
                        boolean isSource = true;
                        SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
                        //Get the sql and write all the parameters in with
                        Map<String, String> optionMap = StreamEx.of(sqlCreateTable.getPropertyList().getList())
                                .map(SqlTableOption. class::cast)
                                .toMap(SqlTableOption::getKeyString, SqlTableOption::getValueString, (n, o) -> n);
                        //connector is used to determine whether it is a cdc connector or a jdbc connector
                        String connector = optionMap. get("connector");
                        CdcTable cdcTable = new CdcTable();
                        //Set the cdc table name
                     cdcTable.setTableName(sqlCreateTable.getTableName().getSimple());
                        if (Objects.equals(connector, "jdbc")) {<!-- -->
                           //If it is a jdbc connector
                            isSource = false;
                            //Set jdbc parameters
                            cdcTable.setCreateTableOptions(getJdbcOoptions(optionMap));
                        } else {<!-- -->
                           //Set source database parameters
                            cdcTable.setCreateTableOptions(getSourceOptions(optionMap));
                        }
                        //Set the database type, such as mysql,oracle
                        fillDataBaseType(cdcTable, optionMap);
                        //Set field information
                        fillCdcColumnInfoList(cdcTable, sqlCreateTable);
                        //Set the primary key
                        fillCdcPriKey(cdcTable, sqlCreateTable);
                        if (isSource) {<!-- -->
                        // This table is the source table
                            cdcSqlOptions.setSourceTable(cdcTable);
                        } else {<!-- -->
                           //This table is the target table
                            cdcSqlOptions.setTargetTable(cdcTable);
                        }
                    } else if (Objects. equals(kind, SqlKind. INSERT)) {<!-- -->
                    //If it is inserted sql, analyze the insertion parameters
                        InsertSqlOptions insertSqlOptions = getInserSqlOptions(sqlNode);
                        cdcSqlOptions.setInsertSqlOptions(insertSqlOptions);
                    }
                }
            }
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
        }
        return cdcSqlOptions;
    }
/**
     * Populate CDC table parameter entity JDBC type, target table
     *
     * @param optionMap parameter MAP
     */
    private JdbcOptions getJdbcOoptions(Map<String, String> optionMap) {<!-- -->
        JdbcOptions jdbcOptions = new JdbcOptions();
        jdbcOptions.setConnector(MapUtil.getStr(optionMap, "connector"));
        jdbcOptions.setDriver(MapUtil.getStr(optionMap, "driver"));
        jdbcOptions.setUrl(MapUtil.getStr(optionMap, "url"));
        jdbcOptions.setUsername(MapUtil.getStr(optionMap, "username"));
        jdbcOptions.setPassword(MapUtil.getStr(optionMap, "password"));
        jdbcOptions.setTableName(MapUtil.getStr(optionMap, "table-name"));
        return jdbcOptions;
    }
/**
     * Populate CDC table parameter entity, source table
     *
     * @param optionMap parameter MAP
     * @return CDC table parameter entity
     */
    private CdcOptions getSourceOptions(Map<String, String> optionMap) {<!-- -->
        CdcOptions cdcOptions = new CdcOptions();
        cdcOptions.setConnector(MapUtil.getStr(optionMap, "connector"));
        cdcOptions.setHostname(MapUtil.getStr(optionMap, "hostname"));
        cdcOptions.setUsername(MapUtil.getStr(optionMap, "username"));
        cdcOptions.setPassword(MapUtil.getStr(optionMap, "password"));
        cdcOptions.setDatabaseName(MapUtil.getStr(optionMap, "database-name"));
        cdcOptions.setTableName(MapUtil.getStr(optionMap, "table-name"));
        cdcOptions.setPort(MapUtil.getStr(optionMap, "port"));
        cdcOptions.setSchemaName(MapUtil.getStr(optionMap, "schema-name"));
        return cdcOptions;
    }
/**
     * Populate the dataBaseType of the CDC table entity, mainly used to connect to the database
     *
     * @param cdcTable CDC table entity
     * @param optionMap CDC configuration parameter MAP
     */
    private void fillDataBaseType(CdcTable cdcTable, Map<String, String> optionMap) {<!-- -->
        String connector = optionMap. get("connector");
        if (Objects.equals(connector, "jdbc")) {<!-- -->
            connector = optionMap.get("url");
        }
        String finalConnector = connector;
        //DataSourceTypeEnums.values() Enumeration of database types
        DataSourceTypeEnums dataSourceTypeEnums = Arrays. stream(DataSourceTypeEnums. values())
                .filter(dataSource -> finalConnector.contains(dataSource.getType()))
                .findFirst()
                //The default is mysql
                .orElse(DataSourceTypeEnums.MYSQL);
        cdcTable.setDataBaseType(dataSourceTypeEnums.getType());
    }
/**
     * Populate the field information of the CDC table entity
     *
     * @param cdcTable CDC table entity
     * @param sqlCreateTable entity parsed by flink
     */
    private void fillCdcColumnInfoList(CdcTable cdcTable, SqlCreateTable sqlCreateTable) {<!-- -->
        List<CdcTable.CdcColumnInfo> columnInfoList = Lists.newArrayList();
        SqlNodeList columnList = sqlCreateTable.getColumnList();
        if (!ObjectUtils.isEmpty(columnList)) {<!-- -->
            List<SqlNode> columnNodeList = columnList. getList();
            for (SqlNode columnNode : columnNodeList) {<!-- -->
                SqlTableColumn.SqlRegularColumn sqlRegularColumn = (SqlTableColumn.SqlRegularColumn) columnNode;
                SqlDataTypeSpec sqlRegularColumnType = sqlRegularColumn. getType();
                CdcTable.CdcColumnInfo cdcColumnInfo = new CdcTable.CdcColumnInfo();
                cdcColumnInfo.setColumnName(sqlRegularColumn.getName().getSimple());
                cdcColumnInfo.setColumnType(sqlRegularColumnType.toString());
                columnInfoList.add(cdcColumnInfo);
            }
        }
        cdcTable.setCdcColumnInfos(columnInfoList);
    }
/**
     * Fill in the primary key information of the CDC table entity
     *
     * @param cdcTable CDC table entity
     * @param sqlCreateTable entity parsed by flink
     */
    private void fillCdcPriKey(CdcTable cdcTable, SqlCreateTable sqlCreateTable) {<!-- -->
        List<SqlTableConstraint> tableConstraints = sqlCreateTable.getTableConstraints();
        if (!CollectionUtils.isEmpty(tableConstraints)) {<!-- -->
            for (SqlTableConstraint sqlTableConstraint : tableConstraints) {<!-- -->
                if (sqlTableConstraint.isPrimaryKey()) {<!-- -->
                    cdcTable.setPriKey(StringUtils.join(sqlTableConstraint.getColumnNames()));
                }
            }
        }
    }
/**
     * Parse the insert statement in CDC SQL
     *
     * @param sqlNode SQL node
     * @return INSERT parameter entity
     */
    private InsertSqlOptions getInserSqlOptions(SqlNode sqlNode) {<!-- -->
        InsertSqlOptions insertSqlOptions = new InsertSqlOptions();
        // target table field
        List<String> targetColumnNameList = Lists. newArrayList();
       //source table fields
        List<String> sourceColumnNameList = Lists. newArrayList();
        SqlInsert sqlInsert = (SqlInsert) sqlNode;
        String targetTableName = sqlInsert. getTargetTable(). toString();
        SqlNodeList targetColumnList = sqlInsert.getTargetColumnList();
        if (!ObjectUtils.isEmpty(targetColumnList)) {<!-- -->
            List<SqlNode> targetColumnNodeList = targetColumnList.getList();
            for (SqlNode targetColumnNode : targetColumnNodeList) {<!-- -->
                SqlIdentifier sqlIdentifier = (SqlIdentifier) targetColumnNode;
                targetColumnNameList.add(sqlIdentifier.getSimple());
            }
        }
        SqlNode source = sqlInsert. getSource();
        //Equivalent to the select part of insert into...select...
        if (source.getKind().equals(SqlKind.SELECT)) {<!-- -->
            SqlSelect sqlSelect = (SqlSelect) source;
            SqlIdentifier fromIdentifier = (SqlIdentifier) sqlSelect.getFrom();
            insertSqlOptions.setSourceTable(fromIdentifier.getSimple());
            SqlNodeList selectList = sqlSelect. getSelectList();
            if (!ObjectUtils.isEmpty(selectList)) {<!-- -->
                List<SqlNode> selectNodeList = selectList.getList();
                for (SqlNode selectNode : selectNodeList) {<!-- -->
                    SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode;
                    sourceColumnNameList.add(sqlIdentifier.toString());
                }
            }
            insertSqlOptions.setSelectSql(sqlSelect.toString());
            insertSqlOptions.setSelectWhere(ObjectUtils.isEmpty(sqlSelect.getWhere()) ? "" : sqlSelect.getWhere().toString());
        }
        insertSqlOptions.setTargetTable(targetTableName);
        insertSqlOptions.setTargetColumnList(targetColumnNameList);
        insertSqlOptions.setSourceColumnList(sourceColumnNameList);
        return insertSqlOptions;
    }

That's all.