Article directory
- 1. Flink & Flink CDC official website
- 2. Introduction to CDC & Flink CDC
  - 1. What is CDC
  - 2. What is Flink CDC?
  - 3. Supported connectors
- 3. Spring Boot integrates Flink CDC
  - 1. Official website example
  - 2. Maven dependencies
    - 1) Flink and Flink CDC version mapping
    - 2) Specific Maven dependencies
    - 3) Project pitfalls
  - 3. Spring Boot code example
    - 1) Create a change listener
    - 2) Custom data parser
    - 3) Create a change object
    - 4) Create business processing class
    - 5) Run code to listen for MySQL CDC events
1. Flink & Flink CDC official website
Flink CDC address
Flink official website address
2. Introduction to CDC & Flink CDC
1. What is CDC
CDC stands for Change Data Capture. It identifies and captures changes made to a database (inserts, updates, and deletes of rows or tables, as well as changes to the database structure), records those changes completely in the order in which they occur, and delivers the resulting change events to downstream processes or systems in real time through an intermediary such as message middleware or a TCP connection.
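As a rough illustration of the idea above, the following plain-Java sketch replays an ordered sequence of change events against a downstream copy of a table. ChangeEvent, ReplicaTable, and CdcReplayDemo are hypothetical names invented for this sketch; they are not the Debezium or Flink CDC record format, and the transport (message middleware, TCP) is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified change event (not the Debezium / Flink CDC format). */
final class ChangeEvent {
    enum Op { INSERT, UPDATE, DELETE }

    final Op op;
    final String key;     // primary key of the changed row
    final String before;  // row image before the change (null for INSERT)
    final String after;   // row image after the change (null for DELETE)

    ChangeEvent(Op op, String key, String before, String after) {
        this.op = op;
        this.key = key;
        this.before = before;
        this.after = after;
    }
}

/** Downstream copy of a table that stays in sync by applying events in order. */
final class ReplicaTable {
    private final Map<String, String> rows = new LinkedHashMap<>();

    void apply(ChangeEvent event) {
        switch (event.op) {
            case INSERT:
            case UPDATE:
                rows.put(event.key, event.after);
                break;
            case DELETE:
                rows.remove(event.key);
                break;
        }
    }

    Map<String, String> snapshot() {
        return new LinkedHashMap<>(rows);
    }
}

final class CdcReplayDemo {
    /** Applies the events in the order given and returns the resulting rows. */
    static Map<String, String> replay(List<ChangeEvent> events) {
        ReplicaTable replica = new ReplicaTable();
        for (ChangeEvent event : events) {
            replica.apply(event);
        }
        return replica.snapshot();
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = new ArrayList<>();
        events.add(new ChangeEvent(ChangeEvent.Op.INSERT, "1", null, "alice"));
        events.add(new ChangeEvent(ChangeEvent.Op.INSERT, "2", null, "bob"));
        events.add(new ChangeEvent(ChangeEvent.Op.UPDATE, "1", "alice", "alicia"));
        events.add(new ChangeEvent(ChangeEvent.Op.DELETE, "2", "bob", null));
        System.out.println(replay(events)); // prints {1=alicia}
    }
}
```

The order matters: applying the same four events in a different order could leave row 2 in the replica or leave row 1 with a stale value, which is why CDC records changes in the order they occur.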
2. What is Flink CDC
The CDC Connectors for Apache Flink are a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). They integrate Debezium as the engine for capturing data changes, so they can take full advantage of Debezium's capabilities.
3. Supported connectors
3. Spring Boot integrates Flink CDC
1. Official website example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // set 4 parallel source tasks
            .setParallelism(4)
            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
2. Maven dependencies
1) Flink and Flink CDC version mapping
2) Specific maven dependencies
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
    <log4j2.version>2.17.0</log4j2.version>
    <mybatis-plus.version>3.4.1</mybatis-plus.version>
    <sharding.jdbc.version>4.1.0</sharding.jdbc.version>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- log4j vulnerability fix -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-1.2-api</artifactId>
        <version>${log4j2.version}</version>
    </dependency>

    <!-- log4j2 starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

    <!-- Paging plug-in -->
    <dependency>
        <groupId>com.github.pagehelper</groupId>
        <artifactId>pagehelper-spring-boot-starter</artifactId>
        <version>1.4.1</version>
    </dependency>

    <!-- Database driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <!-- There is no com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset method in 8.0.25 -->
        <version>8.0.29</version>
    </dependency>

    <!-- Database connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.4</version>
    </dependency>

    <!-- easyexcel -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>3.1.1</version>
    </dependency>

    <!-- Database and table sharding -->
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        <!--<version>4.0.0-RC1</version>-->
        <version>4.1.1</version>
    </dependency>

    <!-- MyBatis-Plus and its code generator -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-generator</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.velocity</groupId>
        <artifactId>velocity-engine-core</artifactId>
        <version>2.3</version>
    </dependency>

    <!-- Multiple data sources -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>3.6.1</version>
    </dependency>

    <!-- Annotation-based validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>

    <!-- Apache HTTP client -->
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>

    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>

    <!-- aliyun oss -->
    <dependency>
        <groupId>com.aliyun.oss</groupId>
        <artifactId>aliyun-sdk-oss</artifactId>
        <version>3.15.0</version>
    </dependency>

    <!-- xxl-job -->
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>2.4.0</version>
    </dependency>

    <!-- ShardingSphere read-write separation / database and table sharding -->
    <!-- <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
        <version>5.1.2</version>
    </dependency>-->

    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- Do not use the provided scope when running inside the application; it causes a class-not-found error -->
        <!--<scope>provided</scope>-->
    </dependency>

    <!-- Starting from Flink 1.11.0, the flink-clients package must be added as well. Without it, the error
         "No ExecutorFactory found to execute the application" is reported -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink CDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <!-- The MySQL JDBC driver 8.0.25 pulled in by this connector must be excluded, and version 8.0.28 or above
             used instead. Otherwise the project fails at startup with: java.lang.NoSuchMethodError:
             com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; -->
        <exclusions>
            <exclusion>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </exclusion>
        </exclusions>
        <!-- <scope>provided</scope>-->
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
    <!--<dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>-->

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>1.12.1</version>
    </dependency>
</dependencies>
3) Project pitfalls
- The MySQL JDBC driver version is too low, which causes java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; when the project starts. The method does not exist in the older driver, so version 8.0.28 or above must be used.
- Version 2.3.0 of flink-connector-mysql-cdc pulls in driver version 8.0.25 as a transitive dependency, which needs to be excluded.
<!-- Database driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <!-- There is no com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset method in 8.0.25 -->
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
    <!-- The MySQL JDBC driver 8.0.25 pulled in by this connector must be excluded, and version 8.0.28 or above
         used instead. Otherwise the project fails at startup with: java.lang.NoSuchMethodError:
         com.mysql.cj.CharsetMapping.getStaticJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String; -->
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>
- Error when starting the project: java.lang.NoClassDefFoundError: org/apache/flink/table/api/ValidationException. The class cannot be found because a dependency is missing; add the dependency that provides it.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.12.1</version>
</dependency>
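Both startup errors above come down to a class or method that is missing from the runtime classpath. One way to check this before starting the Flink job is a small reflection probe like the sketch below. ClasspathCheck is a hypothetical helper written for this article and is not part of Flink, Flink CDC, or the MySQL driver; it uses only JDK reflection, and the class and method names it probes are the ones quoted in the error messages.

```java
/** Diagnostic sketch: uses JDK reflection to test whether a class or method is on the classpath. */
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean hasClass(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns true if the class exists and declares a method with the given name and parameter types. */
    static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            type.getDeclaredMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class named in the NoClassDefFoundError
        System.out.println("ValidationException present: "
                + hasClass("org.apache.flink.table.api.ValidationException"));
        // Method named in the NoSuchMethodError
        System.out.println("getStaticJavaEncodingForMysqlCharset present: "
                + hasMethod("com.mysql.cj.CharsetMapping",
                        "getStaticJavaEncodingForMysqlCharset", String.class));
    }
}
```

If either line prints false when run with the application's classpath, the corresponding dependency fix above is probably still missing. Running mvn dependency:tree is a complementary way to see which driver version Maven actually resolved.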
- The version of Flink must be compatible with the version of Flink CDC; choose both according to the official version mapping.
If the version is too low, the following error occurs: Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation
One way to resolve this issue is to grant the privilege:
Make sure the database user you use has the RELOAD privilege. Log in to the MySQL database and grant it to the user. For example, the following command grants RELOAD to the user your_user:
GRANT RELOAD ON *.* TO 'your_user'@'localhost';
The official Flink CDC answer to the above problem
The MySQL CDC source uses an incremental snapshot algorithm that avoids taking database locks, so it does not require the RELOAD privilege.
The incremental snapshot algorithm belongs to Flink CDC, not to Flink itself: it was introduced in Flink CDC 2.0 and is provided by the flink-connector-mysql-cdc module, which implements MySQL-based Change Data Capture.
With this algorithm the MySQL CDC source captures the snapshot and the subsequent changes without database locks and without the RELOAD privilege, while keeping change capture latency low.
The option is named scan.incremental.snapshot.enabled. The Flink CDC documentation lists it as an option of the MySQL CDC SQL connector with a default of true; the project in this article also sets it in the code. For the full code, see the code example.
Configuration config = new Configuration();
// Set incremental snapshot enable to true
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);
3. Spring Boot code example
1) Create a change listener
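Create the MysqlEventListener class, which implements ApplicationRunner so that MySQL listening starts when the project starts. The class must be registered as a Spring bean (for example with @Component) for Spring Boot to invoke it.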
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @Description: mysql change listener
 * @Date: 2023/10/11
 **/
@Component // register as a Spring bean so that run() is called at startup
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
            .hostname("yourHostname")
            .port(3306)
            // Set the captured database. To synchronize the entire database, set tableList to ".*".
            .databaseList("yourDatabaseName")
            // Set the captured table, in the form database.table
            .tableList("yourDatabaseName.yourTableName")
            .username("yourUsername")
            .password("yourPassword")
            // Convert SourceRecord to the custom object
            .deserializer(new MysqlDeserialization())
            /*
             * initial: take an initial snapshot, then continue with incremental changes
             *          (full import followed by incremental import)
             * latest: incremental import only (historical changes are not read)
             * timestamp: start from a given timestamp (reads changes at or after the specified time)
             */
            .startupOptions(StartupOptions.latest())
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        // Set incremental snapshot enable to true
        config.setBoolean("scan.incremental.snapshot.enabled", true);
        env.configure(config);
        env.setParallelism(1);

        DataStreamSource<DataChangeInfo> streamSource = env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            .setParallelism(1);
        streamSource.addSink(new DataChangeSink());

        // Blocks while the streaming job is running
        env.execute("mysql-stream-cdc");
    }
}
2) Custom data parser
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

/**
 * @Description: custom mysql deserializer
 * @Date: 2023/10/11
 **/
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize data and convert it into a custom object DataChangeInfo
     *
     * @param sourceRecord the change record emitted by Debezium
     * @param collector    collector for the converted object
     * @throws Exception if the record cannot be converted
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        // The topic is split on "." to obtain the database and table names
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);

        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());

        // Get the operation type: CREATE, UPDATE, DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        // 1 = add, 2 = modify, anything else is treated as 3 = delete
        int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
        dataChangeInfo.setEventType(eventType);

        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));

        // Output data
        collector.collect(dataChangeInfo);
    }

    /**
     * Get the data before or after the change from the record value
     *
     * @param value        the record value
     * @param fieldElement "before" or "after"
     * @return the row as a JSON object (empty if the field is null)
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
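The parser above relies on two small steps that can be tried out without Flink: splitting the record topic into database and table names, and mapping the operation name to the numeric event type. The sketch below isolates both in plain Java. ChangeRecordParsing is a hypothetical helper, and the topic layout serverName.database.table with the example value mysql_binlog_source.yourDatabaseName.yourTableName is an assumption based on how the parser indexes the split result. Unlike the parser, which treats everything that is not CREATE or UPDATE as a delete, this sketch returns 0 for other operations such as READ; that is a choice made for the sketch, not the article's behaviour.

```java
import java.util.Locale;

/** Plain-Java sketch of two parsing steps: topic splitting and operation mapping. */
final class ChangeRecordParsing {

    /**
     * Splits a topic of the assumed form "serverName.database.table".
     * String.split takes a regular expression, so the dot must be escaped as "\\." in Java source.
     *
     * @return a two-element array: {database, table}
     */
    static String[] databaseAndTable(String topic) {
        String[] parts = topic.split("\\.");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected serverName.database.table but got: " + topic);
        }
        return new String[] { parts[1], parts[2] };
    }

    /** Maps an operation name to an event type: 1 = add, 2 = modify, 3 = delete, 0 = anything else. */
    static int eventType(String operationName) {
        switch (operationName.toUpperCase(Locale.ROOT)) {
            case "CREATE":
                return 1;
            case "UPDATE":
                return 2;
            case "DELETE":
                return 3;
            default:
                return 0; // e.g. READ records produced while taking a snapshot
        }
    }

    public static void main(String[] args) {
        String[] names = databaseAndTable("mysql_binlog_source.yourDatabaseName.yourTableName");
        System.out.println(names[0] + " / " + names[1]);
        System.out.println(eventType("CREATE") + " " + eventType("UPDATE") + " " + eventType("DELETE"));
    }
}
```

Note that splitting on every dot assumes that neither the database name nor the table name contains a dot.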
3) Create a change object
import lombok.Data;

/**
 * @Description: Data change object
 * @Date: 2023/10/11
 **/
@Data
public class DataChangeInfo {
    /**
     * Data before change
     */
    private String beforeData;
    /**
     * Data after change
     */
    private String afterData;
    /**
     * Change type: 1 Add, 2 Modify, 3 Delete
     */
    private Integer eventType;
    /**
     * binlog file name
     */
    private String fileName;
    /**
     * binlog current reading position
     */
    private Integer filePos;
    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private Long changeTime;
}
4) Create business processing class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * @Description: Data processing
 * @Date: 2023/10/11
 **/
@Slf4j
public class DataChangeSink implements SinkFunction<DataChangeInfo> {

    // The element type must match the stream type (DataChangeInfo) produced by MysqlDeserialization
    @Override
    public void invoke(DataChangeInfo value, Context context) throws Exception {
        log.info("Received change original data:{}", value);
        // Business code
    }
}
5) Run code to monitor mysql CDC events
- Start the project and confirm that it starts successfully
- Modify data in the MySQL database (record the row before the change, then change it and click Save)
- Check the service log for the captured change event