Real-time data synchronization with CDC (Canal): monitor local database changes and synchronize them to a cloud database through MQ

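This guide uses CDC (change data capture) technology, via Canal, to synchronize data in real time: Canal monitors changes in the local database, and each change is forwarded through a message queue (RabbitMQ) and applied to the cloud database.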

1. Download the Canal package and edit the instance configuration file conf/example/instance.properties:
  1. canal.instance.master.address=192.168.8.211:3306 — address (host:port) of the database to connect to
  2. canal.instance.dbUsername=root — user account used to connect to the database
    canal.instance.dbPassword=123456 — password used to connect to the database
  3. canal.instance.filter.regex=jeecg-boot\\..* — selects the tables to synchronize (here, all tables of the jeecg-boot database)
2. Run startup.sh under bin to start the Canal middleware
3. Integrate Canal into the Spring Boot application and start coding

Add the following dependencies to pom.xml

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.6</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-udf</artifactId>
            <version>0.31.4-public</version>
        </dependency>
        <!-- fastjson 1.2.x releases before 1.2.83 are affected by the autoType
             deserialization vulnerability (CVE-2022-25845); this project parses
             MQ payloads with fastjson, so use the patched release. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

RabbitMQ configuration and messaging service

package com.tiancai.dataHandle;


import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ wiring for the data-handling channel.
 *
 * <p>Every connection setting is injected from a {@code spring.datahandle.*}
 * property; the class exposes a connection factory built from those settings
 * and a {@link RabbitTemplate} bound to that factory.
 *
 * @author panyj
 * @create 2023/10/28
 */
@Configuration
public class DataHandleMq {

    /** Broker host, from {@code spring.datahandle.host}. */
    @Value("${spring.datahandle.host}")
    private String host;

    /** Broker port, from {@code spring.datahandle.port}. */
    @Value("${spring.datahandle.port}")
    private int port;

    /** Login name, from {@code spring.datahandle.username}. */
    @Value("${spring.datahandle.username}")
    private String username;

    /** Login password, from {@code spring.datahandle.password}. */
    @Value("${spring.datahandle.password}")
    private String password;

    /** Virtual host, from {@code spring.datahandle.virtual-host}. */
    @Value("${spring.datahandle.virtual-host}")
    private String virtualHost;

    /**
     * Builds the connection factory from the injected settings.
     *
     * @return a {@link CachingConnectionFactory} configured with host, port,
     *         credentials and virtual host
     */
    @Bean
    public ConnectionFactory dataHandleConnectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setVirtualHost(virtualHost);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }

    /**
     * Creates the template used to publish messages over
     * {@link #dataHandleConnectionFactory()}.
     *
     * @return a template bound to the data-handling connection factory
     */
    @Bean
    public RabbitTemplate dataHandleRabbitTemplate() {
        return new RabbitTemplate(dataHandleConnectionFactory());
    }

}

package com.tiancai.dataHandle;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Sends change events to RabbitMQ and consumes them from the
 * {@code ReceiptDataToServer} queue.
 *
 * @author panyj
 * @create 2023/10/28
 */
@Service
@Slf4j
public class DataHandleService{

    @Resource
    private RabbitTemplate dataHandleRabbitTemplate;

    /**
     * Publishes {@code message} with {@code queue} as the routing key.
     *
     * @param queue   routing key handed to {@code convertAndSend}
     * @param message payload to publish
     */
    public void sendMessageToDataHandleQueue(String queue,String message) {
        log.info("{} DataHandleService sends message: {}", queue, message);
        dataHandleRabbitTemplate.convertAndSend(queue, message);
    }


    /**
     * Parses a received payload as a JSON array and hands every element to
     * {@link DataHandleSql#handData}.
     *
     * @param message JSON array text taken from the queue
     */
    @RabbitListener(queues = "ReceiptDataToServer")
    public void receiveMessageFromFirstQueue(String message) {
        log.info("Received message from ReceiptDataToServer: {}", message);
        JSONArray jsonArray = JSON.parseArray(message);
        if (jsonArray == null || jsonArray.isEmpty()) {
            // Nothing to apply; indexing element 0 here would throw.
            log.warn("Ignoring empty message from ReceiptDataToServer");
            return;
        }
        // Apply every element rather than only the first, so a payload that
        // carries more than one entry is not silently truncated.
        for (int i = 0; i < jsonArray.size(); i++) {
            DataHandleSql.handData(jsonArray.getJSONObject(i));
        }
    }

}

Enum

package com.tiancai.enums;

/**
 * Names of the RabbitMQ queues used for data synchronization.
 */
public enum RabbitToDataEnum {

    ReceiptDataToServer("ReceiptDataToServer","Data Synchronization");

    /** Queue name used when publishing. */
    public final String type;

    /** Human-readable description of the queue's purpose. */
    public final String desc;

    RabbitToDataEnum(String type, String desc){
        this.type = type;
        this.desc = desc;
    }

}

Convert a Canal entry to JSON

package com.tiancai.dataHandle;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.aliyun.odps.udf.UDF;
import com.google.protobuf.ByteString;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;

/**
 * Serializes a Canal {@link CanalEntry.Entry} into a JSON string.
 *
 * @author panyj
 * @create 2023/10/27
 */
public class MessageToJson extends UDF {

    /**
     * Converts one Canal entry into a JSON array that holds a single object
     * with the keys {@code header}, {@code entryType} and {@code storeValue}.
     * {@code storeValue.rowDatas} lists, per row, the column images under
     * {@code afterColumns} and {@code beforeColumn}.
     *
     * <p>The method name is kept as-is because callers reference it.
     *
     * @param entry the Canal entry to serialize
     * @return the JSON text
     * @throws Exception if the entry's store value cannot be parsed as a
     *                   {@link CanalEntry.RowChange}
     */
    public static String evaluevaluateate(CanalEntry.Entry entry ) throws Exception {
        CanalEntry.Header header = entry.getHeader();
        HashMap<String, Object> headerMap = new HashMap<>();
        headerMap.put("version", header.getVersion());
        headerMap.put("logfileName", header.getLogfileName());
        // Each key now carries its own getter's value: "logfileOffset" used to
        // receive the log file name and "serverId" the log file offset.
        headerMap.put("logfileOffset", header.getLogfileOffset());
        headerMap.put("serverId", header.getServerId());
        headerMap.put("serverenCode", header.getServerenCode());
        headerMap.put("executeTime", header.getExecuteTime());
        headerMap.put("sourceType", header.getSourceType());
        headerMap.put("schemaName", header.getSchemaName());
        headerMap.put("tableName", header.getTableName());
        headerMap.put("eventLength", header.getEventLength());
        headerMap.put("eventType", header.getEventType());
        headerMap.put("gtid", header.getGtid());

        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        HashMap<String, Object> storeValueMap = new HashMap<>();
        storeValueMap.put("tableId", rowChange.getTableId());
        storeValueMap.put("eventType", rowChange.getEventType());
        storeValueMap.put("isDdl", rowChange.getIsDdl());
        storeValueMap.put("sql", rowChange.getSql());
        storeValueMap.put("ddlSchemaName", rowChange.getDdlSchemaName());

        List<Object> rowDatasMapList = new ArrayList<>();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            HashMap<String, Object> oneRowData = new HashMap<>();
            oneRowData.put("afterColumns", columnsToMaps(rowData.getAfterColumnsList()));
            // Singular key name kept on purpose: the consumer reads "beforeColumn".
            oneRowData.put("beforeColumn", columnsToMaps(rowData.getBeforeColumnsList()));
            rowDatasMapList.add(oneRowData);
        }
        storeValueMap.put("rowDatas", rowDatasMapList);

        HashMap<String, Object> map = new HashMap<>();
        map.put("header", headerMap);
        map.put("entryType", entry.getEntryType());
        map.put("storeValue", storeValueMap);

        List<Object> arrayList = new ArrayList<>();
        arrayList.add(map);

        return JSON.toJSONString(new JSONArray(arrayList), SerializerFeature.DisableCircularReferenceDetect);
    }

    /** Copies the fields of each column into a string-keyed map, preserving list order. */
    private static List<HashMap<String, Object>> columnsToMaps(List<CanalEntry.Column> columns) {
        List<HashMap<String, Object>> result = new ArrayList<>();
        for (CanalEntry.Column column : columns) {
            HashMap<String, Object> columnMap = new HashMap<>();
            columnMap.put("index", column.getIndex());
            columnMap.put("sqlType", column.getSqlType());
            columnMap.put("name", column.getName());
            columnMap.put("isKey", column.getIsKey());
            columnMap.put("updated", column.getUpdated());
            columnMap.put("isNull", column.getIsNull());
            columnMap.put("value", column.getValue());
            columnMap.put("length", column.getLength());
            columnMap.put("mysqlType", column.getMysqlType());
            result.add(columnMap);
        }
        return result;
    }
}

Canal listener class

package com.tiancai.dataHandle;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.tiancai.enums.RabbitToDataEnum;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;


/***
 *@title CanalClient
 *@description
 *@author panyj
 *@version
 *@create 2023/10/27
 **/
@Component
public class CanalClient {
    //sql queue
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    @Resource
    private DataHandleService dataHandleService;

    /**
     * canal storage method
     */
    public void run() {

        // Listen to canal
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe("jeecg-boot\..*");
            connector.rollback();
            try {
                while (true) {
                    //Try to pull data batchSize records from the master, take as many as there are
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }
  

    /**
     * data processing
     *
     * @param entries
     */
    private void dataHandle(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    //Convert to json format and send to MQ
                    String jsonString = MessageToJson.evaluevaluateate(entry);
                    dataHandleService.sendMessageToDataHandleQueue(RabbitToDataEnum.ReceiptDataToServer.type,jsonString);
 
                } else if (eventType == EventType.UPDATE) {
                    String jsonString = MessageToJson.evaluevaluateate(entry);
                    dataHandleService.sendMessageToDataHandleQueue(RabbitToDataEnum.ReceiptDataToServer.type,jsonString);
                } else if (eventType == EventType.INSERT) {
                    //saveInsertSql(entry);
                    String jsonString = MessageToJson.evaluevaluateate(entry);
                    dataHandleService.sendMessageToDataHandleQueue(RabbitToDataEnum.ReceiptDataToServer.type,jsonString);
                }
             }
        }
    }
}


Data processing (apply the received changes to the database)

package com.tiancai.dataHandle;



import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tiancai.util.SpringUtils;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.context.ApplicationContext;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/***
 *@title dataHandleSql
 *@description
 *@author panyj
 *@version
 *@create 2023/10/27
 **/
public class DataHandleSql {

    private ApplicationContext applicationContext = SpringUtils.getApplicationContext();
    private DataSource dataSource = applicationContext.getBean(DataSource.class);


    /**
     * Save update statement
     *
     * @param jsonObject
     */
    public static void saveUpdateSql(JSONObject jsonObject) {
        try {
            JSONArray jsonArray = jsonObject.getJSONObject("storeValue").getJSONArray("rowDatas");
            for(int i=0;i<jsonArray.size();i + + ){
                JSONArray columnArray = jsonArray.getJSONObject(i).getJSONArray("afterColumns");
                StringBuffer sql = new StringBuffer("update " + jsonObject.getJSONObject("header").get("tableName") + " set ");
                for (int j = 0; j < columnArray.size(); j + + ) {
                    if(columnArray.getJSONObject(j).get("isNull").toString().equals("false")){
                        sql.append(" " + columnArray.getJSONObject(j).get("name")
                                 + " = '" + columnArray.getJSONObject(j).get("value") + "'");
                        sql.append(",");
                    }
                }
                sql.deleteCharAt(sql.length()-1);
                sql.append(" where ");
                JSONArray oldColumnArray = jsonArray.getJSONObject(i).getJSONArray("beforeColumn");
                Map<String,String> map = new HashMap<>();
                for (int k = 0; k < oldColumnArray.size(); k + + ) {
                    if(oldColumnArray.getJSONObject(k).get("isKey").toString().equals("true") ){
                        map.put(oldColumnArray.getJSONObject(k).get("name").toString(),oldColumnArray.getJSONObject(k).get("value").toString());
                    }
                }
                // Multiple primary key issues
                int e = 0;
                for(String key : map.keySet()){
                    if(map.size() == 1){
                        sql.append(key + "='" + map.get(key) + "'");
                        break;
                    }else{
                        if(e==0){
                            sql.append(key + "='" + map.get(key) + "'");
                            e++;
                            continue;
                        }
                        sql.append(" and ");
                        sql.append(key + "='" + map.get(key) + "'");
                    }
                }
                new DataHandleSql().execute(sql.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Save delete statement
     *
     * @param jsonObject
     */
    public static void saveDeleteSql(JSONObject jsonObject) {
        try {
            JSONArray jsonArray = jsonObject.getJSONObject("storeValue").getJSONArray("rowDatas");
            for(int i=0;i<jsonArray.size();i + + ){
                JSONArray columnArray = jsonArray.getJSONObject(i).getJSONArray("beforeColumn");
                StringBuffer sql = new StringBuffer("delete from " + jsonObject.getJSONObject("header").get("tableName") + " where ");
                Map<String,String> map = new HashMap<>();
                for (int j = 0; j < columnArray.size(); j + + ) {
                    if(columnArray.getJSONObject(j).get("isKey").toString().equals("true")){
                        map.put(columnArray.getJSONObject(j).get("name").toString(),columnArray.getJSONObject(j).get("value").toString());
                    }
                }
                // Multiple primary key issues
                int k = 0;
                for(String key : map.keySet()){
                    if(map.size() == 1){
                        sql.append(key + "='" + map.get(key) + "'");
                        break;
                    }else{
                        if(k==0){
                            sql.append(key + "='" + map.get(key) + "'");
                            k++;
                            continue;
                        }
                        sql.append(" and ");
                        sql.append(key + "='" + map.get(key) + "'");
                    }
                }

                new DataHandleSql().execute(sql.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Save the insert statement
     *
     * @param jsonObject
     */
    public static void saveInsertSql(JSONObject jsonObject) {
        try {
            JSONArray jsonArray = jsonObject.getJSONObject("storeValue").getJSONArray("rowDatas");
            for(int i=0;i<jsonArray.size();i + + ){
                JSONArray columnArray = jsonArray.getJSONObject(i).getJSONArray("afterColumns");
                StringBuffer sql = new StringBuffer("insert into " + jsonObject.getJSONObject("header").get("tableName") + " (");
                for (int j = 0; j < columnArray.size(); j + + ) {
                    if(columnArray.getJSONObject(j).get("isNull").toString().equals("false")){
                        sql.append(columnArray.getJSONObject(j).get("name"));
                        sql.append(",");
                    }
                }
                sql.deleteCharAt(sql.length()-1);
                sql.append(") VALUES (");
                for (int j = 0; j < columnArray.size(); j + + ) {
                    if(columnArray.getJSONObject(j).get("isNull").toString().equals("false")){
                        sql.append("'" + columnArray.getJSONObject(j).get("value") + "'");
                        sql.append(",");
                    }
                }
                sql.deleteCharAt(sql.length()-1);
                sql.append(")");
                new DataHandleSql().execute(sql.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void handData(JSONObject jsonObject){
        String eventType = jsonObject.getJSONObject("header").get("eventType").toString();
        if (eventType.equals("DELETE")) {
            saveDeleteSql(jsonObject);
        } else if (eventType.equals("UPDATE")) {
            saveUpdateSql(jsonObject);
        } else if (eventType.equals("INSERT")) {
            saveInsertSql(jsonObject);
        }
    }

    /**
     * Warehouse
     * @param sql
     */
    public void execute(String sql) {
        Connection con = null;
        try {
            if(null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("sql: " + sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}

Utility class for looking up Spring beans

package com.tiancai.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;


/**
 * Holds the Spring {@link ApplicationContext} in a static field so that code
 * outside the container can look up beans.
 *
 * @author panyj
 * @create 2023/10/27
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    /** The stored context; stays {@code null} until {@link #setApplicationContext} is called. */
    private static ApplicationContext context = null;

    /**
     * Stores the given context the first time it is called; later calls leave
     * the stored context untouched.
     *
     * @param applicationContext the context to remember
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtils.context != null) {
            return;
        }
        SpringUtils.context = applicationContext;
    }

    /**
     * @return the stored context, or {@code null} if none has been stored yet
     */
    public static ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name bean name
     * @return the bean instance
     */
    public static Object getBean(String name){
        ApplicationContext current = getApplicationContext();
        return current.getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz){
        ApplicationContext current = getApplicationContext();
        return current.getBean(clazz);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name  bean name
     * @param clazz bean type
     * @return the bean instance
     */
    public static <T> T getBean(String name,Class<T> clazz){
        ApplicationContext current = getApplicationContext();
        return current.getBean(name, clazz);
    }
}

Startup class

package com.tiancai;

import com.tiancai.dataHandle.CanalClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

/**
 * Spring Boot entry point; once the application has started it hands control
 * to the injected {@link CanalClient}.
 */
@SpringBootApplication
public class ReceiveSynDataApplication implements CommandLineRunner {

    /** Canal listener started from {@link #run(String...)}. */
    @Resource
    private CanalClient canalClient;

    /**
     * Command-line-runner callback: starts Canal client monitoring.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReceiveSynDataApplication.class, args);
    }

}