A simple introduction to the Apache SeaTunnel SQL Server Sink Connector

In the era of big data, data migration and data movement have become increasingly important. To move data efficiently from one system to another, we need tools that are reliable, efficient, and easy to configure. Today, we introduce the JDBC SQL Server Sink Connector, a connector designed specifically for writing data to SQL Server accurately and efficiently.


It also runs on multiple processing engines: Spark, Flink, and SeaTunnel Zeta. Whether you are a beginner or an experienced developer, this article shows you how to get the most out of this connector.

Supported SQL Server Versions

  • Server: 2008 or later (for reference only)

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Main Features

  • [x] Exactly-once semantics
  • [x] CDC (Change Data Capture)

Exactly-once semantics are guaranteed with XA transactions, so they are only available for databases that support XA transactions. To enable them, set is_exactly_once = true.
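To see why XA transactions give exactly-once delivery, the toy model below simulates checkpoint-aligned two-phase commit in plain Python. It is a sketch under stated assumptions, not SeaTunnel's implementation: ToyXaDatabase and run_job are hypothetical names, the "database" is an in-memory list, and a single crash is simulated either before a transaction is prepared or after it is prepared but before it is committed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyXaDatabase:
    """Hypothetical in-memory stand-in for a database that supports XA."""
    committed: list = field(default_factory=list)  # rows visible to readers
    prepared: dict = field(default_factory=dict)   # xid -> rows: durable, not yet visible

    def prepare(self, xid, rows):
        # Phase 1: persist the transaction without making it visible.
        self.prepared[xid] = list(rows)

    def commit(self, xid):
        # Phase 2: idempotent, so committing the same xid twice adds nothing.
        rows = self.prepared.pop(xid, None)
        if rows is not None:
            self.committed.extend(rows)


def run_job(source, batch, crash=None):
    """Write `source` using one transaction per checkpoint.

    crash is None, ("before_prepare", n) or ("before_commit", n), where n is
    the checkpoint number at which one simulated failure happens.
    """
    if batch < 1:
        raise ValueError("batch must be at least 1")
    db = ToyXaDatabase()
    offset, ckpt = 0, 0  # checkpointed source position and checkpoint number
    pending = None       # xid stored in checkpoint state but not yet committed
    while offset < len(source):
        if pending is not None:
            db.commit(pending)  # recovery: finish the already-prepared transaction
            pending = None
        xid = f"xid-{ckpt}"
        open_tx = source[offset:offset + batch]  # writes inside the open transaction
        if crash == ("before_prepare", ckpt):
            crash = None
            continue  # open_tx is rolled back; the source replays from the same offset
        db.prepare(xid, open_tx)
        offset += len(open_tx)  # checkpoint stores the new offset and the xid
        pending = xid
        if crash == ("before_commit", ckpt):
            crash = None
            ckpt += 1
            continue  # restart from the checkpoint; pending is committed at the loop top
        db.commit(pending)
        pending = None
        ckpt += 1
    if pending is not None:
        db.commit(pending)  # recovery after a crash on the last checkpoint
    return db.committed


print(run_job([1, 2, 3, 4, 5], batch=2, crash=("before_commit", 0)))
```

In both failure cases every record ends up committed exactly once. Without the prepare step, rows written before a crash would already be visible, and the replay from the last checkpoint would write them a second time.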

Description

Writes data through JDBC. The connector supports both batch and streaming modes, concurrent writes, and exactly-once semantics (guaranteed by XA transactions).

Supported data source information

  • Data source: SQL Server
  • Supported versions: 2008 or later
  • Driver: com.microsoft.sqlserver.jdbc.SQLServerDriver
  • URL: jdbc:sqlserver://localhost:1433
  • Maven: mssql-jdbc driver (com.microsoft.sqlserver:mssql-jdbc on Maven Central)

Database dependency

Download the driver JAR listed under "Maven" in the support matrix above and copy it to the $SEATUNNEL_HOME/plugins/jdbc/lib/ directory.
For example, for the SQL Server data source: cp mssql-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
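If a job fails because the driver class cannot be found, a quick first check is whether the JAR actually landed in the plugin directory. The helper below is a hypothetical convenience (find_sqlserver_driver is not part of SeaTunnel); it assumes the directory layout used by the cp command above and a driver file named mssql-jdbc-*.jar.

```python
import glob
import os


def find_sqlserver_driver(seatunnel_home):
    """Return the sorted paths of mssql-jdbc JARs in SeaTunnel's JDBC plugin lib dir."""
    lib_dir = os.path.join(seatunnel_home, "plugins", "jdbc", "lib")
    # Escape the directory so characters like '[' in the path are not treated as glob syntax.
    return sorted(glob.glob(os.path.join(glob.escape(lib_dir), "mssql-jdbc-*.jar")))


if __name__ == "__main__":
    home = os.environ.get("SEATUNNEL_HOME", ".")
    jars = find_sqlserver_driver(home)
    print(jars if jars else f"No mssql-jdbc JAR found under {home}/plugins/jdbc/lib")
```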

Data type mapping

SQL Server data type → SeaTunnel data type

  • BIT → BOOLEAN
  • TINYINT, SMALLINT → SHORT
  • INTEGER → INT
  • BIGINT → LONG
  • DECIMAL, NUMERIC, MONEY, SMALLMONEY → DECIMAL(p + 1, s), where p is the column size (precision) of the column and s is the number of digits to the right of the decimal point (scale)
  • REAL → FLOAT
  • FLOAT → DOUBLE
  • CHAR, NCHAR, VARCHAR, NTEXT, NVARCHAR, TEXT → STRING
  • DATE → LOCAL_DATE
  • TIME → LOCAL_TIME
  • DATETIME, DATETIME2, SMALLDATETIME, DATETIMEOFFSET → LOCAL_DATE_TIME
  • TIMESTAMP, BINARY, VARBINARY, IMAGE, UNKNOWN → not supported yet
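The mapping can also be read as a lookup function. The sketch below is a hypothetical helper, not SeaTunnel code; it encodes the table above, including the DECIMAL(p + 1, s) rule, and assumes the caller supplies the column's precision and scale (for example from JDBC result set metadata). The extra INT entry is an assumption added because INTEGER is SQL Server's synonym for INT. As a worked example, SQL Server's MONEY has a precision of 19 and a scale of 4, so it maps to DECIMAL(20, 4).

```python
# Hypothetical lookup that mirrors the mapping table; not SeaTunnel code.
SIMPLE_TYPES = {
    "BIT": "BOOLEAN",
    "TINYINT": "SHORT",
    "SMALLINT": "SHORT",
    "INT": "INT",  # assumption: INTEGER is SQL Server's synonym for INT
    "INTEGER": "INT",
    "BIGINT": "LONG",
    "REAL": "FLOAT",
    "FLOAT": "DOUBLE",
    "DATE": "LOCAL_DATE",
    "TIME": "LOCAL_TIME",
}
STRING_TYPES = {"CHAR", "NCHAR", "VARCHAR", "NTEXT", "NVARCHAR", "TEXT"}
DATETIME_TYPES = {"DATETIME", "DATETIME2", "SMALLDATETIME", "DATETIMEOFFSET"}
DECIMAL_TYPES = {"DECIMAL", "NUMERIC", "MONEY", "SMALLMONEY"}


def to_seatunnel_type(sql_type, precision=None, scale=None):
    """Map a SQL Server column type name to a SeaTunnel type name."""
    name = sql_type.strip().upper()
    if name in DECIMAL_TYPES:
        if precision is None or scale is None:
            raise ValueError(f"{name} needs the column's precision and scale")
        # Column size (precision) + 1; the scale is kept unchanged.
        return f"DECIMAL({precision + 1}, {scale})"
    if name in STRING_TYPES:
        return "STRING"
    if name in DATETIME_TYPES:
        return "LOCAL_DATE_TIME"
    if name in SIMPLE_TYPES:
        return SIMPLE_TYPES[name]
    # TIMESTAMP, BINARY, VARBINARY, IMAGE and anything unrecognized
    raise ValueError(f"{name} is not supported yet")


print(to_seatunnel_type("money", precision=19, scale=4))
```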

Sink Options

Each option is listed as name (type, required or optional, default value): description.

  • url (String, required): The JDBC connection URL, for example jdbc:sqlserver://localhost:1433;databaseName=mydatabase
  • driver (String, required): The JDBC driver class used to connect to the remote data source. For SQL Server, use com.microsoft.sqlserver.jdbc.SQLServerDriver.
  • user (String, optional): Username for the connection.
  • password (String, optional): Password for the connection.
  • query (String, optional): The SQL statement used to write upstream input data to the database, for example an INSERT statement. If query is set, it takes priority over database and table.
  • database (String, optional): Used together with table to generate the SQL automatically and write upstream input data to the database. This option is mutually exclusive with query.
  • table (String, optional): Used together with database to generate the SQL automatically and write upstream input data to the database. This option is mutually exclusive with query.
  • primary_keys (Array, optional): The primary key columns. When the SQL is generated automatically, this option enables insert, delete, and update operations.
  • support_upsert_by_query_primary_key_exist (Boolean, optional, default false): Whether to handle update events (INSERT, UPDATE_AFTER) with INSERT or UPDATE SQL, chosen by first querying whether the primary key already exists. Use this only if the database does not support upsert syntax. Note: this approach is slower.
  • connection_check_timeout_sec (Integer, optional, default 30): Number of seconds to wait for the database operation used to validate the connection to complete.
  • max_retries (Integer, optional, default 0): Number of times to retry a failed batch submission (executeBatch).
  • batch_size (Integer, optional, default 1000): For batch writes, data is flushed to the database when the number of buffered records reaches batch_size or when checkpoint.interval elapses.
  • is_exactly_once (Boolean, optional, default false): Whether to enable exactly-once semantics, which uses XA transactions. If enabled, xa_data_source_class_name must also be set.
  • generate_sink_sql (Boolean, optional, default false): Generate the SQL statements from the database table being written to.
  • xa_data_source_class_name (String, optional): The XA data source class of the database driver; for SQL Server, com.microsoft.sqlserver.jdbc.SQLServerXADataSource. For other data sources, see the appendix of the SeaTunnel JDBC sink documentation.
  • max_commit_attempts (Integer, optional, default 3): Number of retries when a transaction commit fails.
  • transaction_timeout_sec (Integer, optional, default -1): Timeout for a transaction after it is opened. The default of -1 means it never times out. Note that setting a timeout may affect exactly-once semantics.
  • auto_commit (Boolean, optional, default true): Whether transactions are committed automatically; enabled by default.
  • common-options (optional): Parameters shared by all sink plugins; see Sink Common Options for details.
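Several options in the list depend on each other. The sketch below is a hypothetical pre-flight check (validate_sink_options is not a SeaTunnel API) that encodes a few of those rules for a plain Python dict of options; it does not replace whatever checks SeaTunnel itself performs. The generate_sink_sql rule is an assumption inferred from the descriptions of database and table. Booleans are accepted either as true or as the string "true", since the exactly-once example in this article writes is_exactly_once = "true".

```python
def _flag(value):
    """Interpret a boolean written either as true or as "true"."""
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == "true"


def validate_sink_options(opts):
    """Return a list of problems in a JDBC sink option dict (empty if none found)."""
    problems = []
    for key in ("url", "driver"):
        if not opts.get(key):
            problems.append(f"'{key}' is required")
    if "query" in opts and ("database" in opts or "table" in opts):
        problems.append("'query' is mutually exclusive with 'database'/'table'")
    # Assumption: generated SQL needs to know the target database and table.
    if _flag(opts.get("generate_sink_sql", False)) and not (opts.get("database") and opts.get("table")):
        problems.append("'generate_sink_sql' needs both 'database' and 'table'")
    if _flag(opts.get("is_exactly_once", False)) and not opts.get("xa_data_source_class_name"):
        problems.append("'is_exactly_once' requires 'xa_data_source_class_name'")
    return problems


cdc_sink = {
    "url": "jdbc:sqlserver://localhost:1433;databaseName=column_type_test",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "generate_sink_sql": True,
    "database": "column_type_test",
    "table": "dbo.full_types_sink",
    "primary_keys": ["id"],
}
print(validate_sink_options(cdc_sink))
```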
Tips

The partition_column option belongs to the JDBC source used in the examples below. If partition_column is not set, the source reads with a parallelism of 1; if it is set, the read is split and performed in parallel according to the task's parallelism.
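To make the idea concrete, here is one plausible way a numeric partition_column could be divided into partition_num shards: take the column's minimum and maximum values and cut that range into contiguous, non-overlapping pieces, each read by its own parallel subtask. This is a hypothetical sketch (split_ranges is not a SeaTunnel function); SeaTunnel's actual split logic may differ in details such as how the remainder is distributed.

```python
def split_ranges(min_val, max_val, partition_num):
    """Split the inclusive integer range [min_val, max_val] into at most
    partition_num contiguous, non-overlapping (start, end) ranges."""
    if partition_num < 1:
        raise ValueError("partition_num must be at least 1")
    total = max_val - min_val + 1
    if total <= 0:
        return []  # empty table or inverted bounds: nothing to read
    n = min(partition_num, total)  # never create more shards than values
    base, extra = divmod(total, n)
    ranges, start = [], min_val
    for i in range(n):
        size = base + (1 if i < extra else 0)  # spread the remainder over the first shards
        ranges.append((start, start + size - 1))
        start += size
    return ranges


print(split_ranges(1, 10, 3))
```

Each range could then become a predicate such as id BETWEEN 1 AND 4 on the source query, so the shards together cover every row exactly once.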

Task Examples

Simple:

The following example reads data from SQL Server and inserts it directly into another table:

env {
  # You can set engine configuration here
  execution.parallelism = 10
}
source {
  # This is a sample source plug-in, only used to test and demonstrate the functionality of the source plug-in
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user=SA
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # Column used to split the read into parallel shards
    partition_column = "id"
    # Number of shards
    partition_num = 10
  }
}

transform {

  # For more information on configuring transforms in SeaTunnel, see the SQL transform documentation:
  # https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user=SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"

  }
  # For more information on configuring the JDBC sink, see
  # https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
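The sink query above must contain exactly one ? placeholder per listed column, and with 23 columns that is easy to get wrong by hand. A small hypothetical helper (build_insert is not a SeaTunnel API) can generate the statement from the column list instead; alternatively, generate_sink_sql lets the connector generate the SQL itself.

```python
COLUMNS = [
    "id", "val_char", "val_varchar", "val_text", "val_nchar", "val_nvarchar",
    "val_ntext", "val_decimal", "val_numeric", "val_float", "val_real",
    "val_smallmoney", "val_money", "val_bit", "val_tinyint", "val_smallint",
    "val_int", "val_bigint", "val_date", "val_time", "val_datetime2",
    "val_datetime", "val_smalldatetime",
]


def build_insert(table, columns):
    """Return a parameterized INSERT with exactly one '?' per column."""
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" * len(columns))  # "?, ?, ..." with len(columns) marks
    return f"insert into {table}( {column_list} ) values( {placeholders} )"


print(build_insert("full_types_jdbc_sink", COLUMNS))
```

The printed string can be pasted into the query option of the sink.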

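CDC (Change Data Capture) Events

The sink also accepts CDC change data. For CDC, configure database, table, and primary_keys (with generate_sink_sql = true, as in the example below) so the connector can generate the insert, update, and delete statements.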

Jdbc {
  source_table_name = "customers"
  driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
  url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  user=SA
  password = "Y.sa123456"
  generate_sink_sql = true
  database = "column_type_test"
  table = "dbo.full_types_sink"
  batch_size = 100
  primary_keys = ["id"]
}
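With primary_keys set, change events can be applied per key: in this sketch, INSERT and UPDATE_AFTER events become upserts, while DELETE removes the row. The code below is a hypothetical illustration (RowKind and reduce_changes are defined here and only loosely modeled on the +I/-U/+U/-D change kinds used by CDC pipelines; this is not SeaTunnel's code). It assumes a write buffer that keeps only the last change per primary key within one batch, so an UPDATE_BEFORE followed by an UPDATE_AFTER for the same key collapses into a single upsert.

```python
from enum import Enum


class RowKind(Enum):
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def reduce_changes(events, key_fields):
    """Collapse a batch of (RowKind, row) events to the last action per primary key.

    Returns {key_tuple: ("upsert" | "delete", row)}. A later event for the same
    key replaces an earlier one.
    """
    buffer = {}
    for kind, row in events:
        key = tuple(row[f] for f in key_fields)
        if kind in (RowKind.INSERT, RowKind.UPDATE_AFTER):
            buffer[key] = ("upsert", row)
        else:  # DELETE and UPDATE_BEFORE both retract the current row for this key
            buffer[key] = ("delete", row)
    return buffer


events = [
    (RowKind.INSERT, {"id": 1, "name": "a"}),
    (RowKind.INSERT, {"id": 2, "name": "b"}),
    (RowKind.UPDATE_BEFORE, {"id": 1, "name": "a"}),
    (RowKind.UPDATE_AFTER, {"id": 1, "name": "a2"}),
    (RowKind.DELETE, {"id": 2, "name": "b"}),
]
for key, (action, row) in reduce_changes(events, ["id"]).items():
    print(key, action, row)
```

Without a primary key there is no way to tell which stored row an update or delete refers to, which is why CDC writes need primary_keys.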

Exactly-once Sink

Transactional writes may be slower, but they guarantee that each record is written exactly once.

  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user=SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
    is_exactly_once = "true"

    xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"

  }
  # For more information on configuring the JDBC sink, see
  # https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc

This article is published by Beluga Open Source Technology!