In the era of big data, data migration and data flow have become increasingly important. Moving data efficiently from one system to another requires tools that are reliable, efficient, and easy to configure. This article introduces the JDBC SQL Server Sink Connector, a connector designed for SQL Server that writes data accurately and efficiently.
The connector also runs on multiple engines, including Spark, Flink, and SeaTunnel Zeta. Whether you are a beginner or an experienced developer, this article shows how to get the most out of it.
Supported SQL Server Versions
- Server: 2008 or later (informational only)
Supported Engines
- Spark
- Flink
- SeaTunnel Zeta
Main Features
- [x] Exactly-once
- [x] CDC (Change Data Capture)
Exactly-once semantics are guaranteed by XA transactions, so exactly-once is only supported for databases that support XA transactions. You can enable it by setting is_exactly_once=true.
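XA relies on a two-phase commit. Every transaction branch is first asked to prepare. The branches are committed only if all of them prepared successfully; otherwise every branch is rolled back. The Java sketch below models only that control flow. The Participant and RecordingParticipant types are hypothetical stand-ins for XA branches. The sketch does not call the real javax.transaction.xa API and is not SeaTunnel's implementation.

```java
import java.util.List;

// Hypothetical stand-in for one XA transaction branch (for example, one sink writer).
interface Participant {
    boolean prepare(); // phase 1: vote to commit (true) or abort (false)
    void commit();     // phase 2 when every branch voted yes
    void rollback();   // phase 2 when at least one branch voted no
}

// Hypothetical participant that records its final state, used for the demo.
class RecordingParticipant implements Participant {
    private final boolean vote;
    String state = "active";

    RecordingParticipant(boolean vote) {
        this.vote = vote;
    }

    public boolean prepare() {
        if (vote) {
            state = "prepared";
        }
        return vote;
    }

    public void commit() {
        state = "committed";
    }

    public void rollback() {
        state = "rolled_back";
    }
}

class TwoPhaseCommitSketch {
    // Returns true if every branch committed, false if all branches were rolled back.
    static boolean run(List<Participant> branches) {
        // Phase 1: every branch must prepare successfully.
        for (Participant p : branches) {
            if (!p.prepare()) {
                // A single "no" vote aborts the global transaction on every branch.
                branches.forEach(Participant::rollback);
                return false;
            }
        }
        // Phase 2: all votes were "yes", so commit everywhere.
        branches.forEach(Participant::commit);
        return true;
    }

    public static void main(String[] args) {
        RecordingParticipant ok = new RecordingParticipant(true);
        RecordingParticipant failing = new RecordingParticipant(false);
        boolean committed = run(List.of(ok, failing));
        System.out.println(committed + " " + ok.state + " " + failing.state); // prints "false rolled_back rolled_back"
    }
}
```

The extra prepare round trip and the coordination between branches are why transactional writes are usually slower than plain batched inserts.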
Description
Writes data through JDBC. The connector supports both batch and streaming modes and concurrent writes. It also supports exactly-once semantics, guaranteed by XA transactions.
Supported data source information
Data source | Supported versions | Driver | URL | Maven |
---|---|---|---|---|
SQL Server | >= 2008 | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | Download |
Database dependency
Download the driver listed in the Maven column of the support matrix and copy it to the $SEATUNNEL_HOME/plugins/jdbc/lib/ directory.
For example, for SQL Server: cp mssql-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
Data type mapping
SQL Server data type | SeaTunnel data type |
---|---|
BIT | BOOLEAN |
TINYINT, SMALLINT | SHORT |
INTEGER | INT |
BIGINT | LONG |
DECIMAL, NUMERIC, MONEY, SMALLMONEY | DECIMAL((column size of the column) + 1, (number of digits to the right of the decimal point in the column)) |
REAL | FLOAT |
FLOAT | DOUBLE |
CHAR, NCHAR, VARCHAR, NTEXT, NVARCHAR, TEXT | STRING |
DATE | LOCAL_DATE |
TIME | LOCAL_TIME |
DATETIME, DATETIME2, SMALLDATETIME, DATETIMEOFFSET | LOCAL_DATE_TIME |
TIMESTAMP, BINARY, VARBINARY, IMAGE, UNKNOWN | Not supported yet |
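The mapping table can also be read as a lookup function. The Java sketch below encodes it directly, including the DECIMAL rule: the column size plus one, with the same number of digits after the decimal point. The class and method names are hypothetical. The INT case is an added assumption, because INT is SQL Server's usual name for INTEGER. SeaTunnel's own type converter is not reproduced here.

```java
import java.util.Locale;

// Hypothetical lookup that mirrors the mapping table; not SeaTunnel's converter.
class SqlServerTypeMappingSketch {
    // Returns the SeaTunnel type for a SQL Server column type.
    // precision and scale are only used by the DECIMAL family.
    static String toSeaTunnelType(String sqlServerType, int precision, int scale) {
        switch (sqlServerType.toUpperCase(Locale.ROOT)) {
            case "BIT":
                return "BOOLEAN";
            case "TINYINT":
            case "SMALLINT":
                return "SHORT";
            case "INTEGER":
            case "INT": // assumption: INT is SQL Server's usual name for INTEGER
                return "INT";
            case "BIGINT":
                return "LONG";
            case "DECIMAL":
            case "NUMERIC":
            case "MONEY":
            case "SMALLMONEY":
                // Column size plus one, same number of digits after the decimal point.
                return "DECIMAL(" + (precision + 1) + ", " + scale + ")";
            case "REAL":
                return "FLOAT";
            case "FLOAT":
                return "DOUBLE";
            case "CHAR":
            case "NCHAR":
            case "VARCHAR":
            case "NTEXT":
            case "NVARCHAR":
            case "TEXT":
                return "STRING";
            case "DATE":
                return "LOCAL_DATE";
            case "TIME":
                return "LOCAL_TIME";
            case "DATETIME":
            case "DATETIME2":
            case "SMALLDATETIME":
            case "DATETIMEOFFSET":
                return "LOCAL_DATE_TIME";
            default:
                // TIMESTAMP, BINARY, VARBINARY, IMAGE and anything unknown.
                throw new UnsupportedOperationException("Not supported yet: " + sqlServerType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toSeaTunnelType("money", 19, 4)); // prints "DECIMAL(20, 4)"
    }
}
```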
Sink options
Name | Type | Required | Default value | Description |
---|---|---|---|---|
url | String | Yes | – | The URL of the JDBC connection, for example jdbc:sqlserver://localhost:1433;databaseName=mydatabase |
driver | String | Yes | – | The JDBC driver class used to connect to the remote data source. For SQL Server, the value is com.microsoft.sqlserver.jdbc.SQLServerDriver. |
user | String | No | – | Username for the connection. |
password | String | No | – | Password for the connection. |
query | String | No | – | SQL used to write upstream input data to the database, for example INSERT ...; query has the higher priority. |
database | String | No | – | Used together with table to generate the SQL automatically and write upstream input data to the database. This option is mutually exclusive with query. |
table | String | No | – | Used together with database to generate the SQL automatically and write upstream input data to the database. This option is mutually exclusive with query. |
primary_keys | Array | No | – | Enables insert, update, and delete operations when the SQL is generated automatically. |
support_upsert_by_query_primary_key_exist | Boolean | No | false | Whether to handle update events (INSERT, UPDATE_AFTER) with INSERT or UPDATE statements, chosen by querying whether the primary key already exists. Use this only if the database does not support upsert syntax. Note: this approach is slower. |
connection_check_timeout_sec | Integer | No | 30 | Number of seconds to wait for the database operation used to validate the connection to complete. |
max_retries | Integer | No | 0 | Number of times to retry a failed batch submission (executeBatch). |
batch_size | Integer | No | 1000 | For batch writes, buffered data is flushed to the database when the number of records reaches batch_size or the elapsed time reaches checkpoint.interval. |
is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which use XA transactions. If enabled, xa_data_source_class_name must be set. |
generate_sink_sql | Boolean | No | false | Generate SQL statements based on the database table being written to. |
xa_data_source_class_name | String | No | – | The XA data source class name of the database driver. For SQL Server it is com.microsoft.sqlserver.jdbc.SQLServerXADataSource. For other data sources, refer to the appendix. |
max_commit_attempts | Integer | No | 3 | Number of retries when a transaction commit fails. |
transaction_timeout_sec | Integer | No | -1 | Timeout of a transaction after it is opened. The default -1 means it never times out. Note that setting a timeout may affect exactly-once semantics. |
auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default. |
common-options | | No | – | Common sink plugin parameters; see Sink Common Options for details. |
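Together, batch_size, checkpoint.interval, and max_retries describe a buffered writer. Rows accumulate until batch_size is reached or a checkpoint fires, and a failed batch submission is retried up to max_retries times. The Java sketch below models that behavior under some assumptions. BatchBufferSketch is a hypothetical class, and its flusher callback stands in for a JDBC executeBatch() call. SeaTunnel's real writer is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of batch_size + max_retries; not SeaTunnel's actual writer.
class BatchBufferSketch<T> {
    private final int batchSize;
    private final int maxRetries;
    private final Consumer<List<T>> flusher; // stands in for PreparedStatement.executeBatch()
    private final List<T> buffer = new ArrayList<>();

    BatchBufferSketch(int batchSize, int maxRetries, Consumer<List<T>> flusher) {
        if (batchSize < 1 || maxRetries < 0) {
            throw new IllegalArgumentException("batchSize must be >= 1 and maxRetries >= 0");
        }
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.flusher = flusher;
    }

    // Buffers one record and flushes once batch_size records are waiting.
    void write(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called when a checkpoint fires (checkpoint.interval), so buffered rows are not held back.
    void onCheckpoint() {
        flush();
    }

    // Submits the buffered records, retrying a failed submission up to maxRetries times.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                flusher.accept(batch);
                buffer.clear();
                return;
            } catch (RuntimeException e) {
                lastError = e;
            }
        }
        throw lastError;
    }

    public static void main(String[] args) {
        BatchBufferSketch<Integer> writer =
            new BatchBufferSketch<>(3, 0, batch -> System.out.println("flush " + batch));
        for (int i = 1; i <= 7; i++) {
            writer.write(i);
        }
        writer.onCheckpoint();
    }
}
```

Running main prints three flushes: [1, 2, 3] and [4, 5, 6], triggered by the batch size of 3, then [7], triggered by the checkpoint.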
Tips
If partition_column is not set, the job runs with a parallelism of 1. If partition_column is set, the data is read in parallel according to the task's parallelism.
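When partition_column is set, the read is divided into ranges of that column so that several readers can run at once. The Java sketch below shows one simple way to divide a numeric id range into partition_num contiguous ranges. The class name and the even-split strategy are assumptions for illustration, and SeaTunnel's own splitting logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical even split of a numeric partition_column range; SeaTunnel's real splitter may differ.
class PartitionSplitSketch {
    // Splits the inclusive id range [min, max] into at most n contiguous, non-empty ranges.
    static List<long[]> split(long min, long max, int n) {
        if (n < 1 || max < min) {
            throw new IllegalArgumentException("need n >= 1 and min <= max");
        }
        long total = max - min + 1;           // ids covered by the range
        int parts = (int) Math.min(n, total); // never produce empty partitions
        List<long[]> ranges = new ArrayList<>();
        long start = min;
        for (int i = 0; i < parts; i++) {
            // The first (total % parts) ranges take one extra id each.
            long size = total / parts + (i < total % parts ? 1 : 0);
            ranges.add(new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each range could back one parallel reader, e.g. "WHERE id BETWEEN start AND end".
        for (long[] r : split(1, 10, 3)) {
            System.out.println(r[0] + ".." + r[1]); // 1..4, then 5..7, then 8..10
        }
    }
}
```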
Task Example
Simple:
Here is an example that reads SQL Server data and inserts it directly into another table:
env {
  # You can set engine configuration here
  execution.parallelism = 10
}

source {
  # This is a sample source plugin, used only to test and demonstrate the source plugin
  Jdbc {
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = "SA"
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # Column used to split the read into parallel shards
    partition_column = "id"
    # Number of partitions
    partition_num = 10
  }
}

transform {
  # If you want more information on how to configure SeaTunnel and see the full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Jdbc {
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = "SA"
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink(id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
  }
  # If you want more information on how to configure SeaTunnel and see the full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
CDC (Change Data Capture) event
CDC change data is also supported. In this case, configure database, table, and primary_keys, and set generate_sink_sql = true:
Jdbc {
  source_table_name = "customers"
  driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  user = "SA"
  password = "Y.sa123456"
  generate_sink_sql = true
  database = "column_type_test"
  table = "dbo.full_types_sink"
  batch_size = 100
  primary_keys = ["id"]
}
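For CDC, each change event has to become a SQL statement keyed on primary_keys. INSERT and UPDATE_AFTER events become an upsert, and DELETE events become a delete. SQL Server has no INSERT ... ON DUPLICATE KEY syntax, so a MERGE statement is a common way to express the upsert. The Java sketch below generates such statements from a table name, a column list, and the primary keys. The class is hypothetical and assumes UPDATE_BEFORE events are simply skipped. The exact SQL that SeaTunnel generates may differ.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical SQL generator for CDC events; names and exact SQL are illustrative only.
// Assumption: UPDATE_BEFORE events are skipped; only the after-image is upserted.
class SqlServerUpsertSqlSketch {
    // INSERT and UPDATE_AFTER events: MERGE acts as an upsert keyed on primary_keys.
    static String upsert(String table, List<String> columns, List<String> keys) {
        String source = join(columns, c -> "? AS " + quote(c), ", ");
        String on = join(keys, k -> "T." + quote(k) + " = S." + quote(k), " AND ");
        List<String> nonKeys = columns.stream().filter(c -> !keys.contains(c)).collect(Collectors.toList());
        // If every column is a key, there is nothing to update on a match.
        String update = nonKeys.isEmpty()
            ? ""
            : " WHEN MATCHED THEN UPDATE SET " + join(nonKeys, c -> "T." + quote(c) + " = S." + quote(c), ", ");
        return "MERGE INTO " + table + " AS T USING (SELECT " + source + ") AS S ON " + on
            + update
            + " WHEN NOT MATCHED THEN INSERT (" + join(columns, SqlServerUpsertSqlSketch::quote, ", ")
            + ") VALUES (" + join(columns, c -> "S." + quote(c), ", ") + ");"; // MERGE must end with ';'
    }

    // DELETE events: remove the row identified by its primary key.
    static String delete(String table, List<String> keys) {
        return "DELETE FROM " + table + " WHERE " + join(keys, k -> quote(k) + " = ?", " AND ");
    }

    private static String join(List<String> items, Function<String, String> f, String sep) {
        return items.stream().map(f).collect(Collectors.joining(sep));
    }

    // Bracket-quotes an identifier, escaping any closing bracket inside it.
    private static String quote(String identifier) {
        return "[" + identifier.replace("]", "]]") + "]";
    }

    public static void main(String[] args) {
        System.out.println(upsert("dbo.full_types_sink", List.of("id", "val_int"), List.of("id")));
        System.out.println(delete("dbo.full_types_sink", List.of("id")));
    }
}
```

Where a MERGE-based upsert is not an option, support_upsert_by_query_primary_key_exist is the slower alternative listed in the options table.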
Exactly-once Sink
Transactional writes may be slower, but they keep the data more accurate:
Jdbc {
  driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  user = "SA"
  password = "Y.sa123456"
  query = "insert into full_types_jdbc_sink(id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
  is_exactly_once = true
  xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"
}
# If you want more information on how to configure SeaTunnel and see the complete list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
This article is published by Beluga Open Source Technology!