Table of Contents
- 1. Dependencies required for parsing
- 2. Define the Flink SQL
- 3. Define the classes
- 4. Overall parsing
1. Dependencies required for parsing
<!-- Flink table planner, Scala 2.12 build -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
Choose the version that matches the Flink version used by your project.
2. Define Flink SQL
-- Source table: read through the Oracle CDC connector.
CREATE TABLE `FLINK_CDC_DATA_SRC` (
    ID STRING NOT NULL,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '*.*.*.*',              -- your database address
    'port' = '1521',
    'username' = '****',                 -- user name
    'password' = '****',                 -- password
    'database-name' = 'orcl',            -- instance name
    'schema-name' = 'FLINK_CDC_TEST',    -- schema name
    'table-name' = 'FLINK_TEST_1'        -- table name
);

-- Target table: written through the JDBC connector (MySQL).
CREATE TABLE `FLINK_CDC_DATA_DST` (
    `id` STRING NOT NULL,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    -- replace *.*.*.* with the address of your target database
    'url' = 'jdbc:mysql://*.*.*.*:3306/flink_cdc_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&createDatabaseIfNotExist=true&useSSL=false&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true',
    'username' = '****',                 -- user name
    'password' = '******',               -- password
    'table-name' = 'flink_test_1'
);

-- Copy every row from the source table into the target table.
insert into FLINK_CDC_DATA_DST select * from FLINK_CDC_DATA_SRC;
3. Define the classes
The following classes store the parsed information.
/** ** Store the basic connection information of the database */ @Data public class CreateTableOptions {<!-- --> private String connector; //Used to store the flag of cdc connector or jdbc connection private String username; //database username private String password; //database password private String tableName; //table name private String url; //jdbc-url information }
/*** ** Used to store more information about the cdc connector */ @Data public class CdcOptions extends CreateTableOptions {<!-- --> private String hostname; //database host address private String databaseName; //database instance name private String port; //database port private String schemaName; //database name }
/** ** Used to store more information about jdbc */ @Data public class JdbcOptions extends CreateTableOptions {<!-- --> private String connector; private String driver; private String url; }
/** ** Parsed cdc table information */ @Data public class CdcTable {<!-- --> /** * CDC table name */ private String tableName; /** * Data source type */ private String dataBaseType; /** * query field name */ private List<CdcColumnInfo> cdcColumnInfos; /** * Primary key field */ private String priKey; /** * Attributes */ private CreateTableOptions createTableOptions; @NoArgsConstructor @Data public static class CdcColumnInfo {<!-- --> private String columnName; private String columnType; } }
/** ** The basic analysis content information of the three parts of flink sql */ @Data public class CdcSqlOptions {<!-- --> /** * CDC mapping entity, source table mapping */ private CdcTable sourceTable; /** * JDBC mapping entity, target table mapping */ private CdcTable targetTable; /** * Insert statement mapping */ private InsertSqlOptions insertSqlOptions; }
4. Overall parsing
/** ** Body resolution method */ public CdcSqlOptions sqlMappingToOptions(String cdcSql) {<!-- --> if (StringUtils. isBlank(cdcSql)) {<!-- --> return null; } CdcSqlOptions cdcSqlOptions = new CdcSqlOptions(); //Create parser SqlParser sqlParser = SqlParser.create(cdcSql, SqlParser.config() .withParserFactory(FlinkSqlParserImpl.FACTORY) .withQuoting(Quoting.BACK_TICK) .withQuotedCasing(Casing.UNCHANGED) .withConformance(FlinkSqlConformance.DEFAULT) ); try {<!-- --> //sql is separated by semicolons, so you can get multiple sqlNodes List<SqlNode> sqlNodeList = sqlParser. parseStmtList(). getList(); if (!CollectionUtils.isEmpty(sqlNodeList)) {<!-- --> for (SqlNode sqlNode : sqlNodeList) {<!-- --> //sql type SqlKind kind = sqlNode. getKind(); //if create table if (Objects.equals(kind, SqlKind.CREATE_TABLE)) {<!-- --> boolean isSource = true; SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; //Get the sql and write all the parameters in with Map<String, String> optionMap = StreamEx.of(sqlCreateTable.getPropertyList().getList()) .map(SqlTableOption. class::cast) .toMap(SqlTableOption::getKeyString, SqlTableOption::getValueString, (n, o) -> n); //connector is used to determine whether it is a cdc connector or a jdbc connector String connector = optionMap. 
get("connector"); CdcTable cdcTable = new CdcTable(); //Set the cdc table name cdcTable.setTableName(sqlCreateTable.getTableName().getSimple()); if (Objects.equals(connector, "jdbc")) {<!-- --> //If it is a jdbc connector isSource = false; //Set jdbc parameters cdcTable.setCreateTableOptions(getJdbcOoptions(optionMap)); } else {<!-- --> //Set source database parameters cdcTable.setCreateTableOptions(getSourceOptions(optionMap)); } //Set the database type, such as mysql,oracle fillDataBaseType(cdcTable, optionMap); //Set field information fillCdcColumnInfoList(cdcTable, sqlCreateTable); //Set the primary key fillCdcPriKey(cdcTable, sqlCreateTable); if (isSource) {<!-- --> // This table is the source table cdcSqlOptions.setSourceTable(cdcTable); } else {<!-- --> //This table is the target table cdcSqlOptions.setTargetTable(cdcTable); } } else if (Objects. equals(kind, SqlKind. INSERT)) {<!-- --> //If it is inserted sql, analyze the insertion parameters InsertSqlOptions insertSqlOptions = getInserSqlOptions(sqlNode); cdcSqlOptions.setInsertSqlOptions(insertSqlOptions); } } } } catch (Exception e) {<!-- --> e.printStackTrace(); } return cdcSqlOptions; }
/** * Populate CDC table parameter entity JDBC type, target table * * @param optionMap parameter MAP */ private JdbcOptions getJdbcOoptions(Map<String, String> optionMap) {<!-- --> JdbcOptions jdbcOptions = new JdbcOptions(); jdbcOptions.setConnector(MapUtil.getStr(optionMap, "connector")); jdbcOptions.setDriver(MapUtil.getStr(optionMap, "driver")); jdbcOptions.setUrl(MapUtil.getStr(optionMap, "url")); jdbcOptions.setUsername(MapUtil.getStr(optionMap, "username")); jdbcOptions.setPassword(MapUtil.getStr(optionMap, "password")); jdbcOptions.setTableName(MapUtil.getStr(optionMap, "table-name")); return jdbcOptions; }
/** * Populate CDC table parameter entity, source table * * @param optionMap parameter MAP * @return CDC table parameter entity */ private CdcOptions getSourceOptions(Map<String, String> optionMap) {<!-- --> CdcOptions cdcOptions = new CdcOptions(); cdcOptions.setConnector(MapUtil.getStr(optionMap, "connector")); cdcOptions.setHostname(MapUtil.getStr(optionMap, "hostname")); cdcOptions.setUsername(MapUtil.getStr(optionMap, "username")); cdcOptions.setPassword(MapUtil.getStr(optionMap, "password")); cdcOptions.setDatabaseName(MapUtil.getStr(optionMap, "database-name")); cdcOptions.setTableName(MapUtil.getStr(optionMap, "table-name")); cdcOptions.setPort(MapUtil.getStr(optionMap, "port")); cdcOptions.setSchemaName(MapUtil.getStr(optionMap, "schema-name")); return cdcOptions; }
/** * Populate the dataBaseType of the CDC table entity, mainly used to connect to the database * * @param cdcTable CDC table entity * @param optionMap CDC configuration parameter MAP */ private void fillDataBaseType(CdcTable cdcTable, Map<String, String> optionMap) {<!-- --> String connector = optionMap. get("connector"); if (Objects.equals(connector, "jdbc")) {<!-- --> connector = optionMap.get("url"); } String finalConnector = connector; //DataSourceTypeEnums.values() Enumeration of database types DataSourceTypeEnums dataSourceTypeEnums = Arrays. stream(DataSourceTypeEnums. values()) .filter(dataSource -> finalConnector.contains(dataSource.getType())) .findFirst() //The default is mysql .orElse(DataSourceTypeEnums.MYSQL); cdcTable.setDataBaseType(dataSourceTypeEnums.getType()); }
/** * Populate the field information of the CDC table entity * * @param cdcTable CDC table entity * @param sqlCreateTable entity parsed by flink */ private void fillCdcColumnInfoList(CdcTable cdcTable, SqlCreateTable sqlCreateTable) {<!-- --> List<CdcTable.CdcColumnInfo> columnInfoList = Lists.newArrayList(); SqlNodeList columnList = sqlCreateTable.getColumnList(); if (!ObjectUtils.isEmpty(columnList)) {<!-- --> List<SqlNode> columnNodeList = columnList. getList(); for (SqlNode columnNode : columnNodeList) {<!-- --> SqlTableColumn.SqlRegularColumn sqlRegularColumn = (SqlTableColumn.SqlRegularColumn) columnNode; SqlDataTypeSpec sqlRegularColumnType = sqlRegularColumn. getType(); CdcTable.CdcColumnInfo cdcColumnInfo = new CdcTable.CdcColumnInfo(); cdcColumnInfo.setColumnName(sqlRegularColumn.getName().getSimple()); cdcColumnInfo.setColumnType(sqlRegularColumnType.toString()); columnInfoList.add(cdcColumnInfo); } } cdcTable.setCdcColumnInfos(columnInfoList); }
/** * Fill in the primary key information of the CDC table entity * * @param cdcTable CDC table entity * @param sqlCreateTable entity parsed by flink */ private void fillCdcPriKey(CdcTable cdcTable, SqlCreateTable sqlCreateTable) {<!-- --> List<SqlTableConstraint> tableConstraints = sqlCreateTable.getTableConstraints(); if (!CollectionUtils.isEmpty(tableConstraints)) {<!-- --> for (SqlTableConstraint sqlTableConstraint : tableConstraints) {<!-- --> if (sqlTableConstraint.isPrimaryKey()) {<!-- --> cdcTable.setPriKey(StringUtils.join(sqlTableConstraint.getColumnNames())); } } } }
/** * Parse the insert statement in CDC SQL * * @param sqlNode SQL node * @return INSERT parameter entity */ private InsertSqlOptions getInserSqlOptions(SqlNode sqlNode) {<!-- --> InsertSqlOptions insertSqlOptions = new InsertSqlOptions(); // target table field List<String> targetColumnNameList = Lists. newArrayList(); //source table fields List<String> sourceColumnNameList = Lists. newArrayList(); SqlInsert sqlInsert = (SqlInsert) sqlNode; String targetTableName = sqlInsert. getTargetTable(). toString(); SqlNodeList targetColumnList = sqlInsert.getTargetColumnList(); if (!ObjectUtils.isEmpty(targetColumnList)) {<!-- --> List<SqlNode> targetColumnNodeList = targetColumnList.getList(); for (SqlNode targetColumnNode : targetColumnNodeList) {<!-- --> SqlIdentifier sqlIdentifier = (SqlIdentifier) targetColumnNode; targetColumnNameList.add(sqlIdentifier.getSimple()); } } SqlNode source = sqlInsert. getSource(); //Equivalent to the select part of insert into...select... if (source.getKind().equals(SqlKind.SELECT)) {<!-- --> SqlSelect sqlSelect = (SqlSelect) source; SqlIdentifier fromIdentifier = (SqlIdentifier) sqlSelect.getFrom(); insertSqlOptions.setSourceTable(fromIdentifier.getSimple()); SqlNodeList selectList = sqlSelect. getSelectList(); if (!ObjectUtils.isEmpty(selectList)) {<!-- --> List<SqlNode> selectNodeList = selectList.getList(); for (SqlNode selectNode : selectNodeList) {<!-- --> SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode; sourceColumnNameList.add(sqlIdentifier.toString()); } } insertSqlOptions.setSelectSql(sqlSelect.toString()); insertSqlOptions.setSelectWhere(ObjectUtils.isEmpty(sqlSelect.getWhere()) ? "" : sqlSelect.getWhere().toString()); } insertSqlOptions.setTargetTable(targetTableName); insertSqlOptions.setTargetColumnList(targetColumnNameList); insertSqlOptions.setSourceColumnList(sourceColumnNameList); return insertSqlOptions; }
That completes the parser.