Java: querying and inserting millions of rows with JdbcTemplate

Background: use JdbcTemplate to query 5 million rows and then insert them into a database.

Querying and inserting that much data the ordinary way will certainly bring the server down. I first tried paging queries to fetch and insert in batches. That does keep the server alive, but it has a serious flaw: it is hard to guarantee the order of the rows across queries, so a row returned by the first query may show up again in the Nth query's results. Adding an ORDER BY to the query SQL keeps the order stable across queries, but the approach is still not rigorous, because rows may be inserted or deleted between queries; even with a unique sort key, you can end up with missing or duplicated rows.

Problems that need to be solved in this process:

1. Memory overflow

Using jdbcTemplate.queryForList to fetch 5 million rows at once occupies a huge amount of memory, and the server will usually throw an out-of-memory error. By default, JdbcTemplate processes the ResultSet with RowMapperResultSetExtractor, which reads all the rows into memory:
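Roughly, its extractData method is just a loop that maps every row and accumulates the results in one List (simplified from spring-jdbc's RowMapperResultSetExtractor):

    //Simplified from Spring's RowMapperResultSetExtractor: every mapped row
    //is accumulated in a single List, so 5 million rows end up in memory at once
    public List<T> extractData(ResultSet rs) throws SQLException {
        List<T> results = new ArrayList<>();
        int rowNum = 0;
        while (rs.next()) {
            results.add(this.rowMapper.mapRow(rs, rowNum++));
        }
        return results;
    }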

Therefore, we need to write our own implementation of the ResultSetExtractor interface and take control of how the ResultSet is read.

2. Batch insertion is slow

When we use JdbcTemplate's batchUpdate method to save data in batches, several conditions must be met before the statements are truly executed as a batch.

1. First of all, the database itself must support batch updates; mainstream databases generally do.

2. Do not use subqueries in the insert statement.

The statement should be a plain insert into table(...) values(...); do not put select statements inside the values.

3. Set the parameter rewriteBatchedStatements=true on the data source connection.

The Oracle driver performs true batch execution by default, but MySQL's driver does not, so the parameter has to be set manually in the data source URL:
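For example (host, port, and database name are placeholders):

    jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true&characterEncoding=utf8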

Customize ResultSetExtractor as follows:

package com.zhou.db.model;


import com.zhou.db.util.SqlUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.support.JdbcUtils;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Custom ResultSetExtractor that processes the query ResultSet in batches
 * @author lang.zhou
 * @since 2023/1/9 17:42
 */
@Slf4j
public abstract class DataMapCallBackExtractor implements ResultSetExtractor<List<Map<String,Object>>> {

    /**
     * Invoke the data callback after this many rows have been read (default 1000)
     */
    @Getter
    private int batchQuerySize = 1000;

    @Getter
    private List<DbColumn> columnList = new ArrayList<>(0);

    /**
     * Total number of rows read so far
     */
    @Getter
    private int dataCount = 0;

    public DataMapCallBackExtractor() {
    }

    public DataMapCallBackExtractor(int batchQuerySize) {
        //Only override the default when a larger batch size is requested
        if(batchQuerySize > 1000){
            this.batchQuerySize = batchQuerySize;
        }
    }

    @Override
    public List<Map<String,Object>> extractData(ResultSet rs) throws SQLException, DataAccessException {
        ResultSetMetaData resultSetMetaData = rs.getMetaData();
        //Number of columns in the result set
        int count = resultSetMetaData.getColumnCount();
        //Number of times the callback has been executed
        int times = 0;

        List<Map<String, Object>> list = new ArrayList<>();
        while(rs.next()){
            if(dataCount == 0){
                //Read the column information on the first row
                for (int i = 1; i <= count; i++) {
                    columnList.add(SqlUtil.readResultColumn(resultSetMetaData, i));
                }
                //Callback after the column information has been read
                this.columnInfoCallback(columnList);
            }
            //Increase the total row count
            dataCount++;
            Map<String, Object> e = new LinkedHashMap<>(count);
            //Read the current row
            for (int i = 1; i <= count; i++) {
                e.putIfAbsent(JdbcUtils.lookupColumnName(resultSetMetaData, i), JdbcUtils.getResultSetValue(rs, i));
            }
            list.add(e);
            //Hand the buffered rows to the callback once batchQuerySize rows have been read
            if(list.size() >= batchQuerySize){
                times++;
                this.dataCallback(list, times, dataCount);
                //After processing, clear the buffer so the memory can be reclaimed
                list.clear();
            }
        }
        //The last batch may hold fewer than batchQuerySize rows; flush the remainder
        if(!list.isEmpty()){
            times++;
            this.dataCallback(list, times, dataCount);
            list.clear();
        }
        return new ArrayList<>(0);
    }

    /**
     * Callback invoked after every batchQuerySize rows have been read (and once more for the final partial batch)
     */
    public abstract void dataCallback(List<Map<String, Object>> list, int times, int n);

    /**
     * Callback after reading column information
     */
    public void columnInfoCallback(List<DbColumn> columnList){

    }

}
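Note that extractData deliberately returns an empty list: every batch has already been handed to dataCallback and cleared, so there is nothing left to accumulate or return to the caller.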

After we get the ResultSet, we read only batchQuerySize rows (10,000 in the example below) into the List at a time, insert them into the database, and then clear the List so the JVM can reclaim the memory. This repeats until the whole result set has been consumed, which guarantees that only one batch of rows is held in memory at any moment and avoids the out-of-memory error.
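One caveat the batching alone does not cover: some drivers still buffer the entire result set unless the fetch size is configured. With MySQL, for example, a setting along these lines is usually also needed (a sketch; verify against your driver and Spring version):

    //Ask the MySQL driver to stream rows instead of buffering them all
    //(MySQL treats Integer.MIN_VALUE as row-by-row streaming);
    //set this on the underlying JdbcTemplate
    jdbcTemplate.setFetchSize(Integer.MIN_VALUE);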

The batch insertion code that speeds up the inserts:

    /**
     * Insert the data in batches
     */
    public void batchSizeUpdate(List<Map<String,Object>> list, String sql, NamedParameterJdbcTemplate namedParameterJdbcTemplate, int batchSize){
        int size = list.size();
        //Number of batches: one more if there is a remainder
        int n = size / batchSize;
        int l = size % batchSize;
        if(l > 0){
            n++;
        }
        log.info("Inserting in {} batches", n);
        for (int i = 0; i < n; i++) {
            int start = i * batchSize;
            int end = Math.min((i + 1) * batchSize, size);
            batchUpdate(list.subList(start, end), sql, namedParameterJdbcTemplate);
            log.info("Batch {} insert completed", i + 1);
        }
    }

    private void batchUpdate(List<Map<String,Object>> list, String sql, NamedParameterJdbcTemplate namedParameterJdbcTemplate){
        @SuppressWarnings("unchecked")
        Map<String, ?>[] param = list.toArray(new Map[0]);
        namedParameterJdbcTemplate.batchUpdate(sql, param);
    }
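For this to work, the insert sql must use named parameters whose names match the keys of each row Map, i.e. the column labels read from the ResultSet. For example (the table and column names here are only placeholders):

    String insertSql = "insert into t_user_copy(ID, USER_NAME, CREATE_TIME) values(:ID, :USER_NAME, :CREATE_TIME)";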

Finally, the calling code:

 //Trigger the callback after every 10,000 rows read
    DataMapCallBackExtractor extractor = new DataMapCallBackExtractor(10000){
        @Override
        public void dataCallback(List<Map<String, Object>> list, int times, int n) {
            log.info("Batch {}: read {} rows, {} rows in total", times, list.size(), n);
            //Insert in batches of 1000
            batchSizeUpdate(list, insertSql, insertJdbcTemplate, 1000);
        }

        @Override
        public void columnInfoCallback(List<DbColumn> columnList) {
            //Called before any rows are read; use the column information here,
            //for example to splice the insert sql
        }
    };
    jdbcTemplate.query(sql, new HashMap<>(0), extractor);
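Two templates are in play here: jdbcTemplate must be a NamedParameterJdbcTemplate for the source database, since plain JdbcTemplate has no query(String, Map, ResultSetExtractor) overload, while insertJdbcTemplate is the NamedParameterJdbcTemplate for the target database that receives the batched inserts.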

Code to read column information in SqlUtil:

    /**
     * Read sql column information from the ResultSet
     * @param rs result set metadata
     * @param i column position
     */
    @SneakyThrows
    public static DbColumn readResultColumn(ResultSetMetaData rs, int i){
        DbColumn c = new DbColumn();
        c.setName(rs.getColumnName(i));
        c.setComments(rs.getColumnLabel(i));
        int type = rs.getColumnType(i);
        c.setDataType(rs.getColumnTypeName(i));
        c.setNullable(rs.isNullable(i) == ResultSetMetaData.columnNoNulls ? "N" : "Y");
        if(type == Types.VARCHAR || type == Types.CHAR || type == Types.LONGVARCHAR || type == Types.CLOB){
            //Character columns: use the display size as the length
            c.setDataLength(rs.getColumnDisplaySize(i));
        }else if(type == Types.NUMERIC || type == Types.INTEGER || type == Types.BIGINT || type == Types.DECIMAL
                || type == Types.DOUBLE || type == Types.FLOAT || type == Types.REAL || type == Types.SMALLINT || type == Types.TINYINT){
            //Numeric columns: use the precision as the length
            c.setDataLength(rs.getPrecision(i));
        }else if(type == Types.DATE || type == Types.TIMESTAMP){
            c.setDataLength(rs.getPrecision(i));
        }
        c.setDataScale(rs.getScale(i));
        return c;
    }
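As an example of what columnInfoCallback can do, here is a hypothetical helper (not part of the original code) that splices the named-parameter insert statement from the column list:

    public static String buildInsertSql(String table, List<DbColumn> columnList) {
        StringBuilder cols = new StringBuilder();
        StringBuilder vals = new StringBuilder();
        for (DbColumn c : columnList) {
            if (cols.length() > 0) {
                cols.append(", ");
                vals.append(", ");
            }
            cols.append(c.getName());
            //Each named parameter must match the Map key produced by the extractor
            vals.append(":").append(c.getName());
        }
        return "insert into " + table + "(" + cols + ") values(" + vals + ")";
    }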

Field column information entity:

import lombok.Data;

import java.util.Objects;

/**
 * Basic information about a database table field
 * @author lang.zhou
 * @since 2022/10/17 14:31
 */
@Data
public class DbColumn {

    private String tableName;
    /**
     * Field name
     */
    private String name;
    /**
     * Field description
     */
    private String comments;
    /**
     * Whether the field can be null ("Y"/"N")
     */
    private String nullable = "Y";
    private String dataType = null;
    private Integer isPk = 0;
    private Integer dataLength = 0;
    private Integer dataScale = 0;
    public boolean isPk(){
        return name != null && isPk > 0;
    }
    public boolean isDate(){
        return name != null && ("DATE".equalsIgnoreCase(dataType) || "TIMESTAMP".equalsIgnoreCase(dataType) || "DATETIME".equalsIgnoreCase(dataType));
    }
    public boolean isNumber(){
        return name != null && ("NUMBER".equalsIgnoreCase(dataType) || "DECIMAL".equalsIgnoreCase(dataType) || "INTEGER".equalsIgnoreCase(dataType)
                || "INT".equalsIgnoreCase(dataType) || "BIGINT".equalsIgnoreCase(dataType) || "DOUBLE".equalsIgnoreCase(dataType) || "LONG".equalsIgnoreCase(dataType));
    }
    public boolean isChar(){
        return name != null && "CHAR".equalsIgnoreCase(dataType);
    }
    public boolean allowNull(){
        return !isPk() && Objects.equals(nullable, "Y");
    }
}