Data synchronization between Canal, MariaDB and Kafka
1. Architecture
1. Server node
Service name | IP | Role |
---|---|---|
MariaDB | 192.168.31.102 | Master node |
Canal | 192.168.31.101 | Slave node |
2. Canal introduction
canal open source address:
canal [kə'næl], which translates as waterway/pipeline/ditch, provides incremental data subscription and consumption based on parsing the incremental logs of a MySQL database. Typical uses include:
- Database mirroring, real-time database backup
- Index construction and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
- Business cache refresh, incremental data processing with business logic
(1) Working principle
- canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
- The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. canal)
- canal parses the binary log objects (originally a byte stream)
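The last step above, turning a byte stream into binlog objects, can be illustrated with a minimal sketch. It assumes the standard 19-byte event header of the binlog v4 format (timestamp, event type, server id, event size, next position, flags, all little-endian). The class name BinlogHeaderSketch and the sample bytes are made up for illustration; canal's real parser handles far more than this header.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative only: decodes the fixed 19-byte header that precedes each binlog event. */
final class BinlogHeaderSketch {
    final long timestamp;     // seconds since the epoch
    final int eventType;      // for example 2 = QUERY_EVENT
    final long serverId;      // server_id of the server that wrote the event
    final long eventSize;     // header plus body, in bytes
    final long nextPosition;  // offset of the next event in the binlog file
    final int flags;

    private BinlogHeaderSketch(long timestamp, int eventType, long serverId,
                               long eventSize, long nextPosition, int flags) {
        this.timestamp = timestamp;
        this.eventType = eventType;
        this.serverId = serverId;
        this.eventSize = eventSize;
        this.nextPosition = nextPosition;
        this.flags = flags;
    }

    /** Reads the unsigned little-endian fields of the header. */
    static BinlogHeaderSketch parse(byte[] raw) {
        if (raw.length < 19) {
            throw new IllegalArgumentException("an event header needs 19 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int eventType = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long eventSize = buf.getInt() & 0xFFFFFFFFL;
        long nextPosition = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new BinlogHeaderSketch(timestamp, eventType, serverId, eventSize, nextPosition, flags);
    }

    public static void main(String[] args) {
        // Hypothetical sample bytes, not taken from a real binlog file.
        byte[] raw = ByteBuffer.allocate(19).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(1684760263).put((byte) 2).putInt(102)
                .putInt(67).putInt(1469).putShort((short) 0).array();
        BinlogHeaderSketch h = parse(raw);
        System.out.println("type=" + h.eventType + " serverId=" + h.serverId
                + " nextPosition=" + h.nextPosition);
    }
}
```

After the header, each event type has its own body layout, which is where a real parser spends most of its effort.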
(2) Advantages and disadvantages
Advantages: Good real-time performance, distributed, ACK mechanism
Disadvantages:
- Only incremental synchronization is supported; full synchronization is not
- Supported data sources are limited (MySQL to ES or RDB)
- An instance can be consumed by only one consumer
- Single-point pressure
2. MariaDB master node
1. MariaDB installation
Tutorial: Install MariaDB-10.8 on CentOS-7
2. Configuration file /etc/my.cnf
On the MariaDB master node, first enable binlog writing and set binlog-format to ROW mode. Configure my.cnf as follows:
[mysqld]
log-bin=didiok-mysql-bin # enable binlog
binlog-format=ROW # select ROW mode
server_id=102 # must be unique and must not duplicate the slaveId in canal
3. Restart the service
Restart MariaDB and log in:
systemctl restart mariadb
mysql -uroot -proot
Check whether log_bin is enabled:
mariadb> show variables like '%log_bin%';
Binlog files are stored in /var/lib/mysql
4. Create an account for data synchronization on the MariaDB master node
After restarting MariaDB, log in to MariaDB and create a new account for data synchronization:
# Create a user named canal with the password 123456; % means the user may log in remotely from any address.
mariadb> CREATE USER 'canal'@'%' IDENTIFIED BY '123456';
# Grant the canal user the privileges needed for replication (REPLICATION SLAVE, etc.)
mariadb> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mariadb> flush privileges;
-- The account canal connects with must have permission to act as a MySQL slave; if you already have an account, you can grant the privileges directly
-- CREATE USER canal IDENTIFIED BY 'canal';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' WITH GRANT OPTION;
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
3. Canal slave node
1. Download
Download address: Canal download
Four packages are available, as listed below; only canal.deployer-1.1.7-SNAPSHOT.tar.gz is downloaded here.
- canal.adapter-1.1.7-SNAPSHOT.tar.gz: the synchronization adapter. It acts as a canal client: it obtains data from canal-server (which must be configured in tcp mode) and synchronizes it to stores such as MySQL, Elasticsearch, and HBase. Compared with the canal.serverMode built into canal-server, canal-adapter supports a wider range of downstream targets.
- canal.admin-1.1.7-SNAPSHOT.tar.gz: the admin console. It provides canal with operations-oriented functions such as overall configuration management and node operation and maintenance, along with a fairly friendly web UI that lets more users operate quickly and safely.
- canal.deployer-1.1.7-SNAPSHOT.tar.gz: the canal server. It monitors MySQL's binlog directly by posing as a MySQL slave; it only receives data and does not process it. After receiving MySQL's binlog data, it can deliver the data downstream according to canal.serverMode: tcp, kafka, rocketMQ, or rabbitMQ. In tcp mode you can write a custom canal client to receive the data, which is more flexible.
- canal.example-1.1.7-SNAPSHOT.tar.gz: canal examples.
2. Install canal
Create the folder and extract canal:
mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C /usr/local/canal/
3. Modify the configuration file
(1) Modify canal.properties
vim /usr/local/canal/conf/canal.properties
The main revisions are:
# port that the Java program connects to
canal.port = 11111
In addition, if the machine has only one CPU, set canal.instance.parser.parallel to false.
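As a small aid for the single-CPU rule above, the following sketch reads the CPU count that the JVM sees and prints a suggested value. The class and method names are hypothetical, and the rule it encodes is only the one stated in this article, not an official canal recommendation.

```java
/** Illustrative helper: suggests a value for canal.instance.parser.parallel from the CPU count. */
final class ParserParallelHint {

    /** Returns false for a single-CPU machine and true otherwise, following the rule in the text. */
    static boolean recommendedParallel(int cpuCount) {
        return cpuCount > 1;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("canal.instance.parser.parallel = " + recommendedParallel(cpus));
    }
}
```

Run it on the canal node itself, since the result depends on the machine where the JVM runs.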
(2) Modify instance.properties
vim /usr/local/canal/conf/example/instance.properties
The main revisions are as follows:
## must not duplicate the server-id of any existing mariadb node
canal.instance.mysql.slaveId=101
## address of the mariadb master
canal.instance.master.address=192.168.31.102:3306
## username and password for connecting to mariadb
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
## character set
canal.instance.connectionCharset = UTF-8
4. Start canal
cd /usr/local/canal/bin
./startup.sh
# Check the running status
jps
## The shutdown command is ./stop.sh
5. Verify the service
To check whether startup succeeded, inspect the logs:
## View the server log
cat /usr/local/canal/logs/canal/canal.log
## Or view the instance log with the following command
tail -f -n 100 /usr/local/canal/logs/example/example.log
With an older canal release (the jar names in the stack trace below show 1.1.5), the instance log viewed with tail -f -n 100 /usr/local/canal/logs/example/example.log contains the following error:
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR com.taobao.tddl.dbsync.binlog.LogEvent - Query_log_event has unknown status vars (first has code: 129), skipping the rest of them
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR com.taobao.tddl.dbsync.binlog.LogEvent - Query_log_event has unknown status vars (first has code: 23), skipping the rest of them
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] WARN com.taobao.tddl.dbsync.binlog.LogDecoder - Decoding Query failed from: didiok-mysql-bin.000001:1469
java.io.IOException: Read Q_FLAGS2_CODE error: limit exceed: 67
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:715) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.<init>(QueryLogEvent.java:495) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:168) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:111) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:179) [canal.parse-1.1.5.jar:na]
    at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:276) [canal.parse-1.1.5.jar:na]
    at java.lang.Thread.run(Thread.java:750) [na:1.8.0_361]
Caused by: java.lang.IllegalArgumentException: limit exceed: 67
    at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:562) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:612) ~[canal.parse.dbsync-1.1.5.jar:na]
    ... 6 common frames omitted
2023-05-22 20:57:43.474 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /192.168.31.102:3306 has an error, retrying. caused by
java.io.IOException: Read Q_FLAGS2_CODE error: limit exceed: 67
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:715) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.<init>(QueryLogEvent.java:495) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:168) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:111) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:179) ~[canal.parse-1.1.5.jar:na]
    at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:276) ~[canal.parse-1.1.5.jar:na]
    at java.lang.Thread.run(Thread.java:750) [na:1.8.0_361]
Caused by: java.lang.IllegalArgumentException: limit exceed: 67
    at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:562) ~[canal.parse.dbsync-1.1.5.jar:na]
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:612) ~[canal.parse.dbsync-1.1.5.jar:na]
    ... 6 common frames omitted
2023-05-22 20:57:43.475 [destination = example , address = /192.168.31.102:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[java.io.IOException: Read Q_FLAGS2_CODE error: limit exceed: 67
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:715)
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.<init>(QueryLogEvent.java:495)
    at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:168)
    at com.taobao.tddl.dbsync.binlog.LogDecoder.decode(LogDecoder.java:111)
    at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:179)
    at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:276)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: limit exceed: 67
    at com.taobao.tddl.dbsync.binlog.LogBuffer.getUint32(LogBuffer.java:562)
    at com.taobao.tddl.dbsync.binlog.event.QueryLogEvent.unpackVariables(QueryLogEvent.java:612)
    ... 6 more
Workaround:
Use a newer version of canal, which has fixed this problem. With canal.deployer-1.1.7-SNAPSHOT.tar.gz installed, the error did not occur.
4. Combine SpringBoot and Canal to achieve data synchronization
1. Add dependency in SpringBoot project
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>
2. Write the Java class:
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalTest {

    /**
     * 1. Connect to the canal server
     * 2. Request the dump protocol from the master
     * 3. Parse the binlog that is sent
     * 4. Finally, do the actual processing: send to MQ, print, ...
     */
    public static void main(String[] args) {
        // 192.168.31.101 is the IP of the node where canal runs; 11111 is the port the Java client connects to
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.31.101", 11111),
                "example", "canal", "123456");

        int batchSize = 1000; // number of entries to pull per request
        int emptyCount = 0;
        try {
            // connect to the canal server
            connector.connect();
            // subscribe to all databases and tables
            connector.subscribe(".*\\..*");
            // if there was a problem earlier, roll back so unacknowledged data is fetched again
            connector.rollback();

            int totalEmptyCount = 1200;
            while (emptyCount < totalEmptyCount) {
                // pull up to 1000 entries at a time, wrapped in a Message
                Message message = connector.getWithoutAck(batchSize);
                // batchId is used for the ACK after the data has been processed
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // no data fetched
                    emptyCount++;
                    System.err.println("empty count: " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                } else {
                    // data received
                    emptyCount = 0;
                    System.err.printf("message[batchId=%s, size=%s] \n", batchId, size);
                    // process the parsed data
                    printEntry(message.getEntries());
                }
                // acknowledge the processed batch
                connector.ack(batchId);
            }
            System.err.println("empty too many times, exit");
        } finally {
            // close the connection
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            System.err.println("entry.getEntryType():" + entry.getEntryType());
            // skip transaction begin/end markers; they are not processed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            // rc carries a lot of information: database, table, binlog
            RowChange rc = null;
            try {
                // parse the binary data
                rc = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parser error!", e);
            }
            EventType eventType = rc.getEventType();

            System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));

            // actually process the data
            for (RowData rd : rc.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // delete: use BeforeColumnsList
                    List<Column> deleteList = rd.getBeforeColumnsList();
                    printColumn(deleteList, "before deletion");
                } else if (eventType == EventType.INSERT) {
                    // insert: use AfterColumnsList
                    List<Column> insertList = rd.getAfterColumnsList();
                    printColumn(insertList, "after adding");
                } else {
                    // update: print both the before and after images
                    List<Column> updateBeforeList = rd.getBeforeColumnsList();
                    printColumn(updateBeforeList, "before modification");
                    List<Column> updateAfterList = rd.getAfterColumnsList();
                    printColumn(updateAfterList, "after modification");
                }
            }
        }
    }

    private static void printColumn(List<Column> columns, String operationMsg) {
        System.err.println("Number of CanalEntry.Column:" + columns.size());
        for (Column column : columns) {
            System.err.println("Operation type: " + operationMsg + "--"
                    + column.getName() + " : " + column.getValue()
                    + ", update = " + column.getUpdated());
        }
    }
}
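The client above relies on a fetch, process, then acknowledge cycle: getWithoutAck hands out a batch, ack confirms it, and rollback causes unconfirmed data to be delivered again. The following toy model, written with plain collections, illustrates that idea only. ToyAckStore and its methods are hypothetical and are not canal's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a get / ack / rollback cycle. Not canal's implementation. */
final class ToyAckStore {

    /** A fetched batch; id is -1 when no entries were available. */
    static final class Batch {
        final long id;
        final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    private final List<String> log = new ArrayList<>();
    // batch id -> index just past the last entry of that batch
    private final Map<Long, Integer> pending = new LinkedHashMap<>();
    private int acked = 0;   // entries before this index are confirmed
    private int cursor = 0;  // entries before this index have been handed out
    private long nextId = 1;

    void append(String entry) {
        log.add(entry);
    }

    /** Hands out up to batchSize entries without confirming them. */
    Batch getWithoutAck(int batchSize) {
        int end = Math.min(cursor + batchSize, log.size());
        if (end == cursor) {
            return new Batch(-1, new ArrayList<>());
        }
        Batch batch = new Batch(nextId++, new ArrayList<>(log.subList(cursor, end)));
        pending.put(batch.id, end);
        cursor = end;
        return batch;
    }

    /** Confirms a batch so that its entries are not delivered again. */
    void ack(long batchId) {
        Integer end = pending.remove(batchId);
        if (end != null) {
            acked = Math.max(acked, end);
        }
    }

    /** Forgets all unconfirmed batches; their entries will be delivered again. */
    void rollback() {
        pending.clear();
        cursor = acked;
    }

    public static void main(String[] args) {
        ToyAckStore store = new ToyAckStore();
        for (String e : Arrays.asList("insert#1", "update#1", "delete#1")) {
            store.append(e);
        }
        Batch first = store.getWithoutAck(2);
        store.rollback();                      // processing failed, nothing was acked
        Batch retry = store.getWithoutAck(2);  // the same two entries are handed out again
        store.ack(retry.id);                   // processing succeeded
        Batch rest = store.getWithoutAck(2);   // only the remaining entry
        System.out.println(first.entries + " " + retry.entries + " " + rest.entries);
    }
}
```

The point of the model is that an entry is only considered consumed once its batch is acknowledged, so a crash between fetch and ack leads to redelivery rather than data loss.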
3. Test
After starting the Spring Boot project, modify table data in the database on the MariaDB master node, then observe the output of the Java console. The Java code here only prints the data it receives; if needed, you can forward the data to an MQ, MySQL/MariaDB, or Redis yourself.
Add table data in the database on the MariaDB master node:
Output from the Java console:
5. Combining Canal and Kafka to achieve data synchronization
The Java client in the previous section is single-threaded: its performance is limited, it cannot buffer a backlog of messages, and it cannot sustain high concurrency. It is therefore necessary to integrate canal with Kafka, so that canal delivers the MySQL binlog data to Kafka and consumers process it from there. Kafka offers good stability, high performance and throughput, traffic peak shaving, and message buffering, which is a great advantage when dealing with large-scale data.
When MariaDB's binlog changes, canal parses the binlog and sends the parsed data to Kafka; consumer-side code then processes the Kafka messages, for example by writing them to ES.
1. To integrate Canal with Kafka, you need to configure canal
(1) Modify canal.properties
vim /usr/local/canal/conf/canal.properties
The main revisions are:
# Select the Kafka push mode to send Kafka messages; the default is tcp
canal.serverMode = kafka
kafka.bootstrap.servers=192.168.31.101:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 100
kafka.max.request.size=1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 3
(2) Modify instance.properties
vim /usr/local/canal/conf/example/instance.properties
The main revisions are as follows:
# table regex. This is an important parameter: it is the whitelist of schemas and tables to capture. For example, if you only need the incremental data of the user table in the test schema, write test.user
canal.instance.filter.regex=.*\\..*
####### mq config ######
# canal.mq.topic=example
# Dynamic topic: route to a topic generated from the schema name and table name, by schema or table regex
canal.mq.dynamicTopic=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
# Number of partitions assumed per topic; it is generally less than or equal to the number of partitions configured in Kafka. My Kafka is set to 5, so 5 is used here as well
canal.mq.partitionsNum=5
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# .*\\..*:$pk$ is a regex match: for every matching table, the hash field is the table's primary key (looked up automatically)
canal.mq.partitionHash=.*\\..*:$pk$
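To make the three routing settings above more concrete, here is a minimal sketch of how a schema.table name could be matched against a whitelist regex, mapped to a topic, and hashed to a partition. It is not canal's code: the class RoutingSketch is hypothetical, it handles a single regex rather than a comma-separated list, the schema_table topic name is an assumption based on the didiok_users topic used by the consumer in this article, and the hash function is a stand-in. Note that in a Java string literal the backslash before the dot is doubled.

```java
import java.util.regex.Pattern;

/** Illustrative only: filter, topic routing and partition hashing for a schema.table name. */
final class RoutingSketch {

    /** True when "schema.table" matches the whitelist regex. */
    static boolean accepted(String filterRegex, String schema, String table) {
        return Pattern.matches(filterRegex, schema + "." + table);
    }

    /** Assumed naming rule: schema_table, matching the didiok_users topic used in this article. */
    static String topicFor(String schema, String table) {
        return schema + "_" + table;
    }

    /** Assumed hashing rule: non-negative hash of the primary-key value modulo the partition count. */
    static int partitionFor(String primaryKeyValue, int partitionsNum) {
        return Math.floorMod(primaryKeyValue.hashCode(), partitionsNum);
    }

    public static void main(String[] args) {
        // ".*\\..*" accepts every schema and table; "test\\.user" accepts only test.user
        System.out.println(accepted(".*\\..*", "didiok", "users"));
        System.out.println(accepted("test\\.user", "didiok", "users"));
        System.out.println(topicFor("didiok", "users"));
        System.out.println(partitionFor("010", 5));
    }
}
```

Hashing on the primary key keeps all changes to the same row in the same partition, so a consumer sees them in order.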
2. Write kafka consumer code
Canal listens to the MariaDB master node. When data changes, that is, when the binlog is updated, canal sends a message to Kafka. Next, write a consumer that subscribes to the specified topic, receives the Kafka messages, and consumes them.
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CollectKafkaConsumer {

    private final KafkaConsumer<String, String> consumer;

    private final String topic;

    public CollectKafkaConsumer(String topic) {
        Properties props = new Properties();
        // connect to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");
        // offsets are committed manually in receive()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // latest or earliest
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
    }

    private void receive(KafkaConsumer<String, String> consumer) {
        while (true) {
            // pull a result set
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                String topic = partition.topic();
                int size = partitionRecords.size();
                // Get topic: test-db.demo, partition location: 2, number of messages: 1
                System.err.println("Get topic: " + topic + ", partition location: " + partition.partition() + ", number of messages:" + size);
                for (int i = 0; i < size; i++) {
                    /**
                     * {
                     * ----> "data":[{"id":"010","name":"z100","age":"35"}],
                     * ----> "database":"test-db",
                     *       "es":1605269364000,
                     * ----> "id":2,
                     *       "isDdl":false,
                     * ----> "mysqlType":{"id":"varchar(32)","name":"varchar(40)","age":"int(8)"},
                     * ----> "old":[{"name":"z10","age":"32"}],
                     * ----> "pkNames":["id"],
                     *       "sql":"",
                     *       "sqlType":{"id":12,"name":12,"age":4},
                     * ----> "table":"demo",
                     * ----> "ts":1605269365135,
                     * ----> "type":"UPDATE"}
                     */
                    System.err.println("-----> value: " + partitionRecords.get(i).value());
                    // the offset to commit is the offset of the next record to read
                    long offset = partitionRecords.get(i).offset() + 1;
                    // consumer.commitSync();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
                    System.err.println("Successful synchronization, topic: " + topic + ", submitted offset: " + offset);
                }
            }
        }
    }

    public static void main(String[] args) {
        String topic = "didiok_users";
        CollectKafkaConsumer collectKafkaConsumer = new CollectKafkaConsumer(topic);
        collectKafkaConsumer.receive(collectKafkaConsumer.consumer);
    }
}
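The consumer above commits offset + 1 after every single record. One possible variation is to process the whole batch for a partition and commit once, using the last processed offset plus one. The sketch below shows only that calculation with plain collections rather than the Kafka client; CommitOffsetSketch and its method are hypothetical names, and partitions are represented by their integer index for simplicity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: computes the offset to commit per partition after a batch is processed. */
final class CommitOffsetSketch {

    /**
     * For each partition, the offset to commit is the last processed offset + 1,
     * that is, the position of the next record to read. Empty partitions are skipped.
     */
    static Map<Integer, Long> offsetsToCommit(Map<Integer, List<Long>> processedOffsets) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, List<Long>> e : processedOffsets.entrySet()) {
            List<Long> offsets = e.getValue();
            if (offsets.isEmpty()) {
                continue;
            }
            long last = offsets.get(offsets.size() - 1);
            result.put(e.getKey(), last + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<Long>> processed = new HashMap<>();
        processed.put(2, Arrays.asList(40L, 41L, 42L)); // three records handled on partition 2
        System.out.println(offsetsToCommit(processed));
    }
}
```

Committing once per batch reduces round trips to the broker, at the cost of reprocessing more records if the consumer fails in the middle of a batch.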
Then start the Spring Boot project and, to test, add data to the users table of the didiok database on node 192.168.31.102. The Java console will output the following: