FlinkCDC for mysql to Clickhouse

Complete dependencies

<dependencies>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-core</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-streaming-java_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
 
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-jdbc_2.12</artifactId>-->
<!-- <version>1.10.3</version>-->
<!-- </dependency>-->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-jdbc_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
 
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-java</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-clients_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-api-java-bridge_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-common</artifactId>
           <version>1.13.0</version>
       </dependency>
 
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
 
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner-blink_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner-blink_2.12</artifactId>
           <version>1.13.0</version>
           <type>test-jar</type>
       </dependency>
 
       <dependency>
           <groupId>com.alibaba.ververica</groupId>
           <artifactId>flink-connector-mysql-cdc</artifactId>
           <version>1.4.0</version>
       </dependency>
 
 
       <dependency>
           <groupId>com.aliyun</groupId>
           <artifactId>flink-connector-clickhouse</artifactId>
           <version>1.12.0</version>
       </dependency>
       <dependency>
           <groupId>ru.yandex.clickhouse</groupId>
           <artifactId>clickhouse-jdbc</artifactId>
           <version>0.2.6</version>
       </dependency>
       <dependency>
           <groupId>com.google.code.gson</groupId>
           <artifactId>gson</artifactId>
           <version>2.8.6</version>
       </dependency>
   </dependencies>

Flink CDC

package name.lijiaqi.cdc;
 
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
 
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;
 
public class MySqlBinlogSourceExample {<!-- -->
   public static void main(String[] args) throws Exception {<!-- -->
       SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
              .hostname("localhost")
              .port(3306)
              .databaseList("test")
              .username("flinkcdc")
              .password("dafei1288")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .build();
 
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
       // add source
       env.addSource(sourceFunction)
       // add sink
      .addSink(new ClickhouseSink());
 
       env.execute("mysql2clickhouse");
  }
 
   //Deserialize cdc data
   public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {<!-- -->
       @Override
       public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {<!-- -->
 
           Gson jsstr = new Gson();
           HashMap<String, Object> hs = new HashMap<>();
 
           String topic = sourceRecord.topic();
           String[] split = topic.split("[.]");
           String database = split[1];
           String table = split[2];
           hs.put("database",database);
           hs.put("table",table);
           //Get the operation type
           Envelope.Operation operation = Envelope.operationFor(sourceRecord);
           //Get the data itself
           Struct struct = (Struct)sourceRecord.value();
           Struct after = struct.getStruct("after");
 
           if (after != null) {<!-- -->
               Schema schema = after.schema();
               HashMap<String, Object> afhs = new HashMap<>();
               for (Field field : schema.fields()) {<!-- -->
                   afhs.put(field.name(), after.get(field.name()));
              }
               hs.put("data",afhs);
          }
 
           String type = operation.toString().toLowerCase();
           if ("create".equals(type)) {<!-- -->
               type = "insert";
          }
           hs.put("type",type);
 
           collector.collect(jsstr.toJson(hs));
      }
 
       @Override
       public TypeInformation<String> getProducedType() {<!-- -->
           return BasicTypeInfo.STRING_TYPE_INFO;
      }
  }
 
 
   public static class ClickhouseSink extends RichSinkFunction<String>{<!-- -->
       Connection connection;
       PreparedStatement pstmt;
       private Connection getConnection() {<!-- -->
           Connection conn = null;
           try {<!-- -->
               Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
               String url = "jdbc:clickhouse://localhost:8123/default";
               conn = DriverManager.getConnection(url,"default","dafei1288");
 
          } catch (Exception e) {<!-- -->
               e.printStackTrace();
          }
           return conn;
      }
 
       @Override
       public void open(Configuration parameters) throws Exception {<!-- -->
           super.open(parameters);
           connection = getConnection();
           String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
           pstmt = connection.prepareStatement(sql);
      }
 
       // Called once when each record is inserted
       public void invoke(String value, Context context) throws Exception {<!-- -->
           //{<!-- -->"database":"test","data":{<!-- -->"name":"jacky","description":"fffff","id": 8},"type":"insert","table":"test_cdc"}
           Gson t = new Gson();
           HashMap<String,Object> hs = t.fromJson(value,HashMap.class);
           String database = (String)hs.get("database");
           String table = (String)hs.get("table");
           String type = (String)hs.get("type");
 
           if("test".equals(database) & amp; & amp; "test_cdc".equals(table)){<!-- -->
               if("insert".equals(type)){<!-- -->
                   System.out.println("insert => " + value);
                   LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");
                   String name = (String)data.get("name");
                   String description = (String)data.get("description");
                   Double id = (Double)data.get("id");
                   //Assign value to previous placeholder
                   pstmt.setInt(1, id.intValue());
                   pstmt.setString(2, name);
                   pstmt.setString(3, description);
 
                   pstmt.executeUpdate();
              }
          }
      }
 
       @Override
       public void close() throws Exception {<!-- -->
           super.close();
 
           if(pstmt != null) {<!-- -->
               pstmt.close();
          }
 
           if(connection != null) {<!-- -->
               connection.close();
          }
      }
  }
}

Flink SQL CDC

package name.lijiaqi.cdc;
 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
public class MysqlToMysqlMain {<!-- -->
   public static void main(String[] args) throws Exception {<!-- -->
       EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build();
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
 
 
 
       tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
 
 
       //data source table
       String sourceDDL =
               "CREATE TABLE mysql_binlog (\
" +
                       "id INT NOT NULL,\
" +
                       " name STRING,\
" +
                       " description STRING\
" +
                       ") WITH (\
" +
                       " 'connector' = 'mysql-cdc',\
" +
                       " 'hostname' = 'localhost',\
" +
                       " 'port' = '3306',\
" +
                       " 'username' = 'flinkcdc',\
" +
                       " 'password' = 'dafei1288',\
" +
                       " 'database-name' = 'test',\
" +
                       " 'table-name' = 'test_cdc'\
" +
                       ")";
 
 
       String url = "jdbc:mysql://127.0.0.1:3306/test";
       String userName = "root";
       String password = "dafei1288";
       String mysqlSinkTable = "test_cdc_sink";
       // Output target table
       String sinkDDL =
               "CREATE TABLE test_cdc_sink (\
" +
                       "id INT NOT NULL,\
" +
                       " name STRING,\
" +
                       " description STRING,\
" +
                       " PRIMARY KEY (id) NOT ENFORCED \
 " +
                       ") WITH (\
" +
                       " 'connector' = 'jdbc',\
" +
                       " 'driver' = 'com.mysql.jdbc.Driver',\
" +
                       " 'url' = '" + url + "',\
" +
                       " 'username' = '" + userName + "',\
" +
                       " 'password' = '" + password + "',\
" +
                       " 'table-name' = '" + mysqlSinkTable + "'\
" +
                       ")";
       // Simple aggregation processing
       String transformSQL =
               "insert into test_cdc_sink select * from mysql_binlog";
 
       tableEnv.executeSql(sourceDDL);
       tableEnv.executeSql(sinkDDL);
       TableResult result = tableEnv.executeSql(transformSQL);
 
       // Wait for flink-cdc to complete the snapshot
       result.print();
       env.execute("sync-flink-cdc");
  }
 
}