Deploying Canal on CentOS 7 and Integrating It with Spring Boot

1. Introduction

canal [kəˈnæl], meaning waterway/pipe/ditch, is mainly used to provide incremental data subscription and consumption based on parsing MySQL's incremental log (binlog).

In its early days, Alibaba ran dual data centers in Hangzhou and the United States, which created a business need for cross-data-center synchronization. This was initially implemented mainly by capturing incremental changes with business triggers. Starting in 2010, teams gradually moved to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption services.

Services built on incremental log subscription and consumption include:

  • Database mirroring
  • Database real-time backup
  • Index construction and real-time maintenance (split heterogeneous index, inverted index, etc.)
  • Business cache refresh
  • Incremental data processing with business logic

canal currently supports the following source MySQL versions: 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

2. How it works
2.1. How MySQL master-slave replication works
  • The MySQL master writes data changes to the binary log (binlog). The records in it are called binary log events and can be viewed with SHOW BINLOG EVENTS.
  • The MySQL slave copies the master's binary log events to its relay log.
  • The MySQL slave replays the events in the relay log and applies the data changes to its own data.
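The three steps above can be sketched as a toy in-memory model in Java. This is purely illustrative. The class and method names are hypothetical, an "event" is reduced to a key/value write, and none of MySQL's real protocol or storage is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of master/slave replication (hypothetical, not MySQL's implementation).
class ReplicationModel {
    // A "binlog event" here is just a key/value write.
    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    static final class Master {
        final List<Event> binlog = new ArrayList<>();
        final Map<String, String> data = new HashMap<>();

        void write(String key, String value) {
            data.put(key, value);              // apply the change locally
            binlog.add(new Event(key, value)); // step 1: record it in the binlog
        }
    }

    static final class Slave {
        final List<Event> relayLog = new ArrayList<>();
        final Map<String, String> data = new HashMap<>();
        int fetched = 0; // how many binlog events have been copied
        int applied = 0; // how many relay log events have been replayed

        // Step 2: copy new binlog events into the relay log.
        void fetchFrom(Master master) {
            while (fetched < master.binlog.size()) {
                relayLog.add(master.binlog.get(fetched++));
            }
        }

        // Step 3: replay relay log events against the slave's own data.
        void replay() {
            while (applied < relayLog.size()) {
                Event e = relayLog.get(applied++);
                data.put(e.key, e.value);
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write("id:4", "2013-02-05 23:29:46");
        slave.fetchFrom(master);
        slave.replay();
        System.out.println(slave.data.equals(master.data)); // true
    }
}
```

Copying (fetchFrom) and replaying (replay) are kept as separate steps. This mirrors the text: the relay log can hold events that have not been applied yet.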
2.2. How canal works
  • canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master.
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).
  • canal parses the binary log objects, which arrive as a raw byte stream.
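To show what parsing that byte stream starts with, the sketch below decodes the fixed 19-byte common header that begins every event in a MySQL binlog (format v4). The header holds the timestamp, type code, server id, event size, next-event position, and flags, all little-endian. The class is a hypothetical illustration, not canal's parser, and it ignores the event body entirely.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper: decodes the 19-byte common header of a binlog v4 event.
// A real parser (such as canal's) continues by decoding the body according to typeCode.
class BinlogEventHeader {
    static final int HEADER_LENGTH = 19;

    final long timestamp; // seconds since the Unix epoch (4 bytes)
    final int typeCode;   // event type (1 byte), e.g. 30 = WRITE_ROWS_EVENT (v2)
    final long serverId;  // server_id of the server that wrote the event (4 bytes)
    final long eventSize; // total event size, header included (4 bytes)
    final long logPos;    // position of the next event in the binlog file (4 bytes)
    final int flags;      // event flags (2 bytes)

    private BinlogEventHeader(long timestamp, int typeCode, long serverId,
                              long eventSize, long logPos, int flags) {
        this.timestamp = timestamp;
        this.typeCode = typeCode;
        this.serverId = serverId;
        this.eventSize = eventSize;
        this.logPos = logPos;
        this.flags = flags;
    }

    static BinlogEventHeader parse(byte[] raw) {
        if (raw.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("binlog event header needs " + HEADER_LENGTH + " bytes");
        }
        // Header fields are little-endian; mask to read them as unsigned values.
        ByteBuffer buf = ByteBuffer.wrap(raw, 0, HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int typeCode = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long eventSize = buf.getInt() & 0xFFFFFFFFL;
        long logPos = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new BinlogEventHeader(timestamp, typeCode, serverId, eventSize, logPos, flags);
    }
}
```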
3. Deployment
3.1. Preparation

For a self-hosted MySQL, first enable binlog writing and set binlog-format to ROW. Configure my.cnf as follows:

[mysqld]
log-bin=mysql-bin # Enable binlog
binlog-format=ROW # Select ROW mode
server_id=1 # Required for MySQL replication; must not be the same as canal's slaveId
  • Note: On Alibaba Cloud RDS for MySQL, binlog is enabled by default and the account already has binlog dump permission, so no permission or binlog changes are needed. You can skip this step.
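Before starting canal, the settings above can be sanity-checked. The following hypothetical Java helper is not part of canal or MySQL. It reads the [mysqld] section of a my.cnf text and treats - and _ in option names as equivalent, as MySQL does. It reports three problems: a missing log-bin, a binlog-format other than ROW, and a server_id equal to canal's slaveId (1234 in conf/example/instance.properties). It inspects only the file, not the running server.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the [mysqld] settings canal relies on.
class MysqldConfigCheck {

    static List<String> check(String myCnf, long canalSlaveId) {
        Map<String, String> opts = new HashMap<>();
        boolean inMysqld = false;
        for (String rawLine : myCnf.split("\n")) {
            String line = rawLine.trim();
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash).trim(); // drop "# ..." comments
            }
            if (line.isEmpty() || line.startsWith(";")) {
                continue; // blank line or ";" comment line
            }
            if (line.startsWith("[")) {
                inMysqld = line.equals("[mysqld]"); // only the [mysqld] group matters here
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            // Normalize server_id / server-id to one spelling.
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim().replace('_', '-');
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            opts.put(key, value);
        }

        List<String> problems = new ArrayList<>();
        if (!opts.containsKey("log-bin")) {
            problems.add("log-bin is not set in [mysqld]");
        }
        if (!"ROW".equalsIgnoreCase(opts.get("binlog-format"))) {
            problems.add("binlog-format should be ROW");
        }
        String serverId = opts.get("server-id");
        if (serverId == null) {
            problems.add("server_id is not set");
        } else {
            try {
                if (Long.parseLong(serverId) == canalSlaveId) {
                    problems.add("server_id must differ from canal's slaveId " + canalSlaveId);
                }
            } catch (NumberFormatException e) {
                problems.add("server_id is not a number: " + serverId);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String myCnf = "[mysqld]\nlog-bin=mysql-bin # Enable binlog\nbinlog-format=ROW\nserver_id=1\n";
        System.out.println(check(myCnf, 1234)); // []
    }
}
```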

Grant the MySQL account that canal connects with the privileges it needs to act as a MySQL slave. If you already have an account, grant the privileges to it directly.

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.2. Startup

Download canal: visit the release page and select the package you need. This example uses version 1.0.17.

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

Extract the archive ($version is the version you downloaded, e.g. 1.0.17):

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

After extraction, enter the /tmp/canal directory. You should see the following structure:

drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs

Modify the configuration:

vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info needs to be changed to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password needs to be changed to your own database information
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#tableregex
canal.instance.filter.regex = .*\\..*
canal.instance.connectionCharset is the character encoding of the database, given as the corresponding Java charset name, such as UTF-8, GBK, or ISO-8859-1.
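The sketch below shows how the filter value is read. It loads the value with java.util.Properties, which turns the escaped \\ in the file into a single \, and then tests it against "schema.table" names. Two things are assumptions made for illustration: that the value is a comma-separated list of Java regexes, and that each regex is full-matched against schema.table. canal's own filter implementation may differ in details. The class name is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical demo of reading and applying canal.instance.filter.regex.
class FilterRegexDemo {

    // Reads the filter the way a .properties file is parsed (backslash escapes resolved).
    static String loadFilter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex", "");
    }

    // Assumption: comma-separated regexes, each full-matched against "schema.table".
    static boolean accepts(String filterRegex, String schema, String table) {
        String name = schema + "." + table;
        for (String part : filterRegex.split(",")) {
            String p = part.trim();
            if (!p.isEmpty() && Pattern.matches(p, name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String filter = loadFilter("canal.instance.filter.regex = .*\\\\..*\n");
        System.out.println(filter);                           // .*\..*
        System.out.println(accepts(filter, "test", "xdual")); // true
    }
}
```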

If the machine has only one CPU, set canal.instance.parser.parallel to false.
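A minimal Java sketch of that rule, based on the CPU count the JVM reports. The class name is hypothetical. availableProcessors() reflects what the JVM may use, which can be less than the physical CPU count when the process is restricted.

```java
// Hypothetical helper: derives the canal.instance.parser.parallel value from the CPU count.
class ParserParallelHint {

    // Per the rule above: a single-CPU machine should run with parallel = false.
    static boolean recommendedParallel(int cpus) {
        return cpus > 1;
    }

    static String propertyLine(int cpus) {
        return "canal.instance.parser.parallel = " + recommendedParallel(cpus);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println(propertyLine(cpus));
    }
}
```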

Start:

sh bin/startup.sh

View the server log:

vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
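You can also confirm that the server accepts connections on port 11111, the port shown in the log line above, with a plain TCP connect. The helper below is a hypothetical sketch that uses only java.net. A successful connect shows only that something is listening, not that the example instance is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class CanalPortCheck {

    // Returns true if connecting to host:port succeeds within timeoutMs.
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable host
        }
    }

    public static void main(String[] args) {
        System.out.println(isPortOpen("127.0.0.1", 11111, 1000));
    }
}
```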

View the instance log:

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Stop:

sh bin/stop.sh
4. Install canal-admin

canal-admin provides canal with operations-oriented features such as centralized configuration management and node operation and maintenance. It also offers a relatively friendly WebUI so that more users can operate canal quickly and safely.

4.1. Preparation

canal-admin has only a few dependencies:

  • MySQL, used to store configuration, node, and other related data
  • canal version >= 1.1.4 (canal-admin relies on canal-server to provide the dynamic operation and maintenance management interface)
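canal-admin needs canal-server 1.1.4 or later, so the 1.0.17 deployer used as the example in section 3 would have to be upgraded before canal-admin can manage it. Below is a hypothetical version check. It assumes purely numeric dotted versions and does not handle suffixes such as -alpha.

```java
// Hypothetical helper: compares dotted numeric versions such as "1.0.17" and "1.1.4".
class CanalVersionCheck {

    // Negative if a < b, zero if equal, positive if a > b; missing parts count as 0.
    static int compare(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y); // numeric, so 1.1.10 > 1.1.4
            }
        }
        return 0;
    }

    static boolean supportsAdmin(String canalVersion) {
        return compare(canalVersion, "1.1.4") >= 0;
    }

    public static void main(String[] args) {
        System.out.println("1.0.17 -> " + supportsAdmin("1.0.17")); // 1.0.17 -> false
    }
}
```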

4.2. Deployment

Download canal-admin: visit the release page and select the package you need. This example uses version 1.1.4.

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

Extract the archive ($version is the version you downloaded, e.g. 1.1.4):

mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin

After extraction, enter the /tmp/canal-admin directory. You should see the following structure:

drwxr-xr-x 6 agapple staff 204B 8 31 15:37 bin
drwxr-xr-x 8 agapple staff 272B 8 31 15:37 conf
drwxr-xr-x 90 agapple staff 3.0K 8 31 15:37 lib
drwxr-xr-x 2 agapple staff 68B 8 31 15:26 logs

Modify the configuration:

vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
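The url entry is built from the address and database keys through ${...} placeholders. The sketch below mimics that substitution with a simple regex to show the resulting JDBC URL. It is a simplified stand-in for Spring's own placeholder resolution (no defaults, nesting, or escaping), and the class name is hypothetical. Any spaces written into the url line would end up inside the URL itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified ${...} placeholder resolver (not Spring's implementation).
class DatasourceUrlDemo {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${key} with its value; unknown keys are left untouched.
    static String resolve(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = values.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("spring.datasource.address", "127.0.0.1:3306");
        values.put("spring.datasource.database", "canal_manager");
        String template = "jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}"
                + "?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
        // Prints jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false
        System.out.println(resolve(template, values));
    }
}
```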

Initialize the metadata database:

mysql -h127.1 -uroot -p

Import the initialization SQL:

> source conf/canal_manager.sql

a. The initialization SQL script creates the canal_manager database by default, so it is recommended to run it with a highly privileged account such as root.
b. canal_manager.sql is located in the conf directory by default. You can also download canal_manager.sql through the link.

Start:

sh bin/startup.sh

View the admin log:

vi logs/admin.log
2019-08-31 15:43:38.162 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

This means canal-admin has started successfully and can be accessed at http://127.0.0.1:8089/. The default username/password is admin/123456.

Stop:

sh bin/stop.sh

canal-server configuration
Overwrite canal.properties with the configuration in canal_local.properties:

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user=admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
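canal.admin.passwd is not the plain password. It appears to be the MySQL PASSWORD()-style hash of the canal-admin password (adminPasswd: admin in application.yml). That scheme is the upper-case hex of SHA1(SHA1(password)) without MySQL's leading *, and the value above matches it for "admin". On MySQL 5.x the same value can be obtained with select password('admin') and dropping the *. PASSWORD() was removed in MySQL 8.0. Assuming that scheme, here is a hypothetical Java helper that computes the value for a different password:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper. Assumed scheme: upper-case hex of SHA1(SHA1(password)),
// i.e. MySQL's PASSWORD() output without the leading '*'.
class CanalAdminPasswd {

    static String hash(String password) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] stage1 = sha1.digest(password.getBytes(StandardCharsets.UTF_8)); // digest() resets the instance
            byte[] stage2 = sha1.digest(stage1); // second round hashes the raw bytes, not a hex string
            StringBuilder hex = new StringBuilder();
            for (byte b : stage2) {
                hex.append(String.format("%02X", b & 0xFF));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        String password = args.length > 0 ? args[0] : "admin";
        System.out.println("canal.admin.passwd = " + hash(password));
    }
}
```

If you change adminPasswd in application.yml, canal.admin.passwd on the server side has to be regenerated to match.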

Then simply start canal-server.

Alternatively, pass the local parameter on the startup command to use canal_local.properties: sh bin/startup.sh local

5. Integrating canal into a Spring Boot project

5.1. Create a standard Maven project:

mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

Maven 3.0.5 and later have dropped archetype:create; use archetype:generate instead:

mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample

5.2. Modify pom.xml and add the dependency:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
5.3. Client sample code
package com.alibaba.otter.canal.sample;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Connect to canal-server on this host (port 11111), instance "example", empty username/password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Subscribe to every table in every schema ("schema.table" regex)
            connector.subscribe(".*\\..*");
            // Roll back to the last acknowledged position so unacknowledged batches are re-delivered
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // Fetch up to batchSize entries without acking
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // Restore the interrupt flag and stop polling
                        break;
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s]%n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // Confirm that the batch has been processed
                // connector.rollback(batchId); // On processing failure, roll back so the batch is re-delivered
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType: %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList()); // DELETE: only the "before" image exists
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());  // INSERT: only the "after" image exists
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
5.4. Run the client

Start canal-server first.

Then start the client. The console prints messages similar to the following:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

This means no data in the database has changed yet.

Trigger a database change:
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    -> `ID` int(11) NOT NULL AUTO_INCREMENT,
    -> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -> PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());
Query OK, 1 row affected (0.06 sec)

The console then shows:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577], name[test,xdual], eventType: INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true