[Java] Using the Java thread pool ThreadPoolExecutor

When parsing data consumed from Kafka, the data volume is large, so it helps to process it with multiple threads in Java. The ThreadPoolExecutor class used here is part of the JDK and can be used directly.

Table of Contents

1. What is the role of the thread pool?

2. Benefits of using thread pool

3. Thread pool implementation principle

1. Thread pool processing process


2. Schematic diagram of ThreadPoolExecutor executing the execute() method

2. Usage steps

1. Import the library

2. Create a thread pool

3. Introduction to the ThreadPoolExecutor constructor

4. Submit tasks to the thread pool

5. Close the thread pool

6. Properly configure the thread pool

Summary

Code

Preface

How can you quickly use a thread pool for processing in Java? Use ThreadPoolExecutor directly. It is not a third-party library: it ships with the JDK in the java.util.concurrent package.

1. What is the function of the thread pool?

The main job of a thread pool is to control the number of running threads. Submitted tasks are placed in a queue and are executed by the pool's threads once those threads have been created. If there are more tasks than the maximum number of threads, the excess tasks wait in the queue until a thread finishes its current task and takes the next one out of the queue.
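A minimal sketch of this behavior, assuming a fixed pool of two threads and ten short tasks (the class and method names are illustrative, not from the original code): every task is accepted, but only two threads are ever created, and the remaining tasks wait in the queue until one of those two threads is free.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolQueueDemo {

    /** Runs taskCount short tasks on a pool of poolSize threads; returns the names of the threads that ran them. */
    static Set<String> runTasks(int poolSize, int taskCount) {
        // corePoolSize == maximumPoolSize, so the pool never grows past poolSize threads;
        // tasks that find every thread busy wait in the unbounded LinkedBlockingQueue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(10); // simulate a bit of work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already-queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 10);
        System.out.println("10 tasks ran on " + names.size() + " threads: " + names);
    }
}
```

The pool size, task count and the 10 ms sleep are arbitrary; the point is that the set of thread names never grows beyond the pool size, however many tasks are submitted.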

2. Benefits of using thread pool

  1. Reduce resource consumption. Reusing threads that have already been created avoids the cost of repeatedly creating and destroying threads;
  2. Improve responsiveness. When a task arrives, it can be executed immediately without waiting for a thread to be created;
  3. Improve thread manageability. Threads are a scarce resource: creating them without limit not only consumes system resources but also reduces the stability of the system. A thread pool lets threads be allocated, tuned, and monitored in one place.

3. Thread pool implementation principle

1. Thread pool processing process

2. Diagram of ThreadPoolExecutor executing the execute() method
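The diagrams for these two subsections did not survive. As a rough substitute, the JDK documents the decision order of execute() as follows: if fewer than corePoolSize threads are running, a new thread is started for the task; otherwise the task is offered to the work queue; if the queue is full, a new non-core thread is started as long as the pool is below maximumPoolSize; otherwise the task is handed to the rejection handler. The sketch below (hypothetical class name, with tiny sizes chosen only to make each step visible) walks through all four cases using corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {

    /** Submits four blocking tasks to a tiny pool and records what happened to each one. */
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize = 1, maximumPoolSize = 2, the work queue holds one task,
        // and AbortPolicy throws RejectedExecutionException once everything is full.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocking = () -> {
            try {
                release.await(); // keep the thread busy until the latch is released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocking);
                log.add("task" + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown(); // let the blocked tasks (and the queued one) finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Because every accepted task blocks on the latch, the output is deterministic: task1 starts the core thread (threads=1, queued=0), task2 goes into the queue (threads=1, queued=1), task3 finds the queue full and starts a non-core thread (threads=2, queued=1), and task4 is rejected.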

2. Usage steps

1. Import library

The code is as follows:

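import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;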

2. Create thread pool

Create the pool with the ThreadPoolExecutor constructor, as follows:

// Define the thread pool
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
        2500,                                      // corePoolSize
        4500,                                      // maximumPoolSize
        100L,                                      // keepAliveTime
        TimeUnit.SECONDS,                          // unit of keepAliveTime
        new LinkedBlockingQueue<Runnable>(7000),   // bounded work queue
        Executors.defaultThreadFactory(),          // threadFactory
        new ThreadPoolExecutor.AbortPolicy());     // rejection policy

// Use the thread pool: hand each Kafka record to a pool thread
for (ConsumerRecord<String, String> record : records) {
    EXECUTOR.execute(new Runnable() {
        @Override
        public void run() {
            // The specific processing logic for this record
        }
    });
}

3. Introduction to the ThreadPoolExecutor constructor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
  1. corePoolSize: The number of core threads. Until this many threads exist, each new task gets a new thread, and core threads are kept alive even when idle (unless allowCoreThreadTimeOut is enabled).
  2. maximumPoolSize: The maximum number of threads in the pool. Extra (non-core) threads are created only when the work queue is full, and the pool never grows beyond this number.
  3. keepAliveTime: When the pool has more than corePoolSize threads, an idle non-core thread is not destroyed immediately; it is reclaimed once it has been idle for longer than keepAliveTime.
  4. unit: The time unit of the keepAliveTime parameter, a TimeUnit value such as DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS.
  5. workQueue: A blocking queue used to save tasks waiting to be executed. You can choose from the following blocking queues:

    ArrayBlockingQueue: A bounded blocking queue backed by an array. It orders elements FIFO (first in, first out);

    LinkedBlockingQueue: A blocking queue backed by a linked list, unbounded unless a capacity is given. It orders elements FIFO, and its throughput is usually higher than ArrayBlockingQueue's. The static factory method Executors.newFixedThreadPool() uses this queue.

    SynchronousQueue: A blocking queue that stores no elements. Each insert must wait for a corresponding remove by another thread, and its throughput is often higher than LinkedBlockingQueue's. The static factory method Executors.newCachedThreadPool() uses this queue.

    PriorityBlockingQueue: An unbounded blocking queue that orders elements by priority.

  6. threadFactory: The factory used to create new threads. A factory lets you give each created thread a more meaningful name. For example, the ThreadFactoryBuilder from the open-source Guava library can quickly set meaningful names for the threads in the pool:
    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  7. handler: The rejection (saturation) policy. When the number of running threads has reached maximumPoolSize and the queue is full, new tasks are rejected, and ThreadPoolExecutor provides the following built-in policies:

ThreadPoolExecutor.AbortPolicy: Throws a RejectedExecutionException and refuses the new task. This is the default policy.

ThreadPoolExecutor.CallerRunsPolicy: Runs the rejected task in the caller's own thread (the thread that called execute()). This slows down the submission of new tasks, at some cost to the overall performance of the program.

ThreadPoolExecutor.DiscardPolicy: Silently discards the new task without processing it.

ThreadPoolExecutor.DiscardOldestPolicy: Discards the oldest unhandled task in the queue (the one at its head) and then retries submitting the current task.
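To tie the threadFactory and handler parameters together, here is a sketch that uses only the JDK (no Guava). The namedFactory helper is hypothetical: it names threads kafka-task-1, kafka-task-2, and so on. A single-thread pool with a SynchronousQueue (which holds no waiting tasks) uses CallerRunsPolicy, so a second task submitted while the only worker is busy is rejected and then run by the submitting thread itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class NamedPoolDemo {

    /** Hypothetical helper: names threads prefix-1, prefix-2, ... using only the JDK. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(false); // same as Executors.defaultThreadFactory()
            return t;
        };
    }

    /** Returns the names of the threads that ran the first and the second task. */
    static String[] whoRanWhat() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),                  // holds no waiting tasks at all
                namedFactory("kafka-task"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicReference<String> first = new AtomicReference<>();
        AtomicReference<String> second = new AtomicReference<>();

        pool.execute(() -> {
            first.set(Thread.currentThread().getName());
            try {
                release.await(); // keep the only worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // The worker is busy and the queue cannot hold the task, so it is rejected;
        // CallerRunsPolicy then runs it synchronously in this (the submitting) thread.
        pool.execute(() -> second.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] {first.get(), second.get()};
    }

    public static void main(String[] args) {
        String[] names = whoRanWhat();
        System.out.println("task 1 ran on " + names[0] + ", task 2 ran on " + names[1]);
    }
}
```

Run from main, this reports that task 1 ran on kafka-task-1 and task 2 ran on main. Swapping CallerRunsPolicy for AbortPolicy would make the second execute() call throw RejectedExecutionException instead.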

4. Submit tasks to the thread pool

  • The execute() method submits tasks that do not need a return value, so there is no way to tell whether the thread pool executed the task successfully.
executor.execute(new Runnable() {
    @Override
    public void run() {
       // TODO
    }
});
  • The submit() method submits tasks that need a return value. The thread pool returns a Future object, which you can use to check whether the task completed successfully and to obtain its return value through get(). get() blocks the current thread until the task completes. get(long timeout, TimeUnit unit) blocks for at most the given timeout; if the task has not finished by then, it throws a TimeoutException, and the task may still be running.
// hasReturnValueTask is a Callable<T> that produces a result
Future<T> future = executor.submit(hasReturnValueTask);
try {
    T s = future.get();
} catch (Exception e) {
    // Handle exceptions
    e.printStackTrace();
} finally {
    // Shut down the thread pool
    executor.shutdown();
}
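A self-contained sketch of both forms of get(), assuming a two-thread pool from Executors.newFixedThreadPool and made-up tasks (SubmitDemo is a hypothetical name): the first Callable returns 42, while the second sleeps for two seconds, so a get() with a 100 ms timeout throws TimeoutException while that task is still running.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SubmitDemo {

    /** Submits two Callables: waits for the quick one, gives up on the slow one after 100 ms. */
    static String run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // A lambda that returns a value is a Callable; submit() hands back a Future for it.
            Future<Integer> quick = executor.submit(() -> 6 * 7);
            Integer answer = quick.get(); // blocks until the task has finished

            // A task that takes much longer than we are willing to wait.
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(2_000);
                return "done";
            });
            String slowResult;
            try {
                slowResult = slow.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                slowResult = "timed out"; // the task is still running at this point
                slow.cancel(true);        // optional: interrupt it rather than let it keep running
            }
            return answer + " / " + slowResult;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "task failed: " + e.getCause(); // the task itself threw an exception
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

run() returns "42 / timed out". Cancelling on timeout is a design choice, not a requirement: without it the slow task keeps occupying a pool thread after the caller has stopped waiting for it.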

5. Close the thread pool

Call the thread pool's shutdown or shutdownNow method to shut it down. Both work by iterating over the pool's worker threads and calling interrupt() on them, so tasks that do not respond to interruption may never terminate.

The difference is that shutdownNow first sets the pool state to STOP, then tries to stop all actively executing tasks by interrupting their threads, and returns the list of tasks that were still waiting to be executed. shutdown only sets the pool state to SHUTDOWN and interrupts the threads that are not currently executing a task: the pool stops accepting new tasks, but tasks already submitted, including queued ones, still run to completion.
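The two methods are often combined, following the pattern shown in the ExecutorService Javadoc: call shutdown() first, wait a bounded time with awaitTermination(), and fall back to shutdownNow() only if tasks are still running. A sketch, with a hypothetical helper named shutdownAndAwait:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    /** Graceful shutdown first; force it if tasks are still running after the timeout. */
    static void shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                // Interrupt running tasks and take back the ones that never started.
                List<Runnable> neverStarted = pool.shutdownNow();
                System.err.println(neverStarted.size() + " queued task(s) were not executed");
                pool.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(completed::incrementAndGet);
        }
        shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        System.out.println("completed=" + completed.get() + ", terminated=" + pool.isTerminated());
    }
}
```

main prints completed=5, terminated=true, because shutdown() still lets the five already-submitted tasks run before the pool terminates.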

6. Properly configure the thread pool

Get the number of CPU cores available on the current machine:

Runtime.getRuntime().availableProcessors();
  • CPU-intensive tasks: The tasks do heavy computation without blocking, so the CPU is always running at full speed. Configure as few threads as possible. Formula: pool size = number of CPU cores + 1.
  • IO-intensive tasks: The tasks do a lot of IO, which means a lot of blocking. Because the threads are not always executing, you can allocate more of them, for example number of CPU cores * 2. Formula: pool size = number of CPU cores / (1 - blocking coefficient), where the blocking coefficient is typically between 0.8 and 0.9.
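The two rules of thumb can be written as small helper methods. This is only a sketch of the formulas above (PoolSizing and its method names are hypothetical); the results are starting points to be tuned by measurement, not exact answers.

```java
class PoolSizing {

    /** CPU-bound rule of thumb: number of cores + 1. */
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    /** IO-bound rule of thumb: cores / (1 - blockingCoefficient), coefficient typically 0.8 to 0.9. */
    static int ioBoundThreads(int cores, double blockingCoefficient) {
        if (blockingCoefficient < 0 || blockingCoefficient >= 1) {
            throw new IllegalArgumentException("blocking coefficient must be in [0, 1)");
        }
        // Round rather than truncate, so floating-point error such as 79.999... still gives 80.
        return (int) Math.round(cores / (1 - blockingCoefficient));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + ", CPU-bound pool=" + cpuBoundThreads(cores)
                + ", IO-bound pool (0.9)=" + ioBoundThreads(cores, 0.9));
    }
}
```

On an 8-core machine, for example, cpuBoundThreads(8) gives 9 and ioBoundThreads(8, 0.9) gives 80.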

Summary

In practice, I process several kinds of data. Because each kind differs in processing complexity, some tasks sometimes end up waiting for resources. When the records take roughly the same effort to process, however, the speedup is very significant.

Code

package cn.ac.ii;

import cn.ac.ii.kafka.Kafka;
import cn.ac.ii.tool.ConstUtil;
import cn.ac.ii.tool.MyTools;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author drifting all night
 * @create 2022-05-26 10:07
 */
@Slf4j
public class Metrics_App {
    // With AbortPolicy, execute() throws RejectedExecutionException once all 4500 threads are busy
    // and the 7000-slot queue is full; that exception would end the polling loop in running().
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(2500, 4500, 100L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(7000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    public void running() {
        // 1. Connect to kafka
        KafkaConsumer<String, String> kafkaConsumer =
                Kafka.getKafkaConsumer(ConstUtil.KAFKA_QUORUM, "group10", true);
        // true is latest
        kafkaConsumer.subscribe(Arrays.asList(ConstUtil.METRICS_TOPIC_NAME));
        Map<String, JSONObject> extracted = null;
        while (true) {
            // poll(Duration) replaces poll(long), which is deprecated since Kafka clients 2.0
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
            if (!records.isEmpty()) {
                // 2. Filter out the data in plugins_plugin that are not empty.
                for (ConsumerRecord<String, String> record : records) {
                    EXECUTOR.execute(new Runnable() {
                        @Override
                        public void run() {
                            //The actual processing logic part
                        }
                    });
                }
            }
        }
    }

    public static void main(String[] args) {
        Metrics_App rk = new Metrics_App();
        rk.running(); // normal business
    }
}