Canal Five Steps – How to deal with the idempotence of insert

canal uses Rocketmq to receive binlog events collected by mysql to decouple collection and processing. At the same time, it meets the needs of multiple parties for consumption at one time. So since Rocketmq is used, there will definitely be problems with MQ consumption timeout or MQ retransmission of failed processing.

So how does canal deal with the idempotence problem of MQ repeated consumption

Generally, in business, we will generate a uuid for each message to mark the uniqueness of this message. When consuming, the business table adds a uuid field or an MQ unique table to determine whether the message has been processed. If it has been consumed, it will directly return MQ ack.
But there is no uuid field in the t_user table we defined for checking uniqueness. So how does canal do it?
First, start the analysis by receiving RocketMQ code from canal.
canal is making different adapter implementations for consumers, such as: RdbAdapter, ESAdapter, HbaseAdapter
The mysql database we use directly analyzes RdbAdapter

 /**
     * Synchronization method
     *
     * @param dmls data package
     */
    @Override
    public void sync(List<Dml> dmls) {<!-- -->
        if (dmls == null || dmls.isEmpty()) {<!-- -->
            return;
        }
        try {<!-- -->
            //rdb synchronization service
            rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
            rdbMirrorDbSyncService.sync(dmls);
        } catch (Exception e) {<!-- -->
            throw new RuntimeException(e);
        }
    }

RdbSyncService

 //dmlsPartition will not be analyzed here, but will be analyzed below.
    futures.add(executorThreads[i].submit(() -> {<!-- -->
        try {<!-- -->
            //Execute dml in dmlsPartition in parallel through multiple threads
            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
                syncItem.config,
                syncItem.singleDml));
            dmlsPartition[j].clear();
            batchExecutors[j].commit();
            return true;
        } catch (Throwable e) {<!-- -->
            dmlsPartition[j].clear();
            batchExecutors[j].rollback();
            throw new RuntimeException(e);
        }
    }));
 /**
     * Single dml synchronization
     *
     * @param batchExecutor batch transaction executor
     * @param config corresponding configuration object
     * @param dml DML
     */
    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {<!-- -->
        if (config != null) {<!-- -->
            try {<!-- -->
                String type = dml.getType();
                if (type != null & amp; & amp; type.equalsIgnoreCase("INSERT")) {<!-- -->
                    //Directly analyze insert
                    insert(batchExecutor, config, dml);
                } else if (type != null & amp; & amp; type.equalsIgnoreCase("UPDATE")) {<!-- -->
                    update(batchExecutor, config, dml);
                } else if (type != null & amp; & amp; type.equalsIgnoreCase("DELETE")) {<!-- -->
                    delete(batchExecutor, config, dml);
                } else if (type != null & amp; & amp; type.equalsIgnoreCase("TRUNCATE")) {<!-- -->
                    truncate(batchExecutor, config);
                }
                if (logger.isDebugEnabled()) {<!-- -->
                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                }
            } catch (SQLException e) {<!-- -->
                throw new RuntimeException(e);
            }
        }
    }

When canal was inserting, a SQLException occurred when a primary key conflict occurred. The default value of skipDupException is =true, which directly ignores this exception.

 /**
     * Insert operation
     *
     * @param config configuration item
     * @param dml DML data
     */
    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {<!-- -->
        Map<String, Object> data = dml.getData();
        if (data == null || data.isEmpty()) {<!-- -->
            return;
        }

        DbMapping dbMapping = config.getDbMapping();

        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

        StringBuilder insertSql = new StringBuilder();
        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
            .append(targetColumnName)
            .append("`")
            .append(","));
        int len = insertSql.length();
        insertSql.delete(len - 1, len).append(") VALUES (");
        int mapLen = columnsMap.size();
        for (int i = 0; i < mapLen; i + + ) {<!-- -->
            insertSql.append("?,");
        }
        len = insertSql.length();
        insertSql.delete(len - 1, len).append(")");

        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

        List<Map<String, ?>> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {<!-- -->
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {<!-- -->
                srcColumnName = Util.cleanColumn(targetColumnName);
            }

            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {<!-- -->
                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
            }
            Object value = data.get(srcColumnName);
            BatchExecutor.setValue(values, type, value);
        }

        try {<!-- -->
            batchExecutor.execute(insertSql.toString(), values);
        } catch (SQLException e) {<!-- -->
            if (skipDupException
                 & amp; & amp; (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {<!-- -->
                // ignore
                // TODO Add more error codes for primary key conflicts in relational databases
            } else {<!-- -->
                throw e;
            }
        }
        if (logger.isTraceEnabled()) {<!-- -->
            logger.trace("Insert into target table, sql: {}", insertSql);
        }

    }

Conclusion

Canal uses the ignore method when handling MQ repeated consumption of insert events. When this data exists in the database, the database will return Duplicate entry to tell canal that this data is already in the database. Canal can just reply MQ ack directly.

Extension: batch insertion of insert

Execute a batch insert SQL in the source database, how canal synchronizes it.

insert into t_user (username,password,create_time,sex)
values ('1','1','2020-10-10',1) , ('1','1','2020-10-10',1);

Return to the batch synchronization method of canal’s RdbAdapter

 /**
     * Batch synchronization
     *
     * @param mappingConfig configuration collection
     * @param dmls batch DML
     */
    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {<!-- -->
        sync(dmls, dml -> {<!-- -->
            if (dml.getIsDdl() != null & amp; & amp; dml.getIsDdl() & amp; & amp; StringUtils.isNotEmpty(dml.getSql())) {<!-- -->
                //DDL
            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
            return false;
        } else {<!-- -->
            //DML
            ...

            for (MappingConfig config : configMap.values()) {<!-- -->
                boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
                if (config.getConcurrent()) {<!-- -->
                    //Convert the batch of multiple values data into individual inserts
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDmls.forEach(singleDml -> {<!-- -->
                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                } else {<!-- -->
                    int hash = 0;
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDmls.forEach(singleDml -> {<!-- -->
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                }
            }
            return true;
        }
    } );
    }

Continue to analyze the role of dmlsPartition

key code

 List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
    singleDmls.forEach(singleDml -> {<!-- -->
        int hash = pkHash(config.getDbMapping(), singleDml.getData());
        SyncItem syncItem = new SyncItem(config, singleDml);
        dmlsPartition[hash].add(syncItem);
    });

Canal converts a single batch insert SQL into multiple single inserts. And hash (pk % threads) of each primary key pk and the number of processing threads into different partitions, and multi-thread execution improves canal’s processing capabilities.