SpringBoot-thread pool ThreadPoolExecutor asynchronous processing (including split collection tool class)

ThreadPoolExecutor VS ThreadPoolTaskExecutor

ThreadPoolTaskExecutor encapsulates ThreadPoolExecutor.

Configuration file application.yml

# Asynchronous thread configuration custom usage parameters
async:
executor:
thread:
core_pool_size: 10
max_pool_size: 100 # Configure the maximum number of threads
     queue_capacity: 99988 # Configure queue size
      keep_alive_seconds: 20 #Set the thread idle waiting time seconds
      name:
        prefix: async-thread- # Configure the name prefix of threads in the thread pool

Configuration class

@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig{<!-- -->
\t
//Customize usage parameters
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize; //Configure the number of core threads
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize; //Configure the maximum number of threads
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

/**
1. Customize asyncServieExecutor thread pool
*/
@Bean(name = "asyncServiceExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor(){<!-- -->
\t\t
log.info("start asyncServiceExecutor......");

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//Configure the number of core threads
        executor.setCorePoolSize(corePoolSize);
        //Configure the maximum number of threads
        executor.setMaxPoolSize(maxPoolSize);
        //Set the thread idle waiting time s
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //Configure queue size Set the size of the task waiting queue
        executor.setQueueCapacity(queueCapacity);
        //Configure the name prefix of the thread in the thread pool
        //Set the prefix of the thread name in the thread pool-------recommended by Alibaba Coding Convention--to facilitate debugging after errors
        executor.setThreadNamePrefix(namePrefix);

/**
Rejection-policy: How to handle new tasks when the pool has reached the max size
CALLER_RUNS: The task is not executed in a new thread, but is executed by the thread where the caller is located.
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

//Perform initialization
executor.initialize();
return executor;
}

/**
Public thread pool, calculated using the number of system availableProcessors threads
*/
@Bean(name="commonThreadPoolTaskExecutor")
public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor(){<!-- -->
\t\t
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
\t\t
//Returns the number of Java virtual machines with available processors
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int)(processNum / (1-0.2));
int maxPoolSize = (int)(processNum / (1-0.5));
\t
pool.setCorePoolSize(corePoolSize); // Core pool size
        pool.setMaxPoolSize(maxPoolSize); // Maximum number of threads
        pool.setQueueCapacity(maxPoolSize * 1000); // Queue size
        pool.setThreadPriority(Thread.MAX_PRIORITY);
        pool.setDaemon(false);
        pool.setKeepAliveSeconds(300);//Thread idle time
\t\t
return pool;
}

/**
Custom defaultThreadPoolExecutor thread pool
*/
@Bean(name="defaultThreadPoolExecutor",destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService(){<!-- -->
\t\t
int maxNumPool=Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(3,
                maxNumPool,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                //Set the thread name prefix, for example, set the prefix to hutool-thread-, then the thread name will be hutool-thread-1 or the like.
                new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
}
}

Asynchronous thread business class

//Customize asyncServiceExecutor thread pool
@Override
@Async("asyncServiceExecutor")
public void executeAsync(List<Student> students,
                         StudentService studentService,
                         CountDownLatch countDownLatch){<!-- -->
\t
try{<!-- -->
log.info("start executeAsync");
//What the asynchronous thread needs to do
studentService.saveBatch(students);
log.info("end executeAsync");
}finally{<!-- -->
countDownLatch.countDown();// This is very important. No matter whether the above program is abnormal or not, countDown must be executed, otherwise await cannot be released.
}
}

Split collection tool class

public class SplitListUtils {<!-- -->
    /**
     * Function description: Split collection
     * @param <T> Generic object
     * @MethodName: split
     * @MethodParam: [resList: the collection that needs to be split, subListLength: the number of elements in each subcollection]
     * @Return: java.util.List<java.util.List<T>>: Returns a list of split collections
     * The code uses a tool class that combines guava and common
     * @Author: yyalin
     * @CreateDate: 2022/5/6 14:44
     */
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {<!-- -->
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {<!-- -->
            return Lists.newArrayList();
        }
        List<List<T>> ret = Lists.newArrayList();
        int size = resList.size();
        if (size <= subListLength) {<!-- -->
            //The amount of data is insufficient for the size specified by subListLength
            ret.add(resList);
        } else {<!-- -->
            int pre = size / subListLength;
            int last = size % subListLength;
            //The previous pre collections, each size is subListLength elements
            for (int i = 0; i < pre; i + + ) {<!-- -->
                List<T> itemList = Lists.newArrayList();
                for (int j = 0; j < subListLength; j + + ) {<!-- -->
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            // Last is processed
            if (last > 0) {<!-- -->
                List<T> itemList = Lists.newArrayList();
                for (int i = 0; i < last; i + + ) {<!-- -->
                    itemList.add(resList.get(pre * subListLength + i));
                }
                ret.add(itemList);
            }
        }
        return ret;
    }

    /**
     * Function description: Method 2: Set cutting class is to cut a large set into multiple small sets with a specified number of items to facilitate inserting data into the database.
     * Recommended Use
     * @MethodName: pagingList
     * @MethodParam:[resList: the collection that needs to be split, subListLength: the number of elements in each sub-collection]
     * @Return: java.util.List<java.util.List<T>>: Returns a list of split collections
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:15
     */
    public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){<!-- -->
        //Judge whether it is empty
        if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {<!-- -->
            return Lists.newArrayList();
        }
        int length = resList.size();
        int num = (length + pageSize-1)/pageSize;
        List<List<T>> newList = new ArrayList<>();
        for(int i=0;i<num;i + + ){<!-- -->
            int fromIndex = i*pageSize;
            int toIndex = (i + 1)*pageSize<length?(i + 1)*pageSize:length;
            newList.add(resList.subList(fromIndex,toIndex));
        }
        return newList;
    }

    //Run the test code, which can be split into 11 sets in order
    public static void main(String[] args) {<!-- -->
        //Initialization data
        List<String> list = Lists.newArrayList();
        int size = 19;
        for (int i = 0; i < size; i + + ) {<!-- -->
            list.add("hello-" + i);
        }
        // A large collection contains multiple small collections
        List<List<String>> temps = pagingList(list, 100);
        int j = 0;
        // Operate each small collection in the large collection
        for (List<String> obj : temps) {<!-- -->
            System.out.println(String.format("row:%s -> size:%s,data:%s", + + j, obj.size(), obj));
        }
    }

}

Create data and perform multi-threaded asynchronous insertion

public int batchInsertWay() throws Exception {<!-- -->
        log.info("Start batch operation...");
        Random rand = new Random();
        List<Student> list = new ArrayList<>();
        //Create 1 million pieces of data
        for (int i = 0; i < 1000003; i + + ) {<!-- -->
            Student student=new Student();
            student.setStudentName("Daming:" + i);
            student.setAddr("Shanghai:" + rand.nextInt(9) * 1000);
            student.setAge(rand.nextInt(1000));
            student.setPhone("134" + rand.nextInt(9) * 1000);
            list.add(student);
        }
        //2. Start multi-threaded asynchronous batch import
        long startTime = System.currentTimeMillis(); // start time
        //boolean a=studentService.batchInsert(list);
        List<List<Student>> list1=SplitListUtils.pagingList(list,100); //Split the collection
        CountDownLatch countDownLatch = new CountDownLatch(list1.size());
        for (List<Student> list2 : list1) {<!-- -->
            asyncService.executeAsync(list2,studentService,countDownLatch);
        }
        try {<!-- -->
            countDownLatch.await(); // Ensure that all previous threads have been executed before proceeding to the next one;
            long endTime = System.currentTimeMillis(); //End time
            log.info("Total time spent: " + (endTime - startTime) / 1000 + " s");
            // In this way, you can get the aggregate results of all thread executions below.
        } catch (Exception e) {<!-- -->
            log.error("Blocking exception:" + e.getMessage());
        }
        return list.size();

    }