Spring Boot integrates Flink CDC

Article directory

  • 1. Flink & Flink CDC official website
  • 2. Introduction to CDC & Flink CDC
    • 1. What is CDC
    • 2. What is Flink CDC?
    • 3. Supported connectors
  • 3. Spring Boot integrates Flink CDC
    • 1. Official website example
    • 2. Maven dependencies
      • 1) Flink and Flink CDC version mapping
      • 2) Specific maven dependencies
      • 3) Project pitfalls
    • 3. Spring Boot code example
      • 1) Create a change listener
      • 2) Custom data parser
      • 3) Create a change object
      • 4) Create business processing class
      • 5) Run the code to listen for MySQL CDC events

1. Flink & Flink CDC official website

Flink CDC address
Flink official website address

2. Introduction to CDC & Flink CDC

1. What is CDC

CDC: short for Change Data Capture, a technique for capturing data changes. Concretely, it identifies and captures changes made to data in a database (inserts, updates, and deletes of rows or tables, as well as changes and adjustments to the database structure), records these changes completely in the order in which they occur, and transmits the change stream in real time to downstream processes or systems through an intermediate bridge (message middleware, TCP, etc.).
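For intuition, a Debezium-style change event (the format the code later in this article consumes) looks roughly like the following for an update; the table and column values here are made up for illustration:

{
  "before": { "id": 1, "name": "old_name" },
  "after":  { "id": 1, "name": "new_name" },
  "source": { "db": "yourDatabaseName", "table": "yourTableName", "file": "mysql-bin.000003", "pos": 12345 },
  "op": "u",
  "ts_ms": 1696999999000
}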

2. What is Flink CDC

The CDC Connectors for Apache Flink are a set of source connectors for Apache Flink that use change data capture (CDC) to ingest changes from different databases. The connectors integrate Debezium as the engine for capturing data changes, so they can take full advantage of Debezium's capabilities. See the Debezium documentation to learn more about what Debezium is.

3. Supported connectors

As of Flink CDC 2.3 (the version used in this article), the officially supported source connectors include MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, TiDB, OceanBase, and Db2; see the official documentation for the authoritative, up-to-date list.

3. Spring Boot integrates Flink CDC

1. Official website example

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpointing
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}
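Note that before this example can run, MySQL itself must be prepared: the binlog must be enabled in ROW format, and the connecting user needs replication-related privileges. A sketch of the typical setup, with placeholder names (adjust to your environment):

-- my.cnf (server side): binlog enabled in ROW format with a unique server-id
-- server-id     = 1
-- log-bin       = mysql-bin
-- binlog_format = ROW

-- privileges commonly required by the MySQL CDC connector:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'yourUsername'@'%';
FLUSH PRIVILEGES;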

2. Maven dependencies

1) Flink and Flink CDC version mapping

See the compatibility table on the official website. This article pairs Flink 1.16.0 with flink-connector-mysql-cdc 2.3.0, which is a supported combination.

2) Specific maven dependencies

<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
        <log4j2.version>2.17.0</log4j2.version>
        <mybatis-plus.version>3.4.1</mybatis-plus.version>
        <sharding.jdbc.version>4.1.0</sharding.jdbc.version>
        <flink.version>1.16.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--log4j vulnerability fix-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>${log4j2.version}</version>
        </dependency>

        <!--Introducing log4j2 dependency-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>

        <!--Paging plug-in-->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.4.1</version>
        </dependency>

        <!--Database driver-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--There is no com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset method in 8.0.25-->
            <version>8.0.29</version>
        </dependency>

        <!--Database connection pool-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.4</version>
        </dependency>

        <!--Introducing easyexcel dependency-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>3.1.1</version>
        </dependency>

        <!--Sub-database and sub-table-->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
            <!--<version>4.0.0-RC1</version>-->
            <version>4.1.1</version>
        </dependency>

        <!--mybatis-plus and code generation tools-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-engine-core</artifactId>
            <version>2.3</version>
        </dependency>

        <!--Paging plug-in-->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.4.1</version>
        </dependency>

        <!-- Multiple data sources -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.6.1</version>
        </dependency>

        <!--Enable elegant annotation verification-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <!--apache-http request-->
        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.14</version>
        </dependency>

        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <!--aliyun oss-->
        <dependency>
            <groupId>com.aliyun.oss</groupId>
            <artifactId>aliyun-sdk-oss</artifactId>
            <version>3.15.0</version>
        </dependency>

        <!--xxl-job-->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.4.0</version>
        </dependency>


        <!-- ShardingSphere read-write separation/sub-database and table -->
       <!-- <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.1.2</version>
        </dependency>-->

        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <!--Do not enable the provided scope below when running locally; otherwise a class-not-found error is reported-->
            <!--<scope>provided</scope>-->
        </dependency>

        <!--Starting from Flink 1.11.0, the flink-clients package must be introduced additionally; without it, the error "No ExecutorFactory found to execute the application" is reported-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink CDC -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
             <!-- The 8.0.25 driver introduced transitively must be excluded; version 8.0.28 or above is required.
             Otherwise, startup fails with: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;-->
            <exclusions>
                <exclusion>
                    <groupId>mysql</groupId>
                    <artifactId>mysql-connector-java</artifactId>
                </exclusion>
            </exclusions>
           <!-- <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
        <!--<dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>-->

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
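        <!-- Note: 1.12.1 does not match ${flink.version} (1.16.0); aligning this
             version with ${flink.version} is generally safer. -->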
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.12.1</version>
        </dependency>
    </dependencies>

3) Project pitfalls

  • The MySQL JDBC driver version is too low, causing java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; when the project starts; version 8.0.28 or above must be used.
  • flink-connector-mysql-cdc 2.3.0 transitively pulls in driver version 8.0.25, which must be excluded:
<!--Database driver-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--There is no com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset method in 8.0.25-->
            <version>8.0.29</version>
        </dependency>

<dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
             <!-- The 8.0.25 driver introduced transitively must be excluded; version 8.0.28 or above is required.
             Otherwise, startup fails with: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;-->
            <exclusions>
                <exclusion>
                    <groupId>mysql</groupId>
                    <artifactId>mysql-connector-java</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
  • Error when starting the project: java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException. The class cannot be found because a dependency is missing; introduce the relevant dependency:
 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.12.1</version>
        </dependency>
  • The Flink version must be compatible with the Flink CDC version; introduce them according to the official compatibility table.


If the version is too low, it causes: Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation.
To resolve this issue, make sure the database user has the RELOAD privilege. Log in to MySQL and grant it to the user; for example, to grant RELOAD to user your_user:

GRANT RELOAD ON *.* TO 'your_user'@'localhost';

How Flink officially addresses this problem:
The MySQL CDC source uses an incremental snapshot algorithm that avoids database locks, so it does not require the RELOAD privilege.
Starting from Flink 1.12, the flink-connector-mysql-cdc module supports this incremental snapshot algorithm for capturing data changes; it avoids global locks and provides low-latency change capture without RELOAD permission.

Set scan.incremental.snapshot.enabled in the code to turn it on; for the full context, see the code example below.

Configuration config = new Configuration();
//Set incremental snapshot enable to true
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);

3. Spring Boot code example

1) Create a change listener

Create a MysqlEventListener class that implements ApplicationRunner so that MySQL listening starts when the application starts. The class must be registered as a Spring bean (for example with @Component) for its run() method to be invoked.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @Description: MySQL change listener
 * @Date: 2023/10/11
 **/
@Component
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("yourHostname")
                .port(3306)
                .databaseList("yourDatabaseName") // set the captured database; to capture the whole database, set tableList to ".*"
                .tableList("yourDatabaseName.yourTableName") // set the captured table, as database.table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new MysqlDeserialization()) // convert SourceRecord to a custom object
                /** initial: take a full snapshot first, then read incremental changes
                 *  latest: read incremental changes only (skip history)
                 *  timestamp: read changes with a timestamp greater than or equal to the given time
                 */
                .startupOptions(StartupOptions.latest())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        // set incremental snapshot enabled to true
        config.setBoolean("scan.incremental.snapshot.enabled", true);
        env.configure(config);
        env.setParallelism(1);

        DataStreamSource<DataChangeInfo> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        streamSource.addSink(new DataChangeSink());
        env.execute("mysql-stream-cdc");
    }

}
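One caveat: env.execute() blocks the calling thread until the Flink job terminates, so calling it directly inside run() stalls Spring Boot startup. A minimal sketch of one common workaround, submitting the job from a dedicated thread (startFlinkJob is an illustrative name whose body is the code above):

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) {
        // run the blocking Flink job off the startup thread
        Thread jobThread = new Thread(() -> {
            try {
                startFlinkJob(); // builds the MySqlSource and calls env.execute(...), as shown above
            } catch (Exception e) {
                // log and decide whether to fail fast or retry
                throw new RuntimeException(e);
            }
        }, "flink-cdc-job");
        jobThread.setDaemon(true);
        jobThread.start();
    }

    private void startFlinkJob() throws Exception {
        // identical to the body of run(...) in the listing above
    }
}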

2) Custom data parser

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

/**
 * @Description: mysql custom serialization
 * @Date: 2023/10/11
 **/
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the data and convert it into the custom object DataChangeInfo
     * @param sourceRecord
     * @param collector
     * @throws Exception
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        // the record topic has the form <serverName>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
        // get the operation type: CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
        dataChangeInfo.setEventType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        // emit the converted object downstream
        collector.collect(dataChangeInfo);
    }


    /**
     * Get the row data before or after the change from the record
     * @param value
     * @param fieldElement
     * @return
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }


    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}

3) Create a change object

import lombok.Data;

/**
 * @Description: Data change object
 * @Date: 2023/10/11
 **/
@Data
public class DataChangeInfo {
    /**
     * Data before the change
     */
    private String beforeData;
    /**
     * Data after the change
     */
    private String afterData;
    /**
     * Change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * binlog file name
     */
    private String fileName;
    /**
     * Current binlog read position
     */
    private Integer filePos;
    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private Long changeTime;

}

4) Create business processing class

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * @Description: Data processing
 * @Date: 2023/10/11
 **/
@Slf4j
public class DataChangeSink implements SinkFunction<DataChangeInfo> {

    @Override
    public void invoke(DataChangeInfo value, Context context) throws Exception {
        // the type parameter must match the stream element type (DataChangeInfo), not String
        log.info("Received change data: {}", value);
        // business code goes here
    }
}
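Inside invoke you would typically branch on the event type and map afterData/beforeData back to a domain object. A minimal sketch using the fastjson dependency already in the pom (UserEntity is a hypothetical entity class):

import com.alibaba.fastjson.JSON;

// inside DataChangeSink#invoke(DataChangeInfo value, Context context):
switch (value.getEventType()) {
    case 1: // insert
    case 2: // update
        // parse the row image after the change into a hypothetical entity
        UserEntity after = JSON.parseObject(value.getAfterData(), UserEntity.class);
        // upsert into a cache, search index, or another table ...
        break;
    case 3: // delete
        UserEntity before = JSON.parseObject(value.getBeforeData(), UserEntity.class);
        // remove the downstream copy ...
        break;
    default:
        break;
}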

5) Run the code to listen for MySQL CDC events

The project starts successfully.

  • Modify data in the MySQL database:
    • before the change (screenshot)
    • after the change, click Save (screenshot)
    • the service logs the captured change event (screenshot)