1. Introduction
canal [kə'næl], which translates as waterway, pipeline, or ditch, provides incremental data subscription and consumption based on parsing MySQL's incremental logs.
In Alibaba's early days, databases were deployed in data centers in both Hangzhou and the United States, which created a business need for cross-data-center synchronization. This was initially implemented mainly with business triggers that captured incremental changes. Starting in 2010, the business gradually moved to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental subscription and consumption services.
Services built on incremental log subscription and consumption include:
- Database mirroring
- Database real-time backup
- Index construction and real-time maintenance (split heterogeneous index, inverted index, etc.)
- Business cache refresh
- Incremental data processing with business logic
canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
2. Working principle
2.1. How MySQL master/slave replication works
- The MySQL master writes data changes to its binary log. The records are called binary log events and can be viewed with show binlog events.
- The MySQL slave copies the master's binary log events to its relay log.
- The MySQL slave replays the events in the relay log, applying the data changes to its own data.
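The three steps above can be sketched as a toy model in Java. This is only an illustration: the class names (ReplicationSketch, Master, Replica, Event) are made up for this example, events are reduced to key/value writes, and nothing here reflects MySQL's real on-disk or wire formats.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of binlog -> relay log -> replay; not MySQL's real protocol. */
final class ReplicationSketch {

    /** Hypothetical event: "set key to value". */
    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    static final class Master {
        final List<Event> binlog = new ArrayList<>();
        final Map<String, String> data = new LinkedHashMap<>();

        /** Step 1: apply the change and record it in the binary log. */
        void write(String key, String value) {
            data.put(key, value);
            binlog.add(new Event(key, value));
        }
    }

    static final class Replica {
        final List<Event> relayLog = new ArrayList<>();
        final Map<String, String> data = new LinkedHashMap<>();
        int applied = 0; // index of the next relay-log event to replay

        /** Step 2: copy binlog events not fetched yet into the relay log. */
        void fetch(Master master) {
            relayLog.addAll(master.binlog.subList(relayLog.size(), master.binlog.size()));
        }

        /** Step 3: replay relay-log events against the replica's own data. */
        void replay() {
            for (; applied < relayLog.size(); applied++) {
                Event e = relayLog.get(applied);
                data.put(e.key, e.value);
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Replica replica = new Replica();
        master.write("user:1", "alice");
        master.write("user:1", "bob");
        replica.fetch(master);
        replica.replay();
        System.out.println(replica.data.equals(master.data)); // true
    }
}
```

The replica only catches up when it fetches and replays, which is why consumers of the log see changes incrementally and in order.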
2.2. How canal works
- canal simulates the interaction protocol of MySQL slave, disguises itself as MySQL slave, and sends the dump protocol to MySQL master.
- MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. canal)
- canal parses the binary log objects, which arrive as a raw byte stream
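To give a feel for what "parsing a byte stream" involves, here is a minimal sketch that decodes the fixed 19-byte header that precedes each event in the v4 binlog format (timestamp, type code, server id, event length, next position, flags, all little-endian). The class name BinlogHeaderSketch and the sample bytes are assumptions for illustration; this is not canal's actual parser, and it ignores the event body and checksums.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative decoder for the 19-byte binlog event header (v4 format). */
final class BinlogHeaderSketch {
    final long timestamp;    // seconds since the epoch
    final int eventType;     // numeric event type code
    final long serverId;     // server_id of the server that wrote the event
    final long eventLength;  // header plus body, in bytes
    final long nextPosition; // offset of the next event in the binlog file
    final int flags;

    private BinlogHeaderSketch(ByteBuffer buf) {
        timestamp = Integer.toUnsignedLong(buf.getInt());
        eventType = Byte.toUnsignedInt(buf.get());
        serverId = Integer.toUnsignedLong(buf.getInt());
        eventLength = Integer.toUnsignedLong(buf.getInt());
        nextPosition = Integer.toUnsignedLong(buf.getInt());
        flags = Short.toUnsignedInt(buf.getShort());
    }

    static BinlogHeaderSketch parse(byte[] raw) {
        if (raw.length < 19) {
            throw new IllegalArgumentException("need at least 19 bytes");
        }
        return new BinlogHeaderSketch(ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN));
    }

    public static void main(String[] args) {
        // Hand-built sample header, not captured from a real server.
        byte[] raw = ByteBuffer.allocate(19).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(1360078186)   // timestamp
                .put((byte) 30)       // type code (30 is a rows-insert event in recent versions)
                .putInt(1)            // server id
                .putInt(52)           // event length
                .putInt(313661577)    // next position
                .putShort((short) 0)  // flags
                .array();
        BinlogHeaderSketch h = parse(raw);
        System.out.println("type=" + h.eventType + " next=" + h.nextPosition);
    }
}
```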
3. Deployment
3.1. Preparation
For a self-hosted MySQL, first enable binlog writing and set binlog-format to ROW. Configure my.cnf as follows:
[mysqld]
log-bin=mysql-bin # Enable binlog
binlog-format=ROW # Select ROW mode
server_id=1 # Required for MySQL replication; must not be the same as canal's slaveId
- Note: For Alibaba Cloud RDS for MySQL, binlog is enabled by default and the account already has binlog dump permission, so no permission or binlog setup is needed and you can skip this step.
Grant the MySQL account that canal connects with the permissions it needs to act as a MySQL slave. If the account already exists, you can run the GRANT directly.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.2. Startup
Download canal from the release page and select the package you need. Version 1.0.17 is used as an example:
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
Extract the archive
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
After extraction, enter the /tmp/canal directory. You should see the following structure:
drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs
Modify the configuration
vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info, change to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password, change to your own database information
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
canal.instance.connectionCharset is the database encoding expressed as the corresponding Java charset name, such as UTF-8, GBK, or ISO-8859-1.
If the system has only one CPU, set canal.instance.parser.parallel to false.
Start
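The canal.instance.filter.regex value selects tables by matching a regular expression against names of the form schema.table. The sketch below shows how such a filter behaves using plain java.util.regex. The class FilterRegexSketch and the comma-separated handling are assumptions for illustration, and canal's own matcher may differ in details such as case sensitivity.

```java
import java.util.regex.Pattern;

/** Illustrative table filter; not canal's actual filter implementation. */
final class FilterRegexSketch {

    /** Returns true if "schema.table" fully matches any comma-separated pattern. */
    static boolean accepts(String filter, String schema, String table) {
        String name = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In a .properties file "\\" is an escaped backslash, so the value
        // .*\\..* reaches the matcher as the regex .*\..* (any schema, any table).
        System.out.println(accepts(".*\\..*", "test", "xdual"));    // true
        System.out.println(accepts("canal\\..*", "test", "xdual")); // false
    }
}
```

The escaped dot matters: an unescaped "." would match any character, so the pattern would no longer pin the separator between schema and table.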
sh bin/startup.sh
View server logs
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
View instance logs
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
Stop
sh bin/stop.sh
4. Install canal-admin
canal-admin provides operations-oriented functions for canal, such as overall configuration management and node operation and maintenance. It also offers a relatively friendly web UI so that more users can operate canal quickly and safely.
4.1. Preparation
canal-admin has only a few dependencies:
- MySQL, used to store configuration, node, and other related data
- canal version >= 1.1.4 (canal-server must provide the dynamic operation and maintenance interface that admin relies on)
4.2. Deployment
Download canal-admin from the release page and select the package you need. Version 1.1.4 is used as an example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
Extract the archive
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin
After extraction, enter the /tmp/canal-admin directory. You should see the following structure:
drwxr-xr-x  6 agapple staff 204B 8 31 15:37 bin
drwxr-xr-x  8 agapple staff 272B 8 31 15:37 conf
drwxr-xr-x 90 agapple staff 3.0K 8 31 15:37 lib
drwxr-xr-x  2 agapple staff  68B 8 31 15:26 logs
Modify the configuration
vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
Initialize the metadata database
mysql -h127.1 -uroot -p
Import the initialization SQL
> source conf/canal_manager.sql
a. The initialization SQL script creates the canal_manager database by default. It is recommended to run it with an account that has super privileges, such as root.
b. canal_manager.sql is in the conf directory by default. You can also download canal_manager.sql through the link.
Start
sh bin/startup.sh
View admin log
vi logs/admin.log
2019-08-31 15:43:38.162 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]
At this point canal-admin has started successfully and can be accessed at http://127.0.0.1:8089/. The default username/password is admin/123456.
Stop
sh bin/stop.sh
canal-server configuration
Override canal.properties using the configuration of canal_local.properties
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
Then just start admin-server.
Alternatively, specify the configuration with a parameter in the startup command:
sh bin/startup.sh local
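The canal.admin.passwd value in the properties above is not a plain-text password. It appears to be the double SHA-1 digest of "admin" in upper-case hex, the same form MySQL's legacy PASSWORD() function produces without its leading asterisk. The sketch below computes that digest; the class and method names are made up for this example, and the assumption that canal expects exactly this encoding should be checked against the canal documentation for your version.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Computes an upper-case hex SHA1(SHA1(password)) digest; illustrative only. */
final class AdminPasswdSketch {

    static String doubleSha1Hex(String plain) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] once = sha1.digest(plain.getBytes(StandardCharsets.UTF_8));
            byte[] twice = sha1.digest(once); // digest() resets the instance after each call
            StringBuilder hex = new StringBuilder(twice.length * 2);
            for (byte b : twice) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-1.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints the value to place in canal.admin.passwd for the password "admin".
        System.out.println(doubleSha1Hex("admin"));
    }
}
```

If you change the admin password, the same function can be used to derive the new value for canal_local.properties, under the assumption stated above.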
5. Integrating with a Spring Boot project
Dependency configuration:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
5.1. Create a standard Maven project:
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
Maven 3.0.5 and later drop the create goal; use generate to create the project instead.
mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample
5.2. Modify pom.xml and add dependencies
5.3. ClientSample code
package com.alibaba.otter.canal.sample;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String args[]) {
        // Create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // Fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // Confirm the batch
                // connector.rollback(batchId); // Processing failed, roll back the batch
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType: %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
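The sample above acknowledges every batch and leaves the rollback call commented out. A common variation is to acknowledge only after processing succeeds and to roll back otherwise, so that a failed batch is delivered again. The sketch below shows that control flow with hypothetical stand-in types (BatchSource, Batch, InMemorySource) so it runs without a canal server. Only the method names getWithoutAck, ack, and rollback are borrowed from the sample; everything else is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of ack-on-success / rollback-on-failure; all types here are hypothetical. */
final class AckRollbackSketch {

    /** Hypothetical stand-in for the connector calls used by the sample. */
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    static final class Batch {
        final long id;
        final List<String> entries;
        Batch(long id, List<String> entries) { this.id = id; this.entries = entries; }
    }

    /** Handles one batch and returns true only if it was acknowledged. */
    static boolean consumeOnce(BatchSource source, int batchSize, Consumer<List<String>> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1) {
            return false; // nothing to consume right now
        }
        try {
            handler.accept(batch.entries);
            source.ack(batch.id);      // confirm only after the handler succeeded
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // ask for the same entries again next time
            return false;
        }
    }

    /** In-memory source for the demo: redelivers entries until they are acked. */
    static final class InMemorySource implements BatchSource {
        private final List<String> log;
        private int acked = 0;    // entries before this index are confirmed
        private int inFlight = 0; // size of the batch handed out but not yet acked
        private long nextId = 1;

        InMemorySource(List<String> log) { this.log = log; }

        @Override public Batch getWithoutAck(int batchSize) {
            int end = Math.min(log.size(), acked + batchSize);
            if (end == acked) {
                return new Batch(-1, Collections.<String>emptyList());
            }
            inFlight = end - acked;
            return new Batch(nextId++, new ArrayList<>(log.subList(acked, end)));
        }

        @Override public void ack(long batchId) { acked += inFlight; inFlight = 0; }

        @Override public void rollback(long batchId) { inFlight = 0; }
    }

    public static void main(String[] args) {
        BatchSource source = new InMemorySource(Arrays.asList("e1", "e2", "e3"));
        // The first attempt fails, so the batch is rolled back and delivered again.
        consumeOnce(source, 2, entries -> { throw new IllegalStateException("simulated failure"); });
        consumeOnce(source, 2, entries -> System.out.println("processed " + entries));
    }
}
```

With this flow a handler may see the same entries more than once, so the processing logic should tolerate repeats.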
5.4. Run the client
Start the canal server first.
Then start the canal client. The console shows output similar to the following:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
This means that there is no changed data in the current database.
- Trigger database changes
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());
Query OK, 1 row affected (0.06 sec)
The console then shows:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577], name[test,xdual], eventType: INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true