Flink CDC: SQL Server CDC configuration and DataStream API implementation code (one job can monitor and collect multiple tables of a database)

Contents

  • SQL Server CDC configuration
    • Step 1: Enable the CDC function of the specified database
    • Step 2: Create a database role
    • Step 3: Create file groups & files
    • Step 4: Enable the CDC function of the specified table
  • SQLServer CDC DataStream API implementation
    • 1. Define SqlServerSource
    • 2. Data processing
    • 3. Sink to MySQL
  • Reference

SQL Server CDC Configuration

Step 1: Enable the CDC function of the specified database

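  • Check whether CDC is enabled for the database
    --returns 1 if CDC is enabled for the database, 0 if it is not
    select is_cdc_enabled from db_name.sys.databases where name = 'db_name';
    
  • If CDC is not enabled, run the following statements in order to enable it. sys.sp_cdc_enable_db must be run by a member of the sysadmin fixed server role. The ALTER AUTHORIZATION statement makes sa the database owner, which avoids failures when the current owner login is invalid (for example, after the database was restored from another server).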
    ALTER AUTHORIZATION ON DATABASE::db_name TO sa;
    USE db_name;
    EXEC sys.sp_cdc_enable_db;
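
The database name appears in several places in the statements above. As a sketch (the class and method names are hypothetical, not part of any library), the check query and the enable script can be generated from one name in Java. Identifiers are bracket-quoted the way T-SQL's QUOTENAME does it, and the string literal doubles single quotes, so unusual names stay valid.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds the Step 1 T-SQL for one database.
final class EnableDbCdcScript {

    // Bracket-quotes a T-SQL identifier like QUOTENAME: ']' is escaped as ']]'.
    static String quoteName(String identifier) {
        return "[" + identifier.replace("]", "]]") + "]";
    }

    // Query that returns 1 if CDC is enabled for the database; quotes are doubled inside N'...'.
    static String checkStatement(String database) {
        return "SELECT is_cdc_enabled FROM sys.databases WHERE name = N'"
                + database.replace("'", "''") + "';";
    }

    // The Step 1 statements, in execution order.
    static List<String> statements(String database) {
        String db = quoteName(database);
        return Arrays.asList(
                "ALTER AUTHORIZATION ON DATABASE::" + db + " TO sa;",
                "USE " + db + ";",
                "EXEC sys.sp_cdc_enable_db;");
    }

    public static void main(String[] args) {
        // Print the script for a sample database name; run it with SSMS or sqlcmd.
        System.out.println(checkStatement("db_name"));
        statements("db_name").forEach(System.out::println);
    }
}
```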
    

Step 2: Create a database role

  • View database roles
    select name from db_name.sys.database_principals
    where type_desc = 'DATABASE_ROLE' and name not like '##%'
    
  • Create a new database role
    USE db_name;
    create role cdc_role;
    grant select on schema::schema_name to cdc_role;
    
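  • Check whether the user has the db_owner role (the query lists all role memberships in the current database; uncomment the WHERE clause to check a single user)
    USE db_name;
    SELECT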
    r.name AS RoleName, m.name AS MemberName
    FROM sys.database_role_members drm
    JOIN sys.database_principals r ON drm.role_principal_id = r.principal_id
    JOIN sys.database_principals m ON drm.member_principal_id = m.principal_id
    --WHERE m.name = 'sa' AND r.name = 'db_owner';
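
When the collected tables live in more than one schema, the role needs SELECT on each of them. The following is a hypothetical Java helper (names are illustrative). It derives the distinct schemas from a comma-separated list of schema.table names, which is the format used for table_list later in this article, and emits the CREATE ROLE statement plus one GRANT per schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: CREATE ROLE plus one GRANT SELECT per schema found in the table list.
final class CdcRoleScript {

    // Bracket-quotes a T-SQL identifier (']' becomes ']]').
    static String quoteName(String identifier) {
        return "[" + identifier.replace("]", "]]") + "]";
    }

    static List<String> statements(String role, String tableList) {
        // Keep schemas in first-seen order and drop duplicates.
        Set<String> schemas = new LinkedHashSet<>();
        for (String entry : tableList.split(",")) {
            String name = entry.trim();
            int dot = name.indexOf('.');
            if (dot <= 0) {
                throw new IllegalArgumentException("expected schema.table, got: " + name);
            }
            schemas.add(name.substring(0, dot));
        }
        List<String> sql = new ArrayList<>();
        sql.add("CREATE ROLE " + quoteName(role) + ";");
        for (String schema : schemas) {
            sql.add("GRANT SELECT ON SCHEMA::" + quoteName(schema) + " TO " + quoteName(role) + ";");
        }
        return sql;
    }

    public static void main(String[] args) {
        statements("cdc_role", "dbo.orders, dbo.customers, sales.invoices")
                .forEach(System.out::println);
    }
}
```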
    

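Step 3: Create file groups & files

Create a dedicated filegroup, with a data file in it, for the change tables. Its name is passed as @filegroup_name in Step 4. Microsoft recommends keeping change tables in a filegroup separate from the source tables.

alter database db_name add filegroup cdc_filegroup;
alter database db_name add file (
name = 'cdc_data',
filename = 'D:\RDSDBDATA\DATA\cdc_data.mdf', -- replace with the actual data directory of your SQL Server instance
size=16MB
)
to filegroup cdc_filegroup;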

-- select data_space_id,name,type_desc,t1.is_read_only,t1.type,t1.log_filegroup_id from db_name.sys.filegroups t1;
-- select file_id,name,physical_name,type_desc,data_space_id from db_name.sys.database_files;

Step 4: Enable the CDC function of the specified table

  1. Enable CDC on the table to be collected

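    1. Check whether CDC is enabled for the table (1 = enabled)
      select is_tracked_by_cdc from db_name.sys.tables where name = 'table_name';
      
    2. Enable CDC for the table, using the role created in Step 2 and the filegroup created in Step 3
      USE db_name;
      EXEC sys.sp_cdc_enable_table
      @source_schema = N'schema_name',
      @source_name = N'table_name',
      @role_name = N'cdc_role',
      @filegroup_name = N'cdc_filegroup',
      @supports_net_changes = 0; -- 1 also creates the net-changes query function, which requires a primary key or unique index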
      
  2. When enabling CDC for a table, the following error may appear:

    SQL error [22832] [S0001]: Unable to update metadata to indicate that change data capture is enabled for table [schema_name].[table_name].
    Failed while executing command '[sys].[sp_cdc_add_job] @job_type = N'capture''.
    The error returned was 22836: 'Unable to update metadata for database db_name to indicate that a change data capture job has been added.
    Failed while executing command 'sp_add_jobstep_internal'. The error returned is 14234: The specified '@server' is invalid (valid values are returned by sp_helpserver).
    '. Please use this action and error to determine the cause of the failure and resubmit the request. '. Please use this action and error to determine the cause of the failure and resubmit the request.
    

    Cause: error 14234 is raised because the @server value used for the capture job step (the local server name, @@SERVERNAME) is not a valid registered server. This typically happens when the computer was renamed after SQL Server was installed, so @@SERVERNAME no longer matches the actual name. Compare the two names:

    -- Check the currently configured server name.
    select @@servername;
    
    -- Check the actual server and instance name.
    SELECT SERVERPROPERTY('ServerName');
    
    -- The SQL Server installer sets the server name to the computer name during installation.
    -- The ServerName property of SERVERPROPERTY and @@SERVERNAME return similar information:
    -- 1. The ServerName property provides the Windows server and instance name, which together form a unique server instance. (If the server's network name changes, this value returns the new name.)
    -- 2. @@SERVERNAME provides the currently configured local server name, i.e. the server and instance name specified when the instance was installed or set up. (It is not updated automatically if the server's network name changes later.)
    -- If the default server name was not changed during installation, both return the same value. If they differ, reset the local server name as follows, then restart the SQL Server service so that @@SERVERNAME returns the new name:
    EXEC sp_dropserver 'current_server_name';
    EXEC sp_addserver 'new_server_name', 'local';
    

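    View the CDC capture and cleanup jobs. For the cleanup job, the retention column is how long change data is kept, in minutes (default 4320, i.e. 3 days):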

    EXEC sys.sp_cdc_help_jobs;
    
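  3. Verify that the user can access the change tables. sys.sp_cdc_help_change_data_capture returns the capture configuration only for the capture instances the caller has SELECT access to: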

    USE db_name;
    EXEC sys.sp_cdc_help_change_data_capture;
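
Item 1 has to be repeated for every table the job collects. The following is a minimal sketch that assumes the same comma-separated schema.table list that is later passed to tableList. The class and method names are hypothetical. It generates one sys.sp_cdc_enable_table call per table, reusing the role from Step 2 and the filegroup from Step 3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: one sys.sp_cdc_enable_table call per "schema.table" entry.
final class EnableTableCdcScript {

    // Escapes a value for use inside an N'...' T-SQL string literal.
    static String literal(String value) {
        return "N'" + value.replace("'", "''") + "'";
    }

    static List<String> statements(String tableList, String role, String filegroup) {
        List<String> sql = new ArrayList<>();
        for (String entry : tableList.split(",")) {
            String name = entry.trim();
            int dot = name.indexOf('.');
            if (dot <= 0 || dot == name.length() - 1) {
                throw new IllegalArgumentException("expected schema.table, got: " + name);
            }
            sql.add("EXEC sys.sp_cdc_enable_table"
                    + " @source_schema = " + literal(name.substring(0, dot))
                    + ", @source_name = " + literal(name.substring(dot + 1))
                    + ", @role_name = " + literal(role)
                    + ", @filegroup_name = " + literal(filegroup)
                    + ", @supports_net_changes = 0;");
        }
        return sql;
    }

    public static void main(String[] args) {
        // Run the printed statements after USE db_name;
        statements("dbo.orders,sales.invoices", "cdc_role", "cdc_filegroup")
                .forEach(System.out::println);
    }
}
```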
    

SQLServer CDC DataStream API implementation

Software versions

  • Java 1.8
  • Scala 2.11
  • Flink 1.14.2
  • Flink CDC 2.3.0
  • Source: SQL Server 2016
  • Sink: MySQL 5.7
  • Jackson 2.10.2

With the DataStream API, one SQL Server CDC source, and therefore one job, can monitor and collect multiple tables of the same database. The tables are passed to tableList as schema.table names.
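
The source definition below reads its settings through DbConfigUtil.loadConfig("sqlserver.properties"), a helper the article does not show. A hypothetical stand-in using only java.util.Properties might look like this. The keys (host, port, database, table_list, username, password) are the ones the source code reads. Loading the file from the classpath and the sample values are assumptions. The tableList helper trims each entry, so "dbo.a, dbo.b" does not depend on how the connector treats surrounding spaces.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical stand-in for the article's DbConfigUtil (not shown in the original).
final class DbConfigUtil {

    // Loads a .properties file from the classpath, e.g. src/main/resources/sqlserver.properties.
    static Properties loadConfig(String resource) {
        try (InputStream in = DbConfigUtil.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalStateException("resource not found: " + resource);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Splits table_list into trimmed, non-empty "schema.table" entries for tableList(...).
    static String[] tableList(Properties props) {
        return Arrays.stream(props.getProperty("table_list", "").split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) throws IOException {
        // Example sqlserver.properties contents (placeholder values).
        String sample = "host=localhost\nport=1433\ndatabase=db_name\n"
                + "table_list=dbo.orders, dbo.customers\nusername=cdc_user\npassword=secret\n";
        Properties props = new Properties();
        props.load(new StringReader(sample));
        System.out.println(Arrays.toString(tableList(props)));
    }
}
```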

1. Define SqlServerSource

// Imports for this snippet (Flink 1.14.2, Flink CDC 2.3.0). DbConfigUtil is the author's own helper and is not shown.
// StartupOptions: import the class expected by SqlServerSource.Builder#startupOptions in your Flink CDC version.
import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

//Source database connection configuration file
Properties dbProps = DbConfigUtil.loadConfig("sqlserver.properties");

//Debezium configuration
Properties debeziumProps = new Properties();
//decimal.handling.mode: how the connector represents DECIMAL and NUMERIC values.
//  precise (default): java.math.BigDecimal, serialized in binary form, which is hard to read and to process downstream
//  double: double values, which may lose precision
//  string: formatted strings, easy to consume downstream but without the actual type information (recommended here)
debeziumProps.setProperty("decimal.handling.mode", "string");
//time.precision.mode: how time, date and timestamp values are represented. The SQL Server connector supports:
//  adaptive (default): exact values with millisecond, microsecond or nanosecond precision, depending on the column type
//  connect: Kafka Connect's built-in Time, Date and Timestamp types, always with millisecond precision
debeziumProps.setProperty("time.precision.mode", "connect");

//SQL Server CDC source: one source captures all tables listed in table_list
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
        .hostname(dbProps.getProperty("host"))
        .port(Integer.parseInt(dbProps.getProperty("port")))
        .database(dbProps.getProperty("database"))
        // comma-separated schema.table names, e.g. dbo.orders,dbo.customers (spaces around commas are stripped)
        .tableList(dbProps.getProperty("table_list").split("\\s*,\\s*"))
        .username(dbProps.getProperty("username"))
        .password(dbProps.getProperty("password"))
        .debeziumProperties(debeziumProps)
        // emit each change event as a JSON string
        .deserializer(new JsonDebeziumDeserializationSchema())
        // snapshot the existing rows first, then read the change stream
        .startupOptions(StartupOptions.initial())
        .build();
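
The effect of the two Debezium settings above can be checked with plain Java, without the connector. The values below are made up for illustration. A DECIMAL(20,2) value pushed through double, as decimal.handling.mode=double would do, no longer equals the source value, while the string form round-trips exactly. Likewise, millisecond precision (time.precision.mode=connect) drops the sub-millisecond digits of a datetime2(7) value.

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Illustrates the precision trade-offs behind decimal.handling.mode and time.precision.mode.
final class PrecisionDemo {

    // What "double" mode keeps of a DECIMAL value.
    static BigDecimal viaDouble(BigDecimal source) {
        return new BigDecimal(Double.toString(source.doubleValue()));
    }

    // What "string" mode keeps: the plain decimal text, parsed back.
    static BigDecimal viaString(BigDecimal source) {
        return new BigDecimal(source.toPlainString());
    }

    // What millisecond precision ("connect" mode) keeps of a timestamp.
    static LocalDateTime viaMillis(LocalDateTime source) {
        return source.truncatedTo(ChronoUnit.MILLIS);
    }

    public static void main(String[] args) {
        BigDecimal amount = new BigDecimal("12345678901234567.89");
        System.out.println("double: " + viaDouble(amount).toPlainString());
        System.out.println("string: " + viaString(amount).toPlainString());
        LocalDateTime ts = LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123_456_700);
        System.out.println("millis: " + viaMillis(ts));
    }
}
```

If downstream consumers need sub-millisecond timestamps, adaptive mode is the setting that preserves them, at the cost of column-type-dependent representations.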

2. Data processing

Reference: MySQL CDC configuration and DataStream API implementation code
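
As a self-contained hint of what this step deals with: each record produced by JsonDebeziumDeserializationSchema is a Debezium change-event JSON with before, after, source and op fields. The op field is r (snapshot read), c (insert), u (update) or d (delete), and source.schema and source.table identify which of the monitored tables the event came from. The following hypothetical Java mapping (names are illustrative, JSON parsing omitted) turns op into the write a sink would perform and builds a per-table routing key.

```java
// Hypothetical: maps a Debezium "op" code to the write a sink should perform.
final class ChangeRouting {

    enum Action { UPSERT, DELETE }

    static Action actionFor(String op) {
        switch (op) {
            case "r": // snapshot read (initial load)
            case "c": // insert
            case "u": // update: the new row image is in "after"
                return Action.UPSERT;
            case "d": // delete: the old row image is in "before"
                return Action.DELETE;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    // Key used to route an event to the target table for its source table.
    static String routeKey(String schema, String table) {
        return schema + "." + table;
    }

    public static void main(String[] args) {
        System.out.println(actionFor("u") + " " + routeKey("dbo", "orders"));
    }
}
```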

3. Sink to MySQL

Reference: MySQL CDC configuration and DataStream API implementation code
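
One design point worth sketching is how writes are made idempotent. Flink's JDBC sink (reference 3) gives at-least-once delivery, so events may be written again after a restart. Writing r/c/u events as a MySQL upsert, and d events as a delete by key, makes such replays harmless. The following is a hypothetical builder (names are illustrative) for the INSERT ... ON DUPLICATE KEY UPDATE statement in MySQL 5.7 syntax, using VALUES(col). A JDBC sink would bind the ? placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical: builds MySQL 5.7 upsert/delete statements with JDBC "?" placeholders.
final class MySqlUpsert {

    // Backtick-quotes a MySQL identifier ('`' is escaped as '``').
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    static String upsertSql(String table, List<String> columns) {
        String cols = columns.stream().map(MySqlUpsert::quote).collect(Collectors.joining(", "));
        String marks = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        String updates = columns.stream()
                .map(c -> quote(c) + " = VALUES(" + quote(c) + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + " (" + cols + ") VALUES (" + marks + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    static String deleteSql(String table, List<String> keyColumns) {
        String where = keyColumns.stream()
                .map(c -> quote(c) + " = ?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + quote(table) + " WHERE " + where;
    }

    public static void main(String[] args) {
        System.out.println(upsertSql("orders", Arrays.asList("id", "amount")));
        System.out.println(deleteSql("orders", Arrays.asList("id")));
    }
}
```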

Reference

  1. https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html
  2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/sqlserver-cdc.html
  3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
  4. https://learn.microsoft.com/zh-cn/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-2016