canal
canal [kə'næl], meaning a waterway, pipeline, or ditch, is a tool whose main purpose is to provide incremental data subscription and consumption based on parsing MySQL's incremental (binary) logs.
In Alibaba's early days, the dual deployment of data centers in Hangzhou and the United States created a business need for cross-data-center synchronization. This was initially implemented mainly by capturing incremental changes through business-level triggers. Starting in 2010, teams gradually began parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption businesses.
Businesses based on incremental log subscription and consumption include:
- Database mirroring
- Real-time database backup
- Index construction and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
- Business cache refresh
- Incremental data processing with business logic

Currently, canal supports MySQL sources at versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
How it works

MySQL master-slave replication:
- The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events)
- The MySQL slave copies the master's binary log events to its relay log
- The MySQL slave replays the events in the relay log, applying the data changes to its own data

canal working principle:
- canal simulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
- The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. canal)
- canal parses the binary log objects (originally a byte stream)
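For reference, the master's binlog can be inspected directly from a MySQL client. The binlog file name below is only an example; use the one reported by SHOW MASTER STATUS on your server:

```sql
SHOW MASTER STATUS;                        -- current binlog file name and position
SHOW BINLOG EVENTS IN 'mysql-bin.000001';  -- events in a given binlog file (example name)
```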
My own application scenario is a statistics and analysis feature. I originally obtained statistical data through microservice calls, but that approach is tightly coupled and relatively inefficient. I now use the canal database synchronization tool to synchronize the database in real time instead. For example, to count daily registrations and logins, we only need to synchronize the membership table into the statistics database and run the statistics locally, which is more efficient and less coupled.
Canal is an open-source project under Alibaba, developed purely in Java. It provides incremental data subscription and consumption based on parsing incremental database logs.
Canal environment setup
canal is based on MySQL's binlog mechanism, so we first need to enable MySQL's binlog writing function.
In the Linux system, start the mysql service: systemctl start mysqld or service mysql start
Check whether the binlog function is enabled
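You can check the current status from a MySQL client, for example:

```sql
-- ON means binlog is enabled, OFF means it is not
SHOW VARIABLES LIKE 'log_bin';
```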
Enable binlog function
If the status shows OFF, the function has not been enabled yet; turn it on as follows.
Modify the mysql configuration file my.cnf
vim /etc/my.cnf
Append the following content:
log-bin=mysql-bin   # binlog file name
binlog_format=ROW   # choose row mode
server_id=1         # MySQL instance id; must not collide with canal's slaveId
Restart mysql
systemctl restart mysqld
Log in to the MySQL client again and check the log_bin variable; if it displays ON, the function is now enabled.
Add the following user and permissions in MySQL:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

This simply adds a user that can access the MySQL database remotely, with both the account and password set to canal. Since my virtual machine already has the root user set up, I will not add this canal user here; adapt this to your own situation.
Download and install Canal service
Download canal address
After downloading, put the archive in a directory and unzip it.
Unzip
tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/
Modify configuration file
vim conf/example/instance.properties
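The entries that typically need adjusting are the source database address, the replication account, and the table filter. A minimal sketch, assuming the canal account created above and a local MySQL source (replace the address and credentials with your own):

```properties
# MySQL source address (host:port) -- replace with your own server
canal.instance.master.address=127.0.0.1:3306
# replication account created earlier
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# connection character set
canal.instance.connectionCharset=UTF-8
# table filter: subscribe to all schemas and tables
canal.instance.filter.regex=.*\\..*
```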
A note quoted from the official docs: the MySQL tables canal parses are specified with Perl regular expressions. Multiple regular expressions are separated by commas (,), and the escape character requires a double backslash (\\).
1. Common example, all tables: .* or .*\\..*
2. All tables under the canal schema: canal\\..*
3. Tables under canal starting with canal: canal\\.canal.*
4. A single table under the canal schema: canal.test1
5. Multiple rules combined: canal\\..*,mysql.test1,mysql.test2 (comma separated)
Note: this filter condition is only effective for row-mode data (ps. mixed/statement formats do not parse SQL, so the table name cannot be accurately extracted for filtering).
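To see how these patterns behave, here is a small standalone Java sketch using plain java.util.regex (canal's internal filter implementation differs, but the matching semantics of these simple patterns are the same; the class name is just for illustration):

```java
import java.util.regex.Pattern;

// Demonstrates how the filter patterns match against "schema.table" names.
public class FilterRegexDemo {
    public static void main(String[] args) {
        // canal\\..* in instance.properties is the regex canal\..* :
        // any table under the canal schema
        Pattern schemaWide = Pattern.compile("canal\\..*");
        System.out.println(schemaWide.matcher("canal.test1").matches());    // true
        System.out.println(schemaWide.matcher("mysql.user").matches());     // false

        // canal\\.canal.* : tables under canal whose names start with "canal"
        Pattern prefixed = Pattern.compile("canal\\.canal.*");
        System.out.println(prefixed.matcher("canal.canal_meta").matches()); // true
        System.out.println(prefixed.matcher("canal.test1").matches());      // false
    }
}
```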
Enter the bin directory to start
./startup.sh
Code integration: create the canal_client module
Introduce related dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- mysql -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
    </dependency>
</dependencies>

Create the application.properties configuration file

# Service port
server.port=10000
# Service name
spring.application.name=canal-client
# Environment settings: dev, test, prod
spring.profiles.active=dev
# MySQL database connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT+8
spring.datasource.username=root
spring.datasource.password=123456
Write canal client class
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
public class CanalClient {

    // sql queue
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * canal client entry method
     */
    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.159.33", 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // try to pull up to batchSize records from the master; take as many as there are
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    // when the SQL accumulated in the queue reaches a certain amount, execute it
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * execute the sql statements in the queue
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
            this.execute(sql);
        }
    }

    /**
     * data processing
     *
     * @param entries
     */
    private void dataHandle(List<Entry> entries) throws InvalidProtocolBufferException {
        for (Entry entry : entries) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * build the update statement
     *
     * @param entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // only a single primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * build the delete statement
     *
     * @param entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        // only a single primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * build the insert statement
     *
     * @param entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * write to the target database
     *
     * @param sql
     */
    public void execute(String sql) {
        Connection con = null;
        try {
            if (null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
Change the IP address here (192.168.159.33) to that of your own virtual machine or server.
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class CanalApplication implements CommandLineRunner {

    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        // when the project starts, run the canal client listener
        canalClient.run();
    }
}
Before testing, make sure the canal server and this client application are both running.
Insert a row of test data on the Linux MySQL instance, then check the local console output.
Update that row on Linux and check the local console again.
Finally, verify that the table in the local Windows MySQL database is consistent with the data on Linux.