Use cdc technology to synchronize data in real time (canal) – monitor local data and synchronize data to the cloud database through MQ
1. Download the canal package and modify the configuration file conf -> instance.properties under example
- canal.instance.master.address=192.168.8.211:3306 Change the connection database path
- canal.instance.dbUsername=root user account to connect to the database
canal.instance.dbPassword=123456 User password for connecting to the database - canal.instance.filter.regex=jeecg-boot\…* Configure all tables of the database that need to be synchronized
2. Start startup.sh under bin and start canal middleware
3. spring-boot integrates canal and starts coding
pom.xml import dependency
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>commons-dbutils</groupId> <artifactId>commons-dbutils</artifactId> <version>1.7</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.6</version> </dependency> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-udf</artifactId> <version>0.31.4-public</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.60</version> </dependency>
RabbitMq
package com.tiancai.dataHandle; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /*** *@title dataHandleMq *@description *@author panyj *@version *@create 2023/10/28 **/ @Configuration public class DataHandleMq { @Value("${spring.datahandle.host}") private String host; @Value("${spring.datahandle.port}") private int port; @Value("${spring.datahandle.username}") private String username; @Value("${spring.datahandle.password}") private String password; @Value("${spring.datahandle.virtual-host}") private String virtualHost; @Bean public ConnectionFactory dataHandleConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RabbitTemplate dataHandleRabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(dataHandleConnectionFactory()); return rabbitTemplate; } }
package com.tiancai.dataHandle; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; /*** *@title DataHandleService *@description *@author panyj *@version *@create 2023/10/28 **/ @Service @Slf4j public class DataHandleService{ @Resource private RabbitTemplate dataHandleRabbitTemplate; public void sendMessageToDataHandleQueue(String queue,String message) { log.info(queue + " DataHandleService sends message: " + message); dataHandleRabbitTemplate.convertAndSend(queue, message); } @RabbitListener(queues = "ReceiptDataToServer") public void receiveMessageFromFirstQueue(String message) { System.out.println("Received message from ReceiptDataToServer: " + message); JSONArray jsonArray = JSON.parseArray(message); DataHandleSql.handData(jsonArray.getJSONObject(0)); } }
enumerate
package com.tiancai.enums; public enum RabbitToDataEnum { ReceiptDataToServer("ReceiptDataToServer","Data Synchronization"); RabbitToDataEnum(String type, String desc){ this.type = type; this.desc = desc; } public String type; public String desc; }
Convert to json
package com.tiancai.dataHandle; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.otter.canal.protocol.CanalEntry; import com.aliyun.odps.udf.UDF; import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; /*** *@titleMessageToJson *@description *@author panyj *@version *@create 2023/10/27 **/ public class MessageToJson extends UDF { public static String evaluevaluateate(CanalEntry.Entry entry ) throws Exception { ArrayList<Object> arrayList = new ArrayList<>(); HashMap<String, Object> map = new HashMap<>(); HashMap<String, Object> headerMap = new HashMap<>(); CanalEntry.Header header = entry.getHeader(); headerMap.put("version", header.getVersion()); headerMap.put("logfileName", header.getLogfileName()); headerMap.put("logfileOffset", header.getLogfileName()); headerMap.put("serverId", header.getLogfileOffset()); headerMap.put("serverenCode", header.getServerenCode()); headerMap.put("executeTime", header.getExecuteTime()); headerMap.put("sourceType", header.getSourceType()); headerMap.put("schemaName", header.getSchemaName()); headerMap.put("tableName", header.getTableName()); headerMap.put("eventLength", header.getEventLength()); headerMap.put("eventType", header.getEventType()); headerMap.put("gtid", header.getGtid()); HashMap<String, Object> storeValueMap = new HashMap<>(); ByteString storeValue = entry.getStoreValue(); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); storeValueMap.put("tableId", rowChange.getTableId()); storeValueMap.put("eventType", rowChange.getEventType()); storeValueMap.put("isDdl", rowChange.getIsDdl()); storeValueMap.put("sql", rowChange.getSql()); storeValueMap.put("ddlSchemaName", rowChange.getDdlSchemaName()); ArrayList<Object> rowDatasMapList = new ArrayList<>(); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { // ArrayList<Object> oneRowData = new ArrayList<>(); HashMap<String, Object> oneRowData = new HashMap<>(); ArrayList<IdentityHashMap> bl = new ArrayList<>(); ArrayList<IdentityHashMap> al = new ArrayList<>(); List<CanalEntry.Column> afterColumns = rowData.getAfterColumnsList(); List<CanalEntry.Column> beforeColumns = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumns) { IdentityHashMap<String, Object> beforeColumn = new IdentityHashMap<>(); beforeColumn.put("index", column.getIndex()); beforeColumn.put("sqlType", column.getSqlType()); beforeColumn.put("name", column.getName()); beforeColumn.put("isKey", column.getIsKey()); beforeColumn.put("updated", column.getUpdated()); beforeColumn.put("isNull", column.getIsNull()); beforeColumn.put("value", column.getValue()); beforeColumn.put("length", column.getLength()); beforeColumn.put("mysqlType", column.getMysqlType()); bl.add(beforeColumn); } for (CanalEntry.Column column : afterColumns) { IdentityHashMap<String, Object> afterColumn = new IdentityHashMap<>(); afterColumn.put("index", column.getIndex()); afterColumn.put("sqlType", column.getSqlType()); afterColumn.put("name", column.getName()); afterColumn.put("isKey", column.getIsKey()); afterColumn.put("updated", column.getUpdated()); afterColumn.put("isNull", column.getIsNull()); afterColumn.put("value", column.getValue()); afterColumn.put("length", column.getLength()); afterColumn.put("mysqlType", column.getMysqlType()); al.add(afterColumn); } oneRowData.put(new String("afterColumns"), al); oneRowData.put(new String("beforeColumn"), bl); rowDatasMapList.add(oneRowData); } storeValueMap.put("rowDatas", rowDatasMapList); map.put("header", headerMap); map.put("entryType", entry.getEntryType()); map.put("storeValue", storeValueMap); arrayList.add(map); JSONArray json = new JSONArray(arrayList); return JSON.toJSONString(json, SerializerFeature.DisableCircularReferenceDetect); } }
Listening to the canal class
package com.tiancai.dataHandle; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.tiancai.enums.RabbitToDataEnum; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; import java.net.InetSocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /*** *@title CanalClient *@description *@author panyj *@version *@create 2023/10/27 **/ @Component public class CanalClient { //sql queue private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); @Resource private DataSource dataSource; @Resource private DataHandleService dataHandleService; /** * canal storage method */ public void run() { // Listen to canal CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe("jeecg-boot\..*"); connector.rollback(); try { while (true) { //Try to pull data batchSize records from the master, take as many as there are Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); } else { dataHandle(message.getEntries()); } connector.ack(batchId); } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } finally { connector.disconnect(); } } /** * data processing * * @param entries */ private void dataHandle(List<Entry> entries) throws Exception { for (Entry entry : entries) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { //Convert to json format and send to MQ String jsonString = MessageToJson.evaluevaluateate(entry); dataHandleService.sendMessageToDataHandleQueue(RabbitToDataEnum.ReceiptDataToServer.type,jsonString); } else if (eventType == EventType.UPDATE) { String jsonString = MessageToJson.evaluevaluateate(entry); dataHandleService.sendMessageToDataHandleQueue(RabbitToDataEnum.ReceiptDataToServer.type,jsonString); } else if (eventType == EventType.INSERT) { //saveInsertSql(entry); String jsonString = MessageToJson.evaluevaluateate(entry); dataHandleService.sendMessageToDataHandleQueue(RabbitToDataEnum.ReceiptDataToServer.type,jsonString); } } } } }
data processing
package com.tiancai.dataHandle; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.tiancai.util.SpringUtils; import org.apache.commons.dbutils.DbUtils; import org.apache.commons.dbutils.QueryRunner; import org.springframework.context.ApplicationContext; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; /*** *@title dataHandleSql *@description *@author panyj *@version *@create 2023/10/27 **/ public class DataHandleSql { private ApplicationContext applicationContext = SpringUtils.getApplicationContext(); private DataSource dataSource = applicationContext.getBean(DataSource.class); /** * Save update statement * * @param jsonObject */ public static void saveUpdateSql(JSONObject jsonObject) { try { JSONArray jsonArray = jsonObject.getJSONObject("storeValue").getJSONArray("rowDatas"); for(int i=0;i<jsonArray.size();i + + ){ JSONArray columnArray = jsonArray.getJSONObject(i).getJSONArray("afterColumns"); StringBuffer sql = new StringBuffer("update " + jsonObject.getJSONObject("header").get("tableName") + " set "); for (int j = 0; j < columnArray.size(); j + + ) { if(columnArray.getJSONObject(j).get("isNull").toString().equals("false")){ sql.append(" " + columnArray.getJSONObject(j).get("name") + " = '" + columnArray.getJSONObject(j).get("value") + "'"); sql.append(","); } } sql.deleteCharAt(sql.length()-1); sql.append(" where "); JSONArray oldColumnArray = jsonArray.getJSONObject(i).getJSONArray("beforeColumn"); Map<String,String> map = new HashMap<>(); for (int k = 0; k < oldColumnArray.size(); k + + ) { if(oldColumnArray.getJSONObject(k).get("isKey").toString().equals("true") ){ map.put(oldColumnArray.getJSONObject(k).get("name").toString(),oldColumnArray.getJSONObject(k).get("value").toString()); } } // Multiple primary key issues int e = 0; for(String key : map.keySet()){ if(map.size() == 1){ sql.append(key + "='" + map.get(key) + "'"); break; }else{ if(e==0){ sql.append(key + "='" + map.get(key) + "'"); e++; continue; } sql.append(" and "); sql.append(key + "='" + map.get(key) + "'"); } } new DataHandleSql().execute(sql.toString()); } } catch (Exception e) { e.printStackTrace(); } } /** * Save delete statement * * @param jsonObject */ public static void saveDeleteSql(JSONObject jsonObject) { try { JSONArray jsonArray = jsonObject.getJSONObject("storeValue").getJSONArray("rowDatas"); for(int i=0;i<jsonArray.size();i + + ){ JSONArray columnArray = jsonArray.getJSONObject(i).getJSONArray("beforeColumn"); StringBuffer sql = new StringBuffer("delete from " + jsonObject.getJSONObject("header").get("tableName") + " where "); Map<String,String> map = new HashMap<>(); for (int j = 0; j < columnArray.size(); j + + ) { if(columnArray.getJSONObject(j).get("isKey").toString().equals("true")){ map.put(columnArray.getJSONObject(j).get("name").toString(),columnArray.getJSONObject(j).get("value").toString()); } } // Multiple primary key issues int k = 0; for(String key : map.keySet()){ if(map.size() == 1){ sql.append(key + "='" + map.get(key) + "'"); break; }else{ if(k==0){ sql.append(key + "='" + map.get(key) + "'"); k++; continue; } sql.append(" and "); sql.append(key + "='" + map.get(key) + "'"); } } new DataHandleSql().execute(sql.toString()); } } catch (Exception e) { e.printStackTrace(); } } /** * Save the insert statement * * @param jsonObject */ public static void saveInsertSql(JSONObject jsonObject) { try { JSONArray jsonArray = jsonObject.getJSONObject("storeValue").getJSONArray("rowDatas"); for(int i=0;i<jsonArray.size();i + + ){ JSONArray columnArray = jsonArray.getJSONObject(i).getJSONArray("afterColumns"); StringBuffer sql = new StringBuffer("insert into " + jsonObject.getJSONObject("header").get("tableName") + " ("); for (int j = 0; j < columnArray.size(); j + + ) { if(columnArray.getJSONObject(j).get("isNull").toString().equals("false")){ sql.append(columnArray.getJSONObject(j).get("name")); sql.append(","); } } sql.deleteCharAt(sql.length()-1); sql.append(") VALUES ("); for (int j = 0; j < columnArray.size(); j + + ) { if(columnArray.getJSONObject(j).get("isNull").toString().equals("false")){ sql.append("'" + columnArray.getJSONObject(j).get("value") + "'"); sql.append(","); } } sql.deleteCharAt(sql.length()-1); sql.append(")"); new DataHandleSql().execute(sql.toString()); } } catch (Exception e) { e.printStackTrace(); } } public static void handData(JSONObject jsonObject){ String eventType = jsonObject.getJSONObject("header").get("eventType").toString(); if (eventType.equals("DELETE")) { saveDeleteSql(jsonObject); } else if (eventType.equals("UPDATE")) { saveUpdateSql(jsonObject); } else if (eventType.equals("INSERT")) { saveInsertSql(jsonObject); } } /** * Warehouse * @param sql */ public void execute(String sql) { Connection con = null; try { if(null == sql) return; con = dataSource.getConnection(); QueryRunner qr = new QueryRunner(); int row = qr.execute(con, sql); System.out.println("sql: " + sql); System.out.println("update: " + row); } catch (SQLException e) { e.printStackTrace(); } finally { DbUtils.closeQuietly(con); } } }
Inject bean tool class
package com.tiancai.util; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /*** *@title SpringUtils *@description *@author panyj *@version *@create 2023/10/27 **/ @Component public class SpringUtils implements ApplicationContextAware { private static ApplicationContext applicationContext = null; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if(SpringUtils.applicationContext == null){ SpringUtils.applicationContext = applicationContext; } } //Get applicationContext public static ApplicationContext getApplicationContext() { return applicationContext; } //Get Bean by name. public static Object getBean(String name){ return getApplicationContext().getBean(name); } //Get Bean through class. public static <T> T getBean(Class<T> clazz){ return getApplicationContext().getBean(clazz); } //Return the specified Bean through name and Clazz public static <T> T getBean(String name,Class<T> clazz){ return getApplicationContext().getBean(name, clazz); } }
Startup class
package com.tiancai; import com.tiancai.dataHandle.CanalClient; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.Resource; @SpringBootApplication public class ReceiveSynDataApplication implements CommandLineRunner { @Resource private CanalClient canalClient; public static void main(String[] args) { SpringApplication.run(ReceiveSynDataApplication.class, args); } @Override public void run(String... strings) throws Exception { //Start the project and execute canal client monitoring canalClient.run(); } }