java multithreading, thread pool

Java thread pool usage and common parameters

Multithreading problem:

1. Why use multithreading in Java
With multithreading, a large task can be split into several smaller tasks that do not affect one another and run at the same time, which makes fuller use of the CPU.

2. Simple ways to implement multithreading in Java

  • Extend the Thread class and override the run method;
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}
  • Implement the Runnable interface and implement the run method;
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}
class ThreadTest {
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start(); // Start a thread

        MyRunnable myRunnable = new MyRunnable();
        Thread runnable = new Thread(myRunnable);
        runnable.start(); // Start a thread
    }
}

3. Thread states in Java

Created: after a thread object is created and before start is called, the thread is in the created state;
Ready: once start is called, the thread is ready to run, but the thread scheduler has not yet selected it for execution;
Running: when the thread scheduler selects the thread, it moves from the ready state to the running state and starts executing the code in the run method;
Blocked: a running thread suspends execution, usually because it is waiting for a resource to become available (sleep and wait also put a thread into this state);
Dead: the thread ends when it finishes executing the code in the run method. Calling the stop method also ends it, but stop is deprecated and should not be used.
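
The JDK reports a thread's state through Thread.getState(). Its Thread.State enum uses different names from the five states above: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING and TERMINATED, where RUNNABLE covers both "ready" and "running". The following is a minimal sketch that observes only the first and last state; the class and method names are made up for illustration.

```java
class ThreadStateDemo {
    /** Returns the state seen before start() and the state seen after the thread has finished. */
    static Thread.State[] observeStates() throws InterruptedException {
        Thread worker = new Thread(
                () -> System.out.println("running in " + Thread.currentThread().getName()));
        Thread.State beforeStart = worker.getState(); // thread object exists, start() not called yet
        worker.start();
        worker.join();                                // wait until run() has returned
        Thread.State afterFinish = worker.getState();
        return new Thread.State[] { beforeStart, afterFinish };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.State[] states = observeStates();
        System.out.println("before start(): " + states[0]); // NEW
        System.out.println("after join():   " + states[1]); // TERMINATED
    }
}
```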

4. Why introduce a thread pool

When a program needs many concurrent threads and each one finishes quickly, creating and destroying threads over and over hurts efficiency, because both operations cost time and resources.
A thread pool in Java avoids this: after a thread finishes one task it is not destroyed but goes on to execute the next task, so threads are reused.
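
Thread reuse can be observed with a small experiment. The sketch below is illustrative only: it assumes a fixed-size pool from Executors.newFixedThreadPool, and the class and method names are hypothetical. It runs many short tasks and records which threads executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    /** Runs taskCount short tasks on a pool of poolSize threads and returns the distinct worker names. */
    static Set<String> runTasks(int poolSize, int taskCount) throws InterruptedException {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that runs it.
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                             // stop accepting tasks, finish the queued ones
        pool.awaitTermination(10, TimeUnit.SECONDS); // wait for the queued tasks to complete
        return workerNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(2, 20);
        // 20 tasks were executed, but never by more than 2 distinct threads.
        System.out.println(names.size() + " thread(s) ran 20 tasks: " + names);
    }
}
```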

5. Thread pool in Java (ThreadPoolExecutor)

The core class of the Java thread pool is java.util.concurrent.ThreadPoolExecutor. It provides four constructors:

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    // ... remaining fields and methods omitted
}

The source shows that ThreadPoolExecutor extends AbstractExecutorService and provides four constructors, and that the first three all delegate to the last one.
The constructor parameters are described below:
corePoolSize: the number of core threads. No threads are created when the pool itself is created; they are created as tasks arrive. Once the number of threads reaches this value, new tasks are placed in the work queue instead;
maximumPoolSize: the maximum number of threads the pool may create;
keepAliveTime: how long an idle thread may stay alive before it is terminated. By default this only applies to the threads in excess of corePoolSize;
unit: a TimeUnit enum value giving the time unit of keepAliveTime. It can take the following values:
TimeUnit.DAYS; // days
TimeUnit.HOURS; // hours
TimeUnit.MINUTES; // minutes
TimeUnit.SECONDS; // seconds
TimeUnit.MILLISECONDS; // milliseconds
TimeUnit.MICROSECONDS; // microseconds
TimeUnit.NANOSECONDS; // nanoseconds
workQueue: a blocking queue that holds the tasks waiting to be executed and determines the pool's queuing strategy. Common choices are:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
threadFactory: the factory used to create threads. The default is Executors.defaultThreadFactory();
handler: the rejection policy. When the number of threads has reached maximumPoolSize and the work queue is full, new tasks are rejected and handled by one of the following policies:
ThreadPoolExecutor.AbortPolicy: discards the task and throws RejectedExecutionException.
ThreadPoolExecutor.DiscardPolicy: also discards the task, but does not throw an exception.
ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then tries to submit the new task again (repeating if it is rejected again).
ThreadPoolExecutor.CallerRunsPolicy: the task is run by the thread that submitted it.
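
To make the parameter list concrete, here is a minimal sketch that passes all seven arguments to the last constructor. The specific values (2 core threads, 4 maximum, 30 seconds, a queue of 10) are arbitrary assumptions chosen for illustration, and the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConstructionDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                30L, TimeUnit.SECONDS,               // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10),        // workQueue: bounded, holds up to 10 waiting tasks
                Executors.defaultThreadFactory(),    // threadFactory
                new ThreadPoolExecutor.AbortPolicy() // handler
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // No thread exists yet: threads are created when tasks arrive.
        System.out.println("threads right after construction: " + pool.getPoolSize()); // 0
        pool.execute(() -> System.out.println("hello from " + Thread.currentThread().getName()));
        pool.shutdown(); // already submitted tasks still run; no new tasks are accepted
    }
}
```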

  • The following are the implementations of these policies:
  • AbortPolicy is the default policy. If the pool cannot accept the task, the task is discarded and a RejectedExecutionException is thrown
class AbortPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             executor.toString());
    }
}
  • DiscardPolicy: if the pool cannot accept the task, the task is discarded without any exception
class DiscardPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Intentionally empty: the task is dropped
    }
}
  • DiscardOldestPolicy removes the task that has been in the queue longest to make room, then tries to submit the new task again
class DiscardOldestPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            // Remove the head element
            executor.getQueue().poll();
            // Try to submit the task again
            executor.execute(r);
        }
    }
}
  • CallerRunsPolicy: the thread that submitted the task runs it itself instead of waiting for a thread in the pool
class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            // Execute the run method directly
            r.run();
        }
    }
}
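
The difference between the policies shows up once a pool is saturated. The following sketch is an illustration under stated assumptions: it uses a deliberately tiny pool (one thread, one queue slot) so that a third task must be rejected, and the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    /** Saturates a pool of one thread and one queue slot, submits one more task, and reports its fate. */
    static String overflow(RejectedExecutionHandler handler) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        String[] outcome = { "discarded silently" };
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> outcome[0] = "ran on " + Thread.currentThread().getName());
        } catch (RejectedExecutionException e) {
            outcome[0] = "rejected with exception";
        }
        release.countDown();       // let the blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(overflow(new ThreadPoolExecutor.AbortPolicy()));      // rejected with exception
        System.out.println(overflow(new ThreadPoolExecutor.CallerRunsPolicy())); // ran on main
        System.out.println(overflow(new ThreadPoolExecutor.DiscardPolicy()));    // discarded silently
    }
}
```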
  • The following is the inheritance structure of ThreadPoolExecutor
public abstract class AbstractExecutorService implements ExecutorService {

}
  • AbstractExecutorService is an abstract class that implements the ExecutorService interface and some of its methods (submit, invokeAll and invokeAny). Let's look at the declaration of the ExecutorService interface
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • ExecutorService extends the Executor interface. The following is the declaration of the Executor interface
public interface Executor {
    void execute(Runnable command);
}

Executor is the top-level interface. It declares a single method, execute, which runs the task passed to it.
Now let's return to the ThreadPoolExecutor class. Two methods available on this class matter most:

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Fewer than corePoolSize threads: try to start a new core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Otherwise try to queue the task, then re-check the pool state
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        // Queue full and no additional thread could be started: reject the task
        reject(command);
}

// Inherited from AbstractExecutorService
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
```

The execute() method is declared in Executor and implemented in ThreadPoolExecutor. It is the core method of ThreadPoolExecutor:
through it, a task is submitted to the thread pool, which then runs it.
The submit() method is declared in ExecutorService and implemented in AbstractExecutorService; ThreadPoolExecutor does not override it. As the implementation shows, submit() ultimately calls execute(),
so the task is run in the same way, but submit() also returns a Future through which the task's result can be obtained.
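
The difference between the two methods can be sketched as follows. This is a minimal illustration, assuming a single-thread pool from Executors.newSingleThreadExecutor; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVsExecuteDemo {
    /** Adds two numbers on a pool thread and returns the result obtained through a Future. */
    static int sumViaSubmit(int a, int b) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // execute(): runs the task, but gives the caller nothing to wait on or read from
            pool.execute(() -> System.out.println("execute() ran on " + Thread.currentThread().getName()));
            // submit(): wraps the Callable in a Future and hands it to execute() internally
            Future<Integer> future = pool.submit(() -> a + b);
            return future.get(); // blocks until the task has produced its result
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("submit() returned " + sumViaSubmit(40, 2)); // submit() returned 42
    }
}
```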

6. Thread pool in Spring

The thread pool in Spring is implemented by the ThreadPoolTaskExecutor class, which internally delegates to Java's ThreadPoolExecutor. Readers interested in the details can read the Spring source;
I will not list it here. Let's look at how to initialize ThreadPoolTaskExecutor.
There are two common ways to initialize it: XML configuration and Java code.
XML configuration:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="keepAliveSeconds" value="200" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="20" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>

With the explanations above, the meaning of these parameters should be clear, so they are not explained again here. The configured bean can then be injected and used:

public class MyThreadPoolTaskExecutor {
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    private void test() {
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // Executed code
            }
        });
    }
}

Java code initialization:

private void test2() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(15);
    executor.setKeepAliveSeconds(1);
    executor.setQueueCapacity(5);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Executed code
        }
    });
}

Summary of common parameters:

About the parameter setting of the Java thread pool:

The thread pool is an important part of Java multithreaded development. It is not hard to use, but using it well requires understanding what the parameters mean and how to set them. Most of the material here draws on other people's write-ups, with some additional points and opinions of my own. I hope it helps fellow developers who are learning multithreaded development.

1. Important parameters of ThreadPoolExecutor
1. corePoolSize: the number of core threads

Core threads stay alive even when there are no tasks to execute
When the number of threads is less than the number of core threads, the thread pool creates a new thread for each new task, even if existing threads are idle
When allowCoreThreadTimeOut=true (default false), idle core threads also time out and are terminated

2. queueCapacity: task queue capacity (blocking queue)

Once the number of threads has reached corePoolSize, new tasks wait in the queue to be executed

3. maxPoolSize: the maximum number of threads

When the number of threads >= corePoolSize and the task queue is full, the thread pool creates new threads to process tasks
When the number of threads = maxPoolSize and the task queue is full, the thread pool refuses the task and, with the default handler, throws an exception

4. keepAliveTime: thread idle time

When a thread has been idle for keepAliveTime, it exits, until the number of threads = corePoolSize
If allowCoreThreadTimeOut=true, threads keep exiting until the number of threads = 0

5. allowCoreThreadTimeOut: whether core threads are allowed to time out

6. rejectedExecutionHandler: task rejection handler

A task is rejected in two cases:
When the number of threads has reached maxPoolSize and the queue is full, new tasks are rejected
After shutdown() is called, the thread pool waits for the tasks it already holds to finish before it actually shuts down. Tasks submitted between the call to shutdown() and the actual shutdown are rejected
The thread pool calls rejectedExecutionHandler to handle the rejected task. If none is set, the default is AbortPolicy, which throws an exception
The ThreadPoolExecutor class has several built-in implementations for these cases:
AbortPolicy discards the task and throws a runtime exception
CallerRunsPolicy runs the task in the thread that submitted it
DiscardPolicy ignores the task; nothing happens
DiscardOldestPolicy removes the task at the head of the queue (the one queued earliest, which would have run next) and retries the new task
You can also implement the RejectedExecutionHandler interface to define a custom handler
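
A custom handler only needs to implement rejectedExecution. The sketch below is a hypothetical example (the handler and demo class names are invented here): it counts rejected tasks instead of throwing, and triggers the second rejection case by submitting a task after shutdown().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical custom handler: counts rejected tasks instead of throwing. */
class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
    }

    int rejectedCount() {
        return rejected.get();
    }
}

class CustomHandlerDemo {
    /** Submits one task before shutdown() and one after it; returns the number of rejections. */
    static int rejectAfterShutdown() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), handler);
        pool.execute(() -> { }); // accepted: the pool is still running
        pool.shutdown();
        pool.execute(() -> { }); // submitted after shutdown(): passed to the handler
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + rejectAfterShutdown()); // rejected tasks: 1
    }
}
```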

2. ThreadPoolExecutor execution sequence
The thread pool handles a newly submitted task as follows
1. When the number of threads is less than the number of core threads, create a thread.
2. When the number of threads is greater than or equal to the number of core threads and the task queue is not full, put the task into the task queue.
3. When the number of threads is greater than or equal to the number of core threads and the task queue is full:
– If the number of threads is less than the maximum number of threads, create a thread
– If the number of threads is equal to the maximum number of threads, reject the task (the default handler throws an exception)
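
The sequence above can be traced with a deliberately small pool. The sketch assumes corePoolSize=1, maximumPoolSize=2 and a queue of capacity 1 (values chosen only to make every step visible), and uses tasks that block until the trace is complete. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionOrderDemo {
    /** Submits four blocking tasks and records the pool's thread count and queue length after each. */
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // every task stays busy until the trace is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task " + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task " + i + ": rejected");
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task 1: threads=1, queued=0   (below corePoolSize: new thread)
        // task 2: threads=1, queued=1   (core threads busy: queued)
        // task 3: threads=2, queued=1   (queue full: extra thread up to maximumPoolSize)
        // task 4: rejected              (queue full and maximumPoolSize reached)
    }
}
```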
3. How to set parameters

1. Default values (those of Spring's ThreadPoolTaskExecutor)
* corePoolSize=1
* queueCapacity=Integer.MAX_VALUE
* maxPoolSize=Integer.MAX_VALUE
* keepAliveTime=60s
* allowCoreThreadTimeOut=false
* rejectedExecutionHandler=AbortPolicy()

2. How to set
    * The values depend on a few measurements
        - tasks: the number of tasks per second, assuming 500~1000
        - taskcost: time spent on each task, assuming 0.1s
        - responsetime: the maximum response time the system allows, assuming 1s
    * Do some calculations
        - corePoolSize = the number of threads needed per second
            * threadcount = tasks/(1/taskcost) = tasks*taskcost = (500~1000)*0.1 = 50~100 threads, so corePoolSize should be greater than 50
            * By the 80/20 principle, if the task rate stays below 800 per second 80% of the time, set corePoolSize to 80
        - queueCapacity = (corePoolSize/taskcost)*responsetime
            * This gives queueCapacity = 80/0.1*1 = 800. Tasks in the queue can then wait up to 1s; beyond that, a new thread has to be started to execute them
            * Do not set it to Integer.MAX_VALUE. The queue would then practically never fill, so the number of threads would stay at corePoolSize. When the task rate rises sharply, no new threads are started and the response time rises sharply.
        - maxPoolSize = (max(tasks) - queueCapacity)/(1/taskcost)
            * This gives maxPoolSize = (1000-800)/10 = 20
            * (maximum number of tasks - queue capacity) / number of tasks one thread can process per second = maximum number of threads
            * Note that ThreadPoolExecutor requires maximumPoolSize >= corePoolSize, so if the result is smaller than corePoolSize (as it is here, 20 < 80), maxPoolSize has to be set to at least corePoolSize
        - rejectedExecutionHandler: depends on the situation. If the task is not important, it can be discarded. If the task is important, some buffering mechanism should be used to handle it.
        - keepAliveTime and allowCoreThreadTimeOut: the defaults are usually sufficient

3. The values above are ideal figures. In practice they must be adjusted to the machine's capacity. If the CPU is already saturated before the maximum number of threads is reached, you need to upgrade the hardware or optimize the code to reduce taskcost.
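
The sizing arithmetic above can be sketched in code. This is only an illustration of the formulas as stated, using the assumed figures from the text (800 tasks/s for sizing, a peak of 1000 tasks/s, 0.1s per task, 1s response time). Times are expressed in milliseconds to keep the arithmetic in integers, and the class and method names are hypothetical.

```java
class PoolSizingDemo {
    /** threadcount = tasks * taskcost, with taskcost given in milliseconds. */
    static int threadsFor(int tasksPerSecond, int taskCostMillis) {
        return tasksPerSecond * taskCostMillis / 1000;
    }

    /** queueCapacity = (corePoolSize / taskcost) * responsetime. */
    static int queueCapacity(int corePoolSize, int taskCostMillis, int responseTimeMillis) {
        return corePoolSize * responseTimeMillis / taskCostMillis;
    }

    /** maxPoolSize = (max(tasks) - queueCapacity) / (1 / taskcost). */
    static int maxPoolSize(int peakTasksPerSecond, int queueCapacity, int taskCostMillis) {
        return (peakTasksPerSecond - queueCapacity) * taskCostMillis / 1000;
    }

    public static void main(String[] args) {
        int taskCostMillis = 100;      // 0.1s per task
        int responseTimeMillis = 1000; // 1s allowed response time

        int core = threadsFor(800, taskCostMillis);                        // 80
        int queue = queueCapacity(core, taskCostMillis, responseTimeMillis); // 800
        int max = maxPoolSize(1000, queue, taskCostMillis);                // 20

        // ThreadPoolExecutor throws IllegalArgumentException if maximumPoolSize < corePoolSize,
        // so the value actually configured is clamped to at least corePoolSize.
        int configuredMax = Math.max(max, core);

        System.out.println("corePoolSize=" + core + ", queueCapacity=" + queue
                + ", formula maxPoolSize=" + max + ", configured maxPoolSize=" + configuredMax);
    }
}
```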
