Use canal to achieve real-time data synchronization

canal

canal [k?’n?l], translated as waterway/pipeline/ditch, its main purpose is to provide incremental data subscription and consumption based on MySQL database incremental log analysis

In the early days of Alibaba, due to the deployment of dual computer rooms in Hangzhou and the United States, there was a business need for cross-computer room synchronization. The implementation method was mainly to obtain incremental changes based on business triggers. Since 2010, businesses have gradually tried to parse database logs to obtain incremental changes for synchronization, which has resulted in a large number of database incremental subscription and consumption businesses.

Businesses based on log incremental subscription and consumption include

Database mirroring Database real-time backup Index construction and real-time maintenance (split heterogeneous index, inverted index, etc.) Business cache refresh Incremental data processing with business logic Current
canal supports source MySQL versions including 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x

How it works

  • MySQL master writes data changes to the binary log (binary log, where the records are called binary log events)
    events, which can be viewed via show binlog events)

  • MySQL slave will master’s binary
    Log events are copied to its relay log (relay log) MySQL slave replays relay log

  • MySQL slave replays the events in the relay log and reflects the data changes to its own data
    canal working principle

  • MySQL slave replays the events in the relay log and reflects the data changes to its own data
    canal working principle

  • MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. canal)

  • canal parses the binary log object (originally a byte stream)

My own application scenario is in the statistical analysis function. I use microservice calls to obtain statistical data. However, this has a high degree of coupling and relatively low efficiency. I now use the Canal database synchronization tool to achieve real-time synchronization of the database. For example, if we want to count the number of people who register and log in every day, we only need to synchronize the membership table to the statistics library to implement local statistics, which will be more efficient and less coupled.
Canal is an open source project under Alibaba, developed purely in Java. Based on database incremental log analysis, incremental data subscription & consumption is provided.

Canal environment construction
The principle of canal is based on mysql binlog technology, so here we need to enable the binlog writing function of mysql
In the Linux system, start the mysql service: systemctl start mysqld or service mysql start

Check whether the binlog function is enabled

Enable binlog function
If the display status is OFF, it means that the function has not been turned on. Turn on the binlog function.

Modify the mysql configuration file my.cnf

vim /etc/my.cnf

Additional content

log-bin=mysql-bin #binlog file name
binlog_format=ROW #Select row mode
server_id=1 #mysql instance id, cannot be repeated with canal's slaveId

Restart mysql

systemctl restart mysqld

Log in to the mysql client again and view the log_bin variable

Displaying ON indicates that the function is turned on.

Add the following related users and permissions in mysql

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

This is actually adding a user who can remotely access the mysql database. The account and password are both canal. Since my virtual machine has already added the root user, I will not add this canal here. You can use it according to your own situation.

Download and install Canal service
Download canal address

After downloading, put it in the directory and unzip the file

Unzip

tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/


Modify configuration file

vim conf/example/instance.properties

Here is the quotation note: MySQL data parsing focuses on tables, Perl regular expressions. Multiple regular expressions are separated by commas (,), and the escape character requires double slashes ()
1. Common examples: all tables: .* or .\…
2. All tables under canal schema: canal\…*
3. The table starting with canal under canal: canal\.canal.*
4…A table under canal schema: canal.test1
5. Use multiple rules in combination: canal\…*, mysql.test1, mysql.test2 (comma separated) Note: This filter condition is only valid for data in row mode (ps.
6. Because mixed/statement does not parse sql, it cannot accurately extract tableName for filtering)

Enter the bin directory to start

./startup.sh


Code integration creates canal_client module

Introduce related dependencies

Create application.properties configuration file

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--mysql-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>

    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
    </dependency>
</dependencies>
# Service port
server.port=10000
# Service Name
spring.application.name=canal-client

# Environment settings: dev, test, prod
spring.profiles.active=dev

# mysql database connection
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT+8
spring.datasource.username=root
spring.datasource.password=123456

Write canal client class

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
public class CanalClient {<!-- -->

    //sql queue
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * canal storage method
     */
    public void run() {<!-- -->

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.159.33",
                11111), "example", "", "");
        int batchSize = 1000;
        try {<!-- -->
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();
            try {<!-- -->
                while (true) {<!-- -->
                    //Try to pull data batchSize records from the master, take as many as there are
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {<!-- -->
                        Thread.sleep(1000);
                    } else {<!-- -->
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    //When the SQL accumulated in the queue is greater than a certain value, simulate execution
                    if (SQL_QUEUE.size() >= 1) {<!-- -->
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {<!-- -->
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {<!-- -->
                e.printStackTrace();
            }
        } finally {<!-- -->
            connector.disconnect();
        }
    }

    /**
     * Simulate sql statements in the execution queue
     */
    public void executeQueueSql() {<!-- -->
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i + + ) {<!-- -->
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);

            this.execute(sql.toString());
        }
    }

    /**
     * data processing
     *
     * @param entries
     */
    private void dataHandle(List<Entry> entries) throws InvalidProtocolBufferException {<!-- -->
        for (Entry entry : entries) {<!-- -->
            if (EntryType.ROWDATA == entry.getEntryType()) {<!-- -->
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {<!-- -->
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {<!-- -->
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {<!-- -->
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * Save update statement
     *
     * @param entry
     */
    private void saveUpdateSql(Entry entry) {<!-- -->
        try {<!-- -->
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {<!-- -->
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i + + ) {<!-- -->
                    sql.append(" " + newColumnList.get(i).getName()
                             + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {<!-- -->
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {<!-- -->
                    if (column.getIsKey()) {<!-- -->
                        //Only supports a single primary key for the time being
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {<!-- -->
            e.printStackTrace();
        }
    }

    /**
     * Save delete statement
     *
     * @param entry
     */
    private void saveDeleteSql(Entry entry) {<!-- -->
        try {<!-- -->
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {<!-- -->
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {<!-- -->
                    if (column.getIsKey()) {<!-- -->
                        //Only supports a single primary key for the time being
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {<!-- -->
            e.printStackTrace();
        }
    }

    /**
     * Save the insert statement
     *
     * @param entry
     */
    private void saveInsertSql(Entry entry) {<!-- -->
        try {<!-- -->
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {<!-- -->
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i + + ) {<!-- -->
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {<!-- -->
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i + + ) {<!-- -->
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {<!-- -->
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {<!-- -->
            e.printStackTrace();
        }
    }

    /**
     * Warehouse
     * @param sql
     */
    public void execute(String sql) {<!-- -->
        Connection con = null;
        try {<!-- -->
            if(null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {<!-- -->
            e.printStackTrace();
        } finally {<!-- -->
            DbUtils.closeQuietly(con);
        }
    }
}

You change the IP address in this place to the one on your own virtual machine or server.

@SpringBootApplication
public class CanalApplication implements CommandLineRunner {<!-- -->
    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {<!-- -->
        SpringApplication.run(CanalApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {<!-- -->
        //Start the project and execute canal client monitoring
        canalClient.run();
    }
}


Required before testing
Insert a piece of data test in linux system

Look at the local console

Update the following data above in Linux

Look at the local console

Check whether the data in the local windows mysql database table is consistent with the data on linux