Springboot custom ThreadPoolTaskExecutor thread pool multi-threaded concurrent execution of asynchronous methods

1. Background

Currently, due to work requirements, a large number of Http requests need to be sent. After practice, it takes 6 hours to complete the traversal and sending. If a single-threaded request is sent, the main thread will be blocked. There will be the following problems:

  1. The front-end user waits too long for a response and cannot proceed to the next step, which is not conducive to the user’s operating system
  2. The response time is too long and exceeds the Tomcat server session time, causing the front-end and back-end requests to reconnect, which will throw java.io.IOException: The software in your host has terminated an established connection; Http send task
  3. If the main thread has other tasks such as batch import of Excel data at regular intervals, file upload, etc., it is easy to throw an exception due to file format problems, thereby interrupting the Http task
  4. There are many nights and dreams, and the request is sent for a long time, so it is impossible to judge whether the execution is completed; if an exception is thrown, or the service needs to be restarted, it is impossible to judge what stage the execution has reached, and it takes a lot of time and energy to locate the resend location, and the gain outweighs the loss, so all can only be resent (same face the above problems).

2. Solutions

After consideration, multi-threading can be used to solve the problem, and the main thread is responsible for opening sub-threads to process requests. According to business needs, multithreading can be divided into the following two categories

  • Task type 1. After opening the child thread, it returns to respond to the user’s request.
  • Task type 2. Wait for the sub-thread to respond to the result after starting the sub-thread, and then respond to the user request

3. Specific implementation

There is already an integrated ThreadPoolTaskExecutor thread pool in Springboot that can be called (pay attention to distinguishing the ThreadPoolExecutor that comes with Java). Although the specific methods provided by the two thread pools are not necessarily the same, the principle of thread scheduling process is common. The scheduling of ThreadPoolExecutor is posted below The thread process serves as a reference for creating a ThreadPoolTaskExecutor.

4. Start writing code

@EnableAsync annotation enables Spring asynchronous method execution

Current thread pool:

  • Core thread pool (CorePool): 10;
  • Maximum thread pool (MaxPool): 20;
  • Queue length (QueueCapacity): 200;
  • Rejection policy (RejectedExecutionHandler): CallerRunsPolicy

express:

  • If the number of threads created by the request is <= 10 + 200, only 10 threads in the core thread pool will be used to execute the task;
  • If 10 + 200 main thread is in a blocked state. All sub-threads need to be created before returning a response , even if the task is Task Type 1, its response time will be infinitely close to Task Type 2; according to business needs, it can be solved by setting a longer queue length

    After the thread finishes processing the task, (maximum thread pool – core thread pool) > 0 & amp; & amp; After 60s of idle time, the idle threads except the core thread pool will be recycled

@EnableAsync //Annotation enables Spring asynchronous method execution function
@Configuration
public class AsyncTaskConfig {<!-- -->

    @Bean("AsyncTaskThreadPool")
    public Executor threadPoolExecutor() {<!-- -->
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Number of core threads: the number of threads initialized when the thread pool is created
        executor.setCorePoolSize(10);
        // Maximum number of threads: the maximum number of threads in the thread pool, only after the buffer queue is full will the thread that exceeds the number of core threads be applied
        executor.setMaxPoolSize(20);
        // Buffer queue: the queue used to buffer execution tasks
        executor.setQueueCapacity(200);
        // Allow threads to be idle for 60 seconds: threads beyond the core thread will be destroyed after the idle time arrives
        executor.setKeepAliveSeconds(60);
        // The prefix of the thread pool name: After setting, it is convenient for us to locate the thread pool where the processing task is located
        executor.setThreadNamePrefix("AsyncTaskThreadPool-");
        // Rejection strategy after the buffer queue is full: handled by the calling thread (usually the main thread)
        //AbortPolicy: discard the task and throw a RejectedExecutionException
        //DiscardPolicy: Discard tasks, but do not throw exceptions. Abnormal state that may lead to failure to discover the system
        //DiscardOldestPolicy: discard the task at the front of the queue, and then resubmit the rejected task
        //CallerRunsPolicy: Do not discard the task, the task will be processed by the calling thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // The corePoolSize thread idle time in the thread pool reaches keepAliveTime and will also be closed
        executor.setAllowCoreThreadTimeOut(true);
        executor. initialize();
        return executor;
    }

}

@Async is executed according to adding to the custom thread pool

@Component
public class AsyncRequestComponent {<!-- -->

    @Async("AsyncTaskThreadPool")
    public CompletableFuture<String> mockHttpRequest(String url) throws InterruptedException, IOException {<!-- -->
        String threadName = Thread. currentThread(). getName();
        System.out.println("thread" + threadName + "start call, output:" + url);
        String result = "";
        // HttpClient RestTemplate Feign or other Http request framework
        // or other logical code that requires asynchronous concurrency
        // Thread. sleep(50);
        result = HttpClientUtil. get(url);

        System.out.println("thread" + threadName + " end of call, output: " + url);
        return CompletableFuture. completedFuture(result);
    }
}

@Slf4j
@SpringBootTest
class DemoTestApplicationTests {<!-- -->
    @Autowired
    private AsyncRequestComponent asyncRequestComponent;

     @Test
    void testAsyncThreadPerformance() throws InterruptedException, ExecutionException {<!-- -->

        long start = System. currentTimeMillis();
        List<CompletableFuture<String>> list = new ArrayList<>();
        // By modifying the number of cycles to view the number of tasks < cache queue + core thread and number of tasks > cache queue + core thread main thread time spent
        // If the number of tasks < cache queue + core thread, the main thread does not need to wait
        // If the number of tasks > cache queue + core thread, the main thread needs to wait because the subsequent tasks have not been created successfully
        for(int i=0 ;i < 2000; i ++ ){<!-- -->
            CompletableFuture<String> future= asyncRequestComponent.mockHttpRequest(i);
            list. add(future);
        }

        // For task type 1, the main thread does not need to wait for all sub-threads to finish executing, so annotate the following for loop
        // For task type 2, the main thread needs to wait for all sub-threads to finish executing: it needs to traverse the future and execute the get method to wait for the sub-threads to finish executing
        for(Future<?> future : list){<!-- -->
            while (true) {<!-- -->//CPU high-speed polling: Each future is polled concurrently, judging the completion status and then obtaining the result. This line is the essence of this implementation. That is, there are 10 futures polling at high speed, and after completing the acquisition result of one future, one polling is closed
                if (future.isDone() & amp; & amp; !future.isCancelled()) {<!-- --> //Get the successful completion status of the future, if you want to limit the timeout of each task, cancel this line The state judgment + future.get(1000*1, TimeUnit.MILLISECONDS) + catch timeout exception can be used.
                    String result = (String) future.get();//Get the result
                    System.out.println("Task i=" + result + "Get completed!" + new Date());
                    break;//The current future has obtained the result, jump out of while
                } else {<!-- -->
                    Thread.sleep(1);//Take a rest of 1 millisecond for each polling (CPU nanosecond level), to avoid CPU high-speed polling exhausting the CPU --- "Novices don't forget this
                }
            }
        }
        long end = System. currentTimeMillis();
        System.out.println("Main thread takes time:" + (end-start));
    }

}

5. Conclusion

  • If you want the main thread to never block, do not set the queue length, because the default length is set to Integer.MAX_VALUE; because the queue length is very long, no additional threads will be created for processing, so the core thread (CorePool) = the maximum thread (MaxPool)
  • If the business users themselves can tolerate a certain range of response delays, it is necessary to start from the actual situation and count the average execution of sub-thread tasks to determine the number of core threads, the maximum number of threads, and the capacity of the buffer queue

Digression: How to create a thread pool

CPU-intensive: For CPU-intensive computing, multi-threading essentially improves the utilization of multi-core CPUs. Therefore, for an 8-core CPU, each core has one thread. In theory, creating 8 threads is enough Yes; theoretical formula: maximum number of threads = number of CPU cores + 1

IO-intensive tasks: The maximum number of threads for IO-intensive tasks is generally many times greater than the number of CPU cores, because the IO read and write speed is relatively slow compared to the CPU speed. If we set A small number of threads may lead to a waste of CPU resources. Theoretical formula: Maximum number of threads = Maximum number of threads = Number of CPU cores / (1 – IO_TIME/REQUEST_TIME) For example, in a request, the request duration is 10 seconds, and the IO call time is 8 seconds. At this time, our blocking percentage is 80%, the effective use of CPU is 20%, assuming an eight-core CPU, our number of threads is 8 / (1 – 80%) = 8 / 0.2 = 40

Maximum number of threads:

  • CPU intensive: number of CPU cores + 1
  • IO intensive: number of CPU cores / (1 – IO_TIME/REQUEST_TIME)

Number of core threads: Number of core threads = maximum number of threads * 20%

Get the number of CPU cores command:

window – Java : Runtime.getRuntime().availableProcessors()//Get the number of logical cores, such as 6 cores and 12 threads, then return 12

linux – Physical CPU: cat /proc/cpuinfo| grep “physical id”| sort| uniq| wc -l

linux – core in physical CPU: cat /proc/cpuinfo | grep “cpu cores” | uniq

linux – Logical CPU: cat /proc/cpuinfo | grep “processor” | wc -l

How to be lazy:

  • Number of core threads: number of CPU cores
  • Maximum number of threads: CPU logical CPU (number of CPU cores * 2)

References:

https://blog.csdn.net/weixin_49258262/article/details/125463819?spm=1001.2014.3001.5506
https://blog.csdn.net/Wei_Naijia/article/details/127028398?spm=1001.2014.3001.5506
https://blog.csdn.net/wozyb1995/article/details/125044992?spm=1001.2014.3001.5506
https://blog.csdn.net/iampatrick_star/article/details/124490586?spm=1001.2014.3001.5506
https://www.cnblogs.com/651434092qq/p/14240406.html
https://juejin.cn/post/6948034657321484318
https://juejin.cn/post/7072281409053786120