Data synchronization between Canal, MariaDB and Kafka

1. Architecture

1. Server node

Service name    IP                Role
MariaDB         192.168.31.102    Master node
Canal           192.168.31.101    Slave node

2. Canal introduction


Canal open source address: https://github.com/alibaba/canal

Canal [kə'næl], meaning waterway/pipe/ditch, is mainly used to provide incremental data subscription and consumption based on parsing the incremental logs (binlog) of a MySQL database. Typical uses include:

  • Database mirroring and real-time database backup

  • Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)

  • Business cache refresh and incremental data processing with business logic

(1) Working principle

  • Canal emulates the MySQL slave interaction protocol: it pretends to be a MySQL slave and sends the dump protocol to the MySQL master
  • The MySQL master receives the dump request and starts pushing binary logs to the slave (i.e., Canal)
  • Canal parses the binary log objects (which arrive as a byte stream)

(2) Advantages and disadvantages

Advantages: good real-time performance, distributed, ACK mechanism

Disadvantages:

  • Only incremental synchronization is supported; full synchronization is not

  • Supported targets are limited (MySQL -> ES, RDB, etc.)

  • An instance can only be consumed by one consumer

  • Single-point pressure on the server

2. MariaDB master node

1. MariaDB installation

Tutorial: Install MariaDB-10.8 on CentOS-7

2. Configuration file /etc/my.cnf

On the MariaDB primary node, first enable binlog writing and set binlog-format to ROW mode. Configure my.cnf as follows:

[mysqld]
log-bin=didiok-mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=102 # must be unique and must not clash with the slaveId configured in Canal

3. Restart the service

Restart MariaDB and log in:

systemctl restart mariadb
mysql -uroot -proot

Check whether log_bin is enabled:

mariadb> show variables like '%log_bin%';

Storage location of binlog logs: /var/lib/mysql

4. Create a replication account on the MariaDB master node

After restarting MariaDB, enter MariaDB and create a new account for data synchronization:

# Create a user named canal with password 123456; % means the account can log in remotely from any address.
mariadb> CREATE USER 'canal'@'%' IDENTIFIED BY '123456';
# Grant the canal user the privileges needed for replication (REPLICATION SLAVE, etc.)
mariadb> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mariadb> flush privileges;

-- Grant the canal account the privileges required to act as a MySQL slave; if the account already exists, grant directly
-- CREATE USER canal IDENTIFIED BY 'canal';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' WITH GRANT OPTION;
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
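
To sanity-check the new account, you can connect as canal from another host and inspect its grants. A minimal JDBC sketch (it assumes a MariaDB/MySQL JDBC driver is on the classpath; the class name CanalUserCheck is only for illustration):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CanalUserCheck {
    public static void main(String[] args) throws Exception {
        // Connect to the master as the newly created canal user
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.31.102:3306/?useSSL=false", "canal", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW GRANTS FOR CURRENT_USER()")) {
            while (rs.next()) {
                // Expect to see SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.*
                System.out.println(rs.getString(1));
            }
        }
    }
}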

3. Canal slave node

1. Download

Download address: Canal download

There are four installation packages, listed below; only canal.deployer-1.1.7-SNAPSHOT.tar.gz needs to be downloaded here.

  • canal.adapter-1.1.7-SNAPSHOT.tar.gz: synchronization adapter. It acts as a Canal client: it pulls data from canal-server (which must be configured in tcp mode) and then synchronizes it to stores such as MySQL, Elasticsearch and HBase. Compared with the serverMode options built into canal-server, canal-adapter supports a wider range of downstream targets.

  • canal.admin-1.1.7-SNAPSHOT.tar.gz: admin console. It provides Canal with configuration management, node operation and maintenance, and other ops-oriented functions, along with a relatively friendly WebUI so that more users can operate it quickly and safely.

  • canal.deployer-1.1.7-SNAPSHOT.tar.gz: the Canal server. It monitors MySQL's binlog directly by disguising itself as a MySQL slave; it only receives data and does not process it. After receiving MySQL binlog data it can forward it to the configured downstream via canal.serverMode: tcp, kafka, rocketMQ or rabbitMQ. The tcp mode lets you write a custom Canal client to receive the data, which is the most flexible.

  • canal.example-1.1.7-SNAPSHOT.tar.gz: Canal examples

2. Install canal

Create the installation folder and extract Canal:

mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C /usr/local/canal/

3. Modify the configuration file

(1) Modify canal.properties

vim /usr/local/canal/conf/canal.properties

The main revisions are:

# java program connection port
canal.port = 11111

In addition, if the machine has only one CPU, you need to set canal.instance.parser.parallel to false.

(2) Modify instance.properties

vim /usr/local/canal/conf/example/instance.properties

The main revisions are as follows:

## must not duplicate the server-id of the MariaDB node
canal.instance.mysql.slaveId=101
## address of the MariaDB master
canal.instance.master.address=192.168.31.102:3306

## username and password used to connect to MariaDB
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
## character set
canal.instance.connectionCharset = UTF-8
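
Before starting Canal, it can be worth confirming that the Canal host can actually reach the MariaDB master on port 3306. A minimal sketch (just a plain TCP connectivity check, not part of Canal itself; the class name is illustrative):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ReachabilityCheck {
    public static void main(String[] args) {
        // Try to open a TCP connection to the MariaDB master configured above
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("192.168.31.102", 3306), 3000);
            System.out.println("MariaDB master is reachable from the Canal host");
        } catch (IOException e) {
            System.err.println("Cannot reach 192.168.31.102:3306: " + e.getMessage());
        }
    }
}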

4. Start canal

cd /usr/local/canal/bin
./startup.sh
# Check the running status
jps
## The shutdown command is ./stop.sh

5. Authentication Service

To check whether the startup succeeded, look at the log files:

## View server logs
cat /usr/local/canal/logs/canal/canal.log
## Or use the following command to view the log of the instance
tail -f -n 100 /usr/local/canal/logs/example/example.log

If you are running an older Canal version (the stack trace below comes from canal 1.1.5 jars), tail -f -n 100 /usr/local/canal/logs/example/example.log may show the following error:

2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR com.taobao.tddl.dbsync.binlog.LogEvent - Query_log_event has unknown status vars (first has code: 129), skipping the rest of them
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR com.taobao.tddl.dbsync.binlog.LogEvent - Query_log_event has unknown status vars (first has code: 23), skipping the rest of them
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] WARN com.taobao.tddl.dbsync.binlog.LogDecoder - Decoding Query failed from: didiok-mysql-bin.000001:1469
java.io.IOException: Read Q_FLAGS2_CODE error: limit exceeded: 67
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:715) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.<init>(QueryLogEvent.java:495) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:168) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:111) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:179) [canal.parse-1.1.5.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:276) [canal.parse-1.1.5.jar:na]
at java.lang.Thread.run(Thread.java:750) [na:1.8.0_361]
Caused by: java.lang.IllegalArgumentException: limit exceed: 67
at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:562) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:612) ~[canal.parse.dbsync-1.1.5.jar:na]
... 6 common frames omitted
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /192.168.31.102:3306 has an error, retrying. caused by
java.io.IOException: Read Q_FLAGS2_CODE error: limit exceeded: 67
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:715) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.<init>(QueryLogEvent.java:495) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:168) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:111) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:179) ~[canal.parse-1.1.5.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:276) ~[canal.parse-1.1.5.jar:na]
at java.lang.Thread.run(Thread.java:750) [na:1.8.0_361]
Caused by: java.lang.IllegalArgumentException: limit exceed: 67
at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:562) ~[canal.parse.dbsync-1.1.5.jar:na]
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:612) ~[canal.parse.dbsync-1.1.5.jar:na]
... 6 common frames omitted
2023-05-22 20:57:43.475 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[java.io.IOException : Read Q_FLAGS2_CODE error: limit exceed: 67
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:715)
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.<init>(QueryLogEvent.java:495)
at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:168)
at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:111)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:179)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:276)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: limit exceed: 67
at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:562)
at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:612)
... 6 more

Workaround:

Use the latest version of Canal; newer versions have fixed this problem. I installed canal.deployer-1.1.7-SNAPSHOT.tar.gz and the problem did not occur.

4. Combine SpringBoot and Canal to achieve data synchronization

1. Add dependencies to the Spring Boot project

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>

2. Write a Java class:

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalTest {

    /**
     * 1. Connect to the Canal server
     * 2. The server requests the dump protocol from the master
     * 3. Parse the binlog that is sent over
     * 4. Finally, do the actual processing... send to MQ, print, ...
     * @param args
     */
    public static void main(String[] args) {

        // 192.168.31.101 is the IP of the Canal node, 11111 is the port Java uses to connect to Canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.31.101", 11111),
                "example", "canal", "123456");

        int batchSize = 1000; // number of entries pulled per batch
        int emptyCount = 0;
        try {
            // connect to the Canal server
            connector.connect();
            // subscribe to all databases and tables
            connector.subscribe(".*\\..*");
            // roll back any unacknowledged batches from a previous run
            connector.rollback();

            int totalEmptyCount = 1200;

            while (emptyCount < totalEmptyCount) {
                // pull up to 1000 entries at a time, wrapped in a Message
                Message message = connector.getWithoutAck(batchSize);
                // batchId is used to ACK the batch after the data has been processed
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    // no data fetched
                    emptyCount++;
                    System.err.println("empty count: " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                } else {
                    // data available
                    emptyCount = 0;
                    System.err.printf("message[batchId=%s, size=%s]%n", batchId, size);
                    // process the parsed data
                    printEntry(message.getEntries());
                }
                // acknowledge the processed batch
                connector.ack(batchId);
            }
            System.err.println("empty too many times, exit");

        } finally {
            // close the connection
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            System.err.println("entry.getEntryType(): " + entry.getEntryType());
            // skip transaction begin/end markers, they carry no row data
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            // RowChange carries the database, table and binlog row data
            RowChange rc;
            try {
                // the store value is serialized binary data
                rc = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parser error!", e);
            }
            EventType eventType = rc.getEventType();

            System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));

            // actually process the row data
            for (RowData rd : rc.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // for DELETE, the row image is in BeforeColumnsList
                    List<Column> deleteList = rd.getBeforeColumnsList();
                    printColumn(deleteList, "before deletion");

                } else if (eventType == EventType.INSERT) {
                    // for INSERT, the row image is in AfterColumnsList
                    List<Column> insertList = rd.getAfterColumnsList();
                    printColumn(insertList, "after insertion");
                } else {
                    // UPDATE: both before and after images are available
                    List<Column> updateBeforeList = rd.getBeforeColumnsList();
                    printColumn(updateBeforeList, "before modification");
                    List<Column> updateAfterList = rd.getAfterColumnsList();
                    printColumn(updateAfterList, "after modification");
                }
            }
        }
    }

    private static void printColumn(List<Column> columns, String operationMsg) {
        System.err.println("Number of CanalEntry.Column: " + columns.size());
        for (Column column : columns) {
            System.err.println("Operation type: " + operationMsg + "--"
                    + column.getName()
                    + " : "
                    + column.getValue()
                    + ", update = "
                    + column.getUpdated());
        }
    }
}

3. Test

After starting the Spring Boot project, modify table data in a database on the MariaDB master node and observe the output of the Java console. Here the Java code only prints the data it receives; if needed, you can forward it to MQ, MySQL/MariaDB, or Redis yourself. When a row is added or changed on the MariaDB master node, the Java console prints the parsed entries (binlog position, schema, table, event type and column values).
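
For example, a test row can be inserted over JDBC. A minimal sketch (the didiok database and users table follow the Kafka test later in this article; the column names id and name, and the root/root credentials from the earlier login, are assumptions to adapt to your own schema):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class TriggerBinlogChange {
    public static void main(String[] args) throws Exception {
        // Insert a row on the master; Canal picks the change up from the binlog.
        // didiok/users and the (id, name) columns are placeholders for your actual schema.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.31.102:3306/didiok?useSSL=false", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO users (id, name) VALUES (?, ?)")) {
            ps.setString(1, "1001");
            ps.setString(2, "test-user");
            ps.executeUpdate();
        }
    }
}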

4. Combining Canal and Kafka to achieve data synchronization

The Java + Canal setup in the previous section is single-threaded: its performance is limited, it cannot buffer messages, and it cannot handle high concurrency. It is therefore worth integrating Canal with Kafka: deliver the MySQL binlog data to Kafka and let consumers process it from there. Kafka offers good stability, high performance and high throughput, can absorb traffic peaks, and can buffer messages, which is a big advantage when handling large volumes of data.

When MariaDB's binlog changes, Canal parses the binlog and sends the parsed data to Kafka; consumer-side code then processes the Kafka messages, for example writing them to ES.

1. Configure Canal to integrate with Kafka

(1) Modify canal.properties

vim /usr/local/canal/conf/canal.properties

The main revisions are:

canal.serverMode = kafka # use kafka mode so that Canal pushes messages to Kafka; the default is tcp
kafka.bootstrap.servers=192.168.31.101:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 100
kafka.max.request.size=1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 3

(2) Modify instance.properties

vim /usr/local/canal/conf/example/instance.properties

The main revisions are as follows:

# table regex: an important parameter, the whitelist of databases/tables to match.
# For example, if you only need incremental data from the user table of the test database, write test.user
canal.instance.filter.regex=.*\..*

####### mq config ######
# canal.mq.topic=example
# dynamic topic: route to dynamically generated topics by schema or table regex (based on database and table names)
canal.mq.dynamicTopic=.*\..*
# table black regex
canal.instance.filter.black.regex=

#canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
# Default number of partitions for topics created in Kafka; it should generally be less than or equal to
# the number of partitions in Kafka. My Kafka is set to 5, so I also set 5 here
canal.mq.partitionsNum=5
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# .*\..*:$pk$ regex match: for every matched table, use the table's primary key (looked up automatically) as the hash field
canal.mq.partitionHash=.*\..*:$pk$
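
After restarting Canal with this configuration and making a change on the master, you can confirm that the dynamic topic (e.g. didiok_users for the users table of the didiok database, matching the test later on) was created on the broker. A minimal sketch using the Kafka AdminClient (it assumes the kafka-clients dependency is on the classpath):

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListCanalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Topics generated by canal.mq.dynamicTopic follow the <database>_<table> pattern, e.g. didiok_users
            Set<String> topics = admin.listTopics().names().get();
            topics.forEach(System.out::println);
        }
    }
}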

2. Write kafka consumer code

Canal listens to the MariaDB master node. When data changes, i.e. when the binlog is updated, Canal sends a message to Kafka. We now write a consumer that subscribes to the corresponding topic, fetches the Kafka messages and consumes them.

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CollectKafkaConsumer {

    private final KafkaConsumer<String, String> consumer;

    private final String topic;

    public CollectKafkaConsumer(String topic) {
        Properties props = new Properties();
        // connect to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");
        // commit offsets manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // latest or earliest
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
    }

    private void receive(KafkaConsumer<String, String> consumer) {
        while (true) {
            // pull a batch of records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                String topic = partition.topic();
                int size = partitionRecords.size();
                // e.g. Get topic: test-db.demo, partition location: 2, number of messages: 1
                System.err.println("Get topic: " + topic + ", partition location: " + partition.partition() + ", number of messages: " + size);

                for (int i = 0; i < size; i++) {
                    /**
                     * Example message value produced by Canal:
                     * {
                     *   "data":[{"id":"010","name":"z100","age":"35"}],
                     *   "database":"test-db",
                     *   "es":1605269364000,
                     *   "id":2,
                     *   "isDdl":false,
                     *   "mysqlType":{"id":"varchar(32)","name":"varchar(40)","age":"int(8)"},
                     *   "old":[{"name":"z10","age":"32"}],
                     *   "pkNames":["id"],
                     *   "sql":"",
                     *   "sqlType":{"id":12,"name":12,"age":4},
                     *   "table":"demo",
                     *   "ts":1605269365135,
                     *   "type":"UPDATE"
                     * }
                     */
                    System.err.println("-----> value: " + partitionRecords.get(i).value());
                    long offset = partitionRecords.get(i).offset() + 1;
                    // consumer.commitSync();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
                    System.err.println("Successful synchronization, topic: " + topic + ", committed offset: " + offset);
                }
            }
        }
    }

    public static void main(String[] args) {
        String topic = "didiok_users";
        CollectKafkaConsumer collectKafkaConsumer = new CollectKafkaConsumer(topic);
        collectKafkaConsumer.receive(collectKafkaConsumer.consumer);
    }
}
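
The message value is the flat JSON document illustrated in the comment above. A minimal sketch of extracting the interesting fields with Jackson (an assumption: jackson-databind is on the classpath; any JSON library works, and the class name is illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CanalMessageParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Extract database, table, event type and row data from one Canal Kafka message
    public static void handle(String value) throws Exception {
        JsonNode root = MAPPER.readTree(value);
        String database = root.path("database").asText();
        String table = root.path("table").asText();
        String type = root.path("type").asText();   // INSERT / UPDATE / DELETE
        System.out.println(database + "." + table + " -> " + type);
        for (JsonNode row : root.path("data")) {
            // each element maps column names to values, e.g. {"id":"010","name":"z100"}
            System.out.println("  row: " + row);
        }
    }
}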

Then start the Spring Boot project and, for testing, add data to the users table of the didiok database on the 192.168.31.102 node. The Java console will print the consumed messages.
