ThreadPoolExecutor VS ThreadPoolTaskExecutor
ThreadPoolTaskExecutor encapsulates ThreadPoolExecutor.
Configuration file application.yml
# Asynchronous thread configuration custom usage parameters async: executor: thread: core_pool_size: 10 max_pool_size: 100 # Configure the maximum number of threads queue_capacity: 99988 # Configure queue size keep_alive_seconds: 20 #Set the thread idle waiting time seconds name: prefix: async-thread- # Configure the name prefix of threads in the thread pool
Configuration class
@Configuration @EnableAsync @Slf4j public class ThreadPoolConfig{<!-- --> \t //Customize usage parameters @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; //Configure the number of core threads @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; //Configure the maximum number of threads @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds; /** 1. Customize asyncServieExecutor thread pool */ @Bean(name = "asyncServiceExecutor") public ThreadPoolTaskExecutor asyncServiceExecutor(){<!-- --> \t\t log.info("start asyncServiceExecutor......"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //Configure the number of core threads executor.setCorePoolSize(corePoolSize); //Configure the maximum number of threads executor.setMaxPoolSize(maxPoolSize); //Set the thread idle waiting time s executor.setKeepAliveSeconds(keepAliveSeconds); //Configure queue size Set the size of the task waiting queue executor.setQueueCapacity(queueCapacity); //Configure the name prefix of the thread in the thread pool //Set the prefix of the thread name in the thread pool-------recommended by Alibaba Coding Convention--to facilitate debugging after errors executor.setThreadNamePrefix(namePrefix); /** Rejection-policy: How to handle new tasks when the pool has reached the max size CALLER_RUNS: The task is not executed in a new thread, but is executed by the thread where the caller is located. */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); //Perform initialization executor.initialize(); return executor; } /** Public thread pool, calculated using the number of system availableProcessors threads */ @Bean(name="commonThreadPoolTaskExecutor") public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor(){<!-- --> \t\t ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); \t\t //Returns the number of Java virtual machines with available processors int processNum = Runtime.getRuntime().availableProcessors(); int corePoolSize = (int)(processNum / (1-0.2)); int maxPoolSize = (int)(processNum / (1-0.5)); \t pool.setCorePoolSize(corePoolSize); // Core pool size pool.setMaxPoolSize(maxPoolSize); // Maximum number of threads pool.setQueueCapacity(maxPoolSize * 1000); // Queue size pool.setThreadPriority(Thread.MAX_PRIORITY); pool.setDaemon(false); pool.setKeepAliveSeconds(300);//Thread idle time \t\t return pool; } /** Custom defaultThreadPoolExecutor thread pool */ @Bean(name="defaultThreadPoolExecutor",destroyMethod = "shutdown") public ThreadPoolExecutor systemCheckPoolExecutorService(){<!-- --> \t\t int maxNumPool=Runtime.getRuntime().availableProcessors(); return new ThreadPoolExecutor(3, maxNumPool, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), //Set the thread name prefix, for example, set the prefix to hutool-thread-, then the thread name will be hutool-thread-1 or the like. new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(), (r, executor) -> log.error("system pool is full! ")); } }
Asynchronous thread business class
//Customize asyncServiceExecutor thread pool @Override @Async("asyncServiceExecutor") public void executeAsync(List<Student> students, StudentService studentService, CountDownLatch countDownLatch){<!-- --> \t try{<!-- --> log.info("start executeAsync"); //What the asynchronous thread needs to do studentService.saveBatch(students); log.info("end executeAsync"); }finally{<!-- --> countDownLatch.countDown();// This is very important. No matter whether the above program is abnormal or not, countDown must be executed, otherwise await cannot be released. } }
Split collection tool class
public class SplitListUtils {<!-- --> /** * Function description: Split collection * @param <T> Generic object * @MethodName: split * @MethodParam: [resList: the collection that needs to be split, subListLength: the number of elements in each subcollection] * @Return: java.util.List<java.util.List<T>>: Returns a list of split collections * The code uses a tool class that combines guava and common * @Author: yyalin * @CreateDate: 2022/5/6 14:44 */ public static <T> List<List<T>> split(List<T> resList, int subListLength) {<!-- --> if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {<!-- --> return Lists.newArrayList(); } List<List<T>> ret = Lists.newArrayList(); int size = resList.size(); if (size <= subListLength) {<!-- --> //The amount of data is insufficient for the size specified by subListLength ret.add(resList); } else {<!-- --> int pre = size / subListLength; int last = size % subListLength; //The previous pre collections, each size is subListLength elements for (int i = 0; i < pre; i + + ) {<!-- --> List<T> itemList = Lists.newArrayList(); for (int j = 0; j < subListLength; j + + ) {<!-- --> itemList.add(resList.get(i * subListLength + j)); } ret.add(itemList); } // Last is processed if (last > 0) {<!-- --> List<T> itemList = Lists.newArrayList(); for (int i = 0; i < last; i + + ) {<!-- --> itemList.add(resList.get(pre * subListLength + i)); } ret.add(itemList); } } return ret; } /** * Function description: Method 2: Set cutting class is to cut a large set into multiple small sets with a specified number of items to facilitate inserting data into the database. * Recommended Use * @MethodName: pagingList * @MethodParam:[resList: the collection that needs to be split, subListLength: the number of elements in each sub-collection] * @Return: java.util.List<java.util.List<T>>: Returns a list of split collections * @Author: yyalin * @CreateDate: 2022/5/6 15:15 */ public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){<!-- --> //Judge whether it is empty if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {<!-- --> return Lists.newArrayList(); } int length = resList.size(); int num = (length + pageSize-1)/pageSize; List<List<T>> newList = new ArrayList<>(); for(int i=0;i<num;i + + ){<!-- --> int fromIndex = i*pageSize; int toIndex = (i + 1)*pageSize<length?(i + 1)*pageSize:length; newList.add(resList.subList(fromIndex,toIndex)); } return newList; } //Run the test code, which can be split into 11 sets in order public static void main(String[] args) {<!-- --> //Initialization data List<String> list = Lists.newArrayList(); int size = 19; for (int i = 0; i < size; i + + ) {<!-- --> list.add("hello-" + i); } // A large collection contains multiple small collections List<List<String>> temps = pagingList(list, 100); int j = 0; // Operate each small collection in the large collection for (List<String> obj : temps) {<!-- --> System.out.println(String.format("row:%s -> size:%s,data:%s", + + j, obj.size(), obj)); } } }
Create data and perform multi-threaded asynchronous insertion
public int batchInsertWay() throws Exception {<!-- --> log.info("Start batch operation..."); Random rand = new Random(); List<Student> list = new ArrayList<>(); //Create 1 million pieces of data for (int i = 0; i < 1000003; i + + ) {<!-- --> Student student=new Student(); student.setStudentName("Daming:" + i); student.setAddr("Shanghai:" + rand.nextInt(9) * 1000); student.setAge(rand.nextInt(1000)); student.setPhone("134" + rand.nextInt(9) * 1000); list.add(student); } //2. Start multi-threaded asynchronous batch import long startTime = System.currentTimeMillis(); // start time //boolean a=studentService.batchInsert(list); List<List<Student>> list1=SplitListUtils.pagingList(list,100); //Split the collection CountDownLatch countDownLatch = new CountDownLatch(list1.size()); for (List<Student> list2 : list1) {<!-- --> asyncService.executeAsync(list2,studentService,countDownLatch); } try {<!-- --> countDownLatch.await(); // Ensure that all previous threads have been executed before proceeding to the next one; long endTime = System.currentTimeMillis(); //End time log.info("Total time spent: " + (endTime - startTime) / 1000 + " s"); // In this way, you can get the aggregate results of all thread executions below. } catch (Exception e) {<!-- --> log.error("Blocking exception:" + e.getMessage()); } return list.size(); }