Building Real-Time Data Synchronization with the Lightweight CDC Tool debezium-server-databend

Author: Han Shanjie

Databend Cloud R&D Engineer

GitHub: hantmac (Jeremy)

Debezium Server Databend is a lightweight CDC project developed in-house and built on Debezium Engine. It captures database changes in real time, delivers them as event streams, and writes the data to the target database, Databend. It provides an easy way to monitor and capture changes in relational databases and converts those changes into consumable events.

Implementing CDC with Debezium Server Databend does not require large-scale data infrastructure such as Flink, Kafka, or Spark. A single startup script is enough to begin real-time data synchronization.

This tutorial shows how to quickly build real-time data synchronization from MySQL to Databend with Debezium Server Databend.

Suppose we run an e-commerce business whose product data is stored in MySQL, and we need to synchronize that data to Databend in real time.

The following sections show how to use Debezium Server Databend to meet this requirement. The overall architecture of the system is shown in the figure below:

Preparation Phase

Prepare a Linux or macOS machine with Docker, docker-compose, and Java 11 installed. Maven is also needed to build the project from source.
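Before starting, it can help to confirm that these tools are on your PATH. The Python sketch below uses a hypothetical helper that is not part of the project. It only checks that the executables exist; it does not verify that Java is version 11, which you can confirm with java -version.

```python
import shutil

# Tools this tutorial relies on; "mvn" is only needed when building from source.
REQUIRED_TOOLS = ["docker", "docker-compose", "java", "mvn"]


def check_tools(tools):
    """Map each tool name to its resolved path, or None if it is not on PATH."""
    return {tool: shutil.which(tool) for tool in tools}


def missing_tools(tools):
    """Return the tools that could not be found on PATH."""
    return [name for name, path in check_tools(tools).items() if path is None]


print("Missing tools:", missing_tools(REQUIRED_TOOLS))
```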

Components Required for the Tutorial

The following steps prepare the required components with docker-compose.

debezium-MySQL

docker-compose.yaml

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
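After the containers are started (for example with docker-compose up -d), the MySQL port may take a few seconds to accept connections. The Python sketch below defines a hypothetical wait_for_port helper that polls the mapped host port 3306. An open port does not guarantee that MySQL has finished initializing, so treat a True result as a minimum check.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll until a TCP connection to host:port succeeds or the timeout expires.

    Returns True once the port accepts connections, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, call wait_for_port("127.0.0.1", 3306, timeout=60) before running bash run.sh.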

Debezium Server Databend

  • Clone the project: git clone https://github.com/databendcloud/debezium-server-databend.git

  • From the project root directory:

    • Build and package Debezium Server: mvn -Passembly -Dmaven.test.skip package
    • After the build is complete, unzip the server distribution package: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
    • Enter the unzipped folder: cd databendDist
    • Create and edit the configuration file with nano conf/application.properties. Copy the application.properties shown below into it, then adjust the settings to match your environment.
    • Run the service using the provided script: bash run.sh
    • Debezium Server Databend starts.

We also provide a Docker image, so the server can be started in a container with a single command:

version: '2.1'
services:
  debezium:
    image: ghcr.io/databendcloud/debezium-server-databend:pr-2
    ports:
      - "8080:8080"
      - "8083:8083"
    volumes:
      - $PWD/conf:/app/conf
      - $PWD/data:/app/data

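NOTE: When running in a container, make sure the container can reach the source database over the network. For example, 127.0.0.1 inside the container refers to the container itself, not to the host.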

Debezium Server Databend Application Properties

This article uses the configuration provided below. For more parameter descriptions and configurations, please refer to the documentation.

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
#debezium.source.database.ssl.mode=required
# Run without Kafka: store the database schema history in a local file
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# Flatten events: unwrap the Debezium change-event envelope
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############# SET LOG LEVELS #############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
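A typo in a key name is easy to miss in a file like this. The following Python sketch shows one way to catch it: a hypothetical pre-flight check that reads simple key=value lines and reports which of a few keys used in this tutorial are missing. The EXPECTED_KEYS list is chosen for illustration only; it is not an official list of required settings. The reader is also deliberately simpler than java.util.Properties and ignores continuations, escapes, and ":" separators.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and '#'/'!' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so values such as JDBC URLs stay intact.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Illustrative subset of keys used in this tutorial (hypothetical list).
EXPECTED_KEYS = [
    "debezium.sink.type",
    "debezium.sink.databend.database.url",
    "debezium.sink.databend.database.databaseName",
    "debezium.source.connector.class",
    "debezium.source.table.include.list",
]


def missing_keys(props, expected=EXPECTED_KEYS):
    """Return the expected keys that are absent from the parsed properties."""
    return [key for key in expected if key not in props]


sample = """
# mysql source
debezium.sink.type=databend
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
"""
print(missing_keys(parse_properties(sample)))
```

To check the real file, pass open("conf/application.properties").read() to parse_properties.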

Prepare Data

Prepare Data in MySQL

Enter the MySQL container:

docker-compose exec mysql mysql -uroot -p123456

Create the database mydb and the table products, then insert data:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistant black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");
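The statement ALTER TABLE products AUTO_INCREMENT = 10 means the ten rows above receive ids 10 through 19. This is why the later steps refer to id=10 (scooter) and id=12 (12-pack drill bits). The small Python sketch below models that arithmetic under three assumptions: a fresh table, auto_increment_increment = 1, and no failed inserts that would leave gaps.

```python
def assigned_ids(start, count, increment=1):
    """Return the ids MySQL assigns to `count` rows inserted with `default`.

    Assumes the AUTO_INCREMENT counter is at `start`, auto_increment_increment
    equals `increment`, and no earlier failed inserts left gaps.
    """
    return [start + i * increment for i in range(count)]


# Product names from the first INSERT, in insertion order.
names = ["scooter", "car battery", "12-pack drill bits", "hammer", "hammer",
         "hammer", "rocks", "jacket", "cloud", "spare tire"]

# Map each assigned id to its product name.
products = dict(zip(assigned_ids(10, len(names)), names))
print(products[10], "/", products[12])  # scooter / 12-pack drill bits
```

Under the same assumptions, the five rows inserted later in the tutorial receive ids 20 through 24.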

Create Database in Databend

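Create the target database in Databend. It is named debezium here, per debezium.sink.databend.database.databaseName in the configuration above.

NOTE: Users do not need to create tables in Databend first. The system detects the source tables and creates the target tables automatically.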

Start Debezium Server Databend

bash run.sh

On first startup, the server enters initial snapshot mode and fully synchronizes the existing MySQL data to Databend in batches of the configured batch size. Querying Databend then shows that the MySQL data has been synchronized.
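The Python sketch below illustrates only the idea of writing snapshot rows in fixed-size batches. Both batch_size and the batches helper are hypothetical; the article does not show the actual batch-size setting name or its default.

```python
from itertools import islice


def batches(rows, batch_size):
    """Yield consecutive lists of at most `batch_size` rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


# Example: the 10 snapshot rows (ids 10-19) written with a batch size of 4.
for batch in batches(range(10, 20), 4):
    print(batch)
```

Larger batches mean fewer round trips to Databend during the snapshot, at the cost of holding more rows in memory per write.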

Sync Insert Data

Next, insert 5 more rows into MySQL:

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");

Debezium Server Databend log:

Meanwhile, querying Databend shows that the 5 new rows have been synchronized.

Sync Update Data

Because the configuration file sets debezium.sink.databend.upsert=true, Update and Delete events are handled as well.
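The Python sketch below shows why upsert mode matters for updates. It contrasts an append-only write with an upsert keyed on id, the primary key configured through debezium.sink.databend.database.primaryKey. It models only the resulting table contents, not the SQL the sink actually issues, and both helpers are hypothetical.

```python
def apply_append(rows, row):
    """Append-only write: every change event becomes a new row."""
    rows.append(dict(row))
    return rows


def apply_upsert(table, row, key="id"):
    """Upsert: insert the row, or replace the existing row with the same key."""
    table[row[key]] = dict(row)
    return table


insert_event = {"id": 10, "name": "scooter", "description": "Small 2-wheel scooter"}
update_event = {"id": 10, "name": "from debezium", "description": "Small 2-wheel scooter"}

appended = apply_append(apply_append([], insert_event), update_event)
upserted = apply_upsert(apply_upsert({}, insert_event), update_event)

print(len(appended))         # 2: both versions of id=10 are kept
print(upserted[10]["name"])  # from debezium
```

With append-only writes, the table would keep both versions of id=10. With upsert, it holds only the latest state of each primary key.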

Update data with id=10 in MySQL:

update products set name="from debezium" where id=10;

In Databend, the row with id=10 now shows the updated name.

Sync Delete Data

The following settings in the configuration file enable handling of Delete events:

debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

Debezium Server’s handling of deletes is relatively complex: a DELETE operation generates two event records:

  1. One record with “op”: “d” that carries the deleted row's data and fields;
  2. A tombstone record with the same key as the deleted row but a null value.

Both events are emitted together. Debezium Server Databend implements soft deletion for deleted data, which requires a __deleted field in the target table. When a Delete event arrives, the row is inserted into the target table with that field set to TRUE.
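The settings above combine two Debezium behaviors. With drop.tombstones=true, the tombstone record is discarded. With delete.handling.mode=rewrite, ExtractNewRecordState flattens the delete event into the row's last state plus a __deleted flag. The Python sketch below is a simplified model of those two settings, not Debezium's Java implementation. The record layout (key, value, op, before, after) follows the Debezium change-event envelope, and the string "true"/"false" flag mirrors what the transform typically emits; details may vary by version.

```python
def unwrap(record):
    """Simplified model of ExtractNewRecordState with
    delete.handling.mode=rewrite and drop.tombstones=true.

    `record` has a "key" and a "value"; the value is either None (a tombstone)
    or an envelope with "op", "before" and "after" fields.
    Returns the flattened row, or None when the record is dropped.
    """
    envelope = record["value"]
    if envelope is None:
        # drop.tombstones=true: discard the tombstone record.
        return None
    if envelope["op"] == "d":
        # rewrite mode: keep the row's last state and flag it as deleted.
        row = dict(envelope["before"])
        row["__deleted"] = "true"
    else:
        # create ("c"), update ("u") and snapshot read ("r") events.
        row = dict(envelope["after"])
        row["__deleted"] = "false"
    return row


# The two records produced by: delete from products where id=12;
key = {"id": 12}
delete_record = {
    "key": key,
    "value": {"op": "d", "before": {"id": 12, "name": "12-pack drill bits"}, "after": None},
}
tombstone = {"key": key, "value": None}

print(unwrap(delete_record))  # {'id': 12, 'name': '12-pack drill bits', '__deleted': 'true'}
print(unwrap(tombstone))      # None
```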

The advantage of this design is that users who want to keep deleted data for now, but may want to remove it later, have that option. When they decide to remove it, they only need to run a statement such as DELETE FROM products WHERE __deleted = true.
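Here is a minimal Python sketch of that workflow, modeling the target table as a dict keyed by id. Soft-deleted rows stay in the table until purged, and purging mirrors DELETE FROM products WHERE __deleted = true. The is_deleted helper is hypothetical. It accepts both a boolean and the strings "true"/"false", because the flag's exact type in the target table depends on how the sink maps it.

```python
def is_deleted(row):
    """True if the row's soft-delete flag is set (boolean or "true"/"false" string)."""
    return str(row.get("__deleted", False)).lower() == "true"


def purge_deleted(table):
    """Return a copy of `table` without soft-deleted rows, i.e. the effect of
    DELETE FROM products WHERE __deleted = true."""
    return {key: row for key, row in table.items() if not is_deleted(row)}


# Target table after the update of id=10 and the delete of id=12 (simplified rows).
table = {
    10: {"id": 10, "name": "from debezium", "__deleted": "false"},
    11: {"id": 11, "name": "car battery", "__deleted": "false"},
    12: {"id": 12, "name": "12-pack drill bits", "__deleted": "true"},
}

print(sorted(purge_deleted(table)))  # [10, 11]
```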

For details on Debezium’s delete events and how to handle them, refer to the Debezium documentation.

Delete the data with id=12 in MySQL:

delete from products where id=12;

In Databend, the __deleted field of the row with id=12 is now set to true.

Environment Cleanup

When you are done, run the following command in the directory containing docker-compose.yaml to stop and remove all containers:

docker-compose down

Conclusion

That completes the process of building real-time data synchronization from MySQL to Databend with the lightweight CDC tool Debezium Server Databend. This approach does not depend on large components such as Flink or Kafka, and it is easy to start and manage.