Author: Han Shanjie
Databend Cloud R&D Engineer
hantmac (Jeremy) GitHub
Debezium Server Databend is a lightweight CDC project developed in-house on top of Debezium Engine. It captures database changes in real time, delivers them as event streams, and writes the data to the target database, Databend. It provides an easy way to monitor and capture changes in relational databases and to convert those changes into consumable events.
Implementing CDC with Debezium Server Databend does not require large-scale data infrastructure such as Flink, Kafka, or Spark. A single startup script is enough to start real-time data synchronization.
This tutorial shows how to quickly build real-time data synchronization from MySQL to Databend with Debezium Server Databend.
Suppose we run an e-commerce business whose product data is stored in MySQL, and we need to synchronize it to Databend in real time.
The following sections describe how to meet this requirement with Debezium Server Databend. The overall architecture of the system is shown in the figure below:
Preparation Phase
Prepare a Linux or macOS machine with Docker, docker-compose, and Java 11 installed.
Components required for the tutorial
The required components are set up with docker-compose.
debezium-MySQL
docker-compose.yaml
version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
Debezium Server Databend
- Clone the project:
git clone https://github.com/databendcloud/debezium-server-databend.git
- From the project root directory:
  - Build and package Debezium Server:
  mvn -Passembly -Dmaven.test.skip package
  - After the build completes, unzip the server distribution package:
  unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
  - Enter the unzipped folder:
  cd databendDist
  - Create and edit the application.properties file:
  nano conf/application.properties
  Copy the application.properties shown later in this article into it and adjust the configuration to match your environment.
  - Run the service using the provided script:
  bash run.sh
  - Debezium Server with Databend will start.
We also provide a Docker image, so the server can be started in a container with a single command:
version: '2.1'
services:
  debezium:
    image: ghcr.io/databendcloud/debezium-server-databend:pr-2
    ports:
      - "8080:8080"
      - "8083:8083"
    volumes:
      - $PWD/conf:/app/conf
      - $PWD/data:/app/data
NOTE: When running in a container, make sure the source and target databases are reachable from the container's network.
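Inside a container, 127.0.0.1 refers to the container itself, so a database running on the host or in another container has to be addressed by a hostname or IP that the Debezium container can reach. As a minimal sketch for checking this before starting the server, the Python snippet below attempts a TCP connection. The helper name can_connect and the example host and port are illustrative assumptions, not part of the project.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Example call (hypothetical values; use the host and port configured in
# debezium.source.database.hostname and debezium.source.database.port):
# can_connect("mysql", 3306)
```

A successful TCP connection only shows that the port is reachable. It does not verify credentials or database permissions.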
Debezium Server Databend Application Properties
This article uses the configuration provided below. For more parameter descriptions and configurations, please refer to the documentation.
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
#debezium.source.database.ssl.mode=required

# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############# SET LOG LEVELS #############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
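Because a missing or empty key in this file is an easy mistake to make, a quick sanity check before starting the server can help. The Python sketch below parses simple key=value lines and reports which of a few keys are absent. The helper names and the choice of REQUIRED_KEYS are assumptions made for illustration; the project itself does not ship this check, and the parser handles only the plain key=value and # comment syntax used above, not the full Java properties format.

```python
# Keys taken from the configuration above; the selection is illustrative.
REQUIRED_KEYS = [
    "debezium.sink.type",
    "debezium.sink.databend.database.url",
    "debezium.sink.databend.database.primaryKey",
    "debezium.source.connector.class",
    "debezium.source.database.hostname",
    "debezium.source.table.include.list",
]


def parse_properties(text):
    """Parse key=value lines into a dict, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props, required=REQUIRED_KEYS):
    """Return the required keys that are absent or have an empty value."""
    return [key for key in required if not props.get(key)]
```

For example, missing_keys(parse_properties(open("conf/application.properties").read())) returns an empty list when every key in REQUIRED_KEYS is set.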
Prepare Data
Prepare data in MySQL
Enter the MySQL container:
docker-compose exec mysql mysql -uroot -p123456
Create the database mydb and the table products, and insert data:
CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES
  (default,"scooter","Small 2-wheel scooter"),
  (default,"car battery","12V car battery"),
  (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
  (default,"hammer","12oz carpenter's hammer"),
  (default,"hammer","14oz carpenter's hammer"),
  (default,"hammer","16oz carpenter's hammer"),
  (default,"rocks","box of assorted rocks"),
  (default,"jacket","water resistant black wind breaker"),
  (default,"cloud","test for databend"),
  (default,"spare tire","24 inch spare tire");
Create Database in Databend
NOTE: You do not need to create the tables in Databend beforehand. The system creates them automatically once it detects the source tables.
Start Debezium Server Databend
bash run.sh
On first startup, the server enters initial snapshot mode and fully synchronizes the existing MySQL data to Databend in batches of the configured batch size. Once the snapshot completes, the MySQL data can be seen in Databend.
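To illustrate what writing a snapshot in batches means, here is a minimal Python sketch that splits a list of rows into fixed-size groups. It is not the project's implementation: the function name batches and the batch size of 4 are hypothetical and chosen only for the example.

```python
from typing import Iterator, List, Sequence


def batches(rows: Sequence[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive groups of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


# Ten snapshot rows with ids 10..19, matching AUTO_INCREMENT = 10 above.
snapshot = [{"id": 10 + i} for i in range(10)]
sizes = [len(group) for group in batches(snapshot, 4)]  # hypothetical batch size
```

With ten rows and a batch size of 4, the rows are written in three groups, the last one smaller than the others.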
Sync Insert data
Next, insert 5 more rows into MySQL:
INSERT INTO products VALUES
  (default,"scooter","Small 2-wheel scooter"),
  (default,"car battery","12V car battery"),
  (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
  (default,"hammer","12oz carpenter's hammer"),
  (default,"hammer","14oz carpenter's hammer");
Debezium Server Databend log:
In Databend, the 5 new rows have been synchronized as well:
Sync Update data
The configuration file sets debezium.sink.databend.upsert=true, so Update and Delete events are handled as well.
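Upsert semantics mean that a change event inserts the row when its primary key is new and replaces the existing row otherwise. The Python sketch below shows the idea with an in-memory dict standing in for the Databend table. The helper apply_upsert is hypothetical, and the key column follows debezium.sink.databend.database.primaryKey=id from the configuration.

```python
def apply_upsert(table: dict, event: dict, primary_key: str = "id") -> None:
    """Insert the row if its key is new, otherwise overwrite the existing row."""
    table[event[primary_key]] = dict(event)


table = {}
# The first event for id=10 inserts the row.
apply_upsert(table, {"id": 10, "name": "scooter",
                     "description": "Small 2-wheel scooter"})
# A later event with the same key replaces it instead of adding a duplicate.
apply_upsert(table, {"id": 10, "name": "from debezium",
                     "description": "Small 2-wheel scooter"})
```

After both events the table still holds a single row for id=10, now carrying the updated name.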
Update data with id=10 in MySQL:
update products set name="from debezium" where id=10;
In Databend, the row with id=10 has been updated:
Sync Delete data
The following settings in the configuration file enable handling of Delete events:
debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
Debezium Server's handling of Delete is relatively complicated. A DELETE operation generates two event records:
- A delete event that contains "op": "d" together with the row data and other fields;
- A tombstone record with the same key as the deleted row, but with a value of null.
These two events are emitted together. Debezium Server Databend implements soft deletion for Delete events, which requires a __deleted field in the target table. When a Delete event arrives, the row is written to the target table with this field set to TRUE.
The advantage of this design is that users who want to keep the data for now, but may want to remove it later, have the option to do so. To remove the data later, they only need to run delete from table where __deleted=true.
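The soft-delete flow described above can be sketched in Python with an in-memory dict in place of the target table. This is an illustration under assumptions rather than the project's code: apply_event and purge_deleted are hypothetical helpers, events are modeled as a key plus an already flattened value, and the __deleted marker is normalized to a boolean for simplicity.

```python
from typing import Optional


def apply_event(table: dict, key: int, value: Optional[dict]) -> None:
    """Apply one flattened change event to an in-memory table keyed by id."""
    if value is None:
        # Tombstone record: same key, null value. It is skipped here,
        # mirroring debezium.transforms.unwrap.drop.tombstones=true.
        return
    row = dict(value)
    # Accept either a boolean or a "true"/"false" string for the marker.
    row["__deleted"] = str(value.get("__deleted", "false")).lower() == "true"
    table[key] = row  # upsert: the row stays in the table, only flagged


def purge_deleted(table: dict) -> int:
    """Remove soft-deleted rows, like: delete from table where __deleted=true."""
    doomed = [key for key, row in table.items() if row["__deleted"]]
    for key in doomed:
        del table[key]
    return len(doomed)


table = {}
apply_event(table, 12, {"id": 12, "name": "12-pack drill bits"})
# Delete event rewritten to carry the row data plus the __deleted marker.
apply_event(table, 12, {"id": 12, "name": "12-pack drill bits", "__deleted": "true"})
apply_event(table, 12, None)  # tombstone, ignored
```

After these three events the row for id=12 is still present with __deleted set to True, and a later call to purge_deleted(table) removes it physically.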
For Debezium’s description of deletion events and how to handle them, please refer to the documentation for details.
Delete the data with id=12 in MySQL:
delete from products where id=12;
In Databend, the __deleted field of the row with id=12 has been set to true.
Environmental Cleanup
When you are finished, run the following command in the directory that contains the docker-compose.yaml file to stop all containers:
docker-compose down
Conclusion
That is the whole process of building real-time data synchronization from MySQL to Databend with the lightweight CDC project Debezium Server Databend. This approach does not depend on large components such as Flink and Kafka, and it is easy to start and manage.