[Multithreading] Timer and thread pool


Personal homepage: bit me
Current Column: Java EE Primer


Contents

  • 1. Timer
    • 1. What is a timer
    • 2. Timers in the standard library
    • 3. Implement timer
  • 2. Thread pool
    • 1. The thread pool in the standard library
    • 2. Implement thread pool:

1. Timer

1. What is a timer

A timer is an important component in software development. It works like an alarm clock: once a set time is reached, a specified piece of code is executed.

Timers are used very often in real projects, and the standard library provides one.

For example, in network communication, if the other party does not return data within 500 ms, the connection is dropped and a reconnect is attempted.
For example, in a Map, a certain key may need to expire (be deleted automatically) after 3 s.
Scenarios like these call for a timer.
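The key-expiry scenario can be sketched with the standard java.util.Timer. This is only an illustration under assumptions: ExpiringMap, its put and get methods and the demo helper are hypothetical names that do not appear in the original text, and a real cache would need more care (for instance when the same key is written again).

```java
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration: a map whose entries expire after a delay.
class ExpiringMap {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    // A daemon timer thread, so the timer alone does not keep the JVM alive.
    private final Timer timer = new Timer(true);

    void put(String key, String value, long ttlMillis) {
        data.put(key, value);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Remove the entry only if the key still maps to this value.
                data.remove(key, value);
            }
        }, ttlMillis);
    }

    String get(String key) {
        return data.get(key);
    }

    // Returns {present right after put, present after the TTL has passed}.
    static boolean[] demo() {
        ExpiringMap map = new ExpiringMap();
        map.put("session", "abc", 200);
        boolean before = map.get("session") != null;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean after = map.get("session") != null;
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("present at once: " + result[0] + ", present later: " + result[1]);
    }
}
```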

2. Timers in the standard library

  • The standard library provides a Timer class in java.util. Its core method is schedule.
  • The schedule overload used here takes two parameters. The first is the task to execute (a TimerTask), and the second is the delay before it runs, in milliseconds.
import java.util.Timer;
import java.util.TimerTask;

public class Demo22 {
    public static void main(String[] args) throws InterruptedException {
        //Timer is a component in java.util
        Timer timer = new Timer();
        //schedule arranges for a task to run later:
        //not immediately, but after 3000 ms
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("This is a task to be executed:");
            }
        }, 3000);

        //The main thread keeps working while the timer waits
        while (true) {
            System.out.println("main");
            Thread.sleep(1000);
        }
    }
}

Note:

  • Even without the while loop, the program would not end: Timer runs its tasks on an internal thread, and that thread is a non-daemon thread by default, so it keeps the process from exiting.
  • The difference between sleep and a timer: sleep blocks the current thread, which can do nothing but wait until the time is up. With a timer, the calling thread carries on with its own work while the timer thread does the waiting.
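Both notes can be seen in a small sketch: the calling thread continues right after schedule returns, and calling cancel on the Timer ends its internal thread so the process can exit. The class name TimerExitDemo and the method runOnceThenCancel are made up for this illustration; Timer.schedule and Timer.cancel are the standard java.util.Timer methods.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimerExitDemo {
    // Returns true if the scheduled task ran before the timer was cancelled.
    static boolean runOnceThenCancel() {
        Timer timer = new Timer();
        CountDownLatch ran = new CountDownLatch(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("task executed on the timer thread");
                ran.countDown();
            }
        }, 300);

        // schedule() returned at once, so this thread is free to do other work.
        System.out.println("main keeps working while the timer waits");

        boolean executed = false;
        try {
            executed = ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // cancel() terminates the timer thread, so it no longer blocks process exit.
        timer.cancel();
        return executed;
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + runOnceThenCancel());
    }
}
```

Another option, when the tasks are not essential, is new Timer(true), which makes the timer thread a daemon thread.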

3. Implement timer

  • A priority blocking queue

Why a priority queue?

Because every task in the blocking queue has its own execution time (delay), and the task to run first is always the one with the smallest delay. A priority queue finds that task efficiently.

  • Each element in the queue is a MyTask object.
  • MyTask has a time attribute. The head of the queue is the next task to be executed.
  • A dedicated thread keeps checking the head of the queue to see whether it is due.
  1. The core interface of the MyTimer class is schedule, which registers a task and specifies how long to wait before it is executed.
class MyTimer {
    public void schedule(Runnable command, long after) {
        //TODO
    }
}
  1. The MyTask class describes a task (it can be an inner class of MyTimer). It holds a Runnable and a time (a millisecond timestamp). Because MyTask objects are placed in a priority queue, the class has to implement the Comparable interface.
class MyTask implements Comparable<MyTask> {
    private Runnable command;
    private long time;

    public MyTask(Runnable command, long after) {
        this.command = command;
        this.time = System.currentTimeMillis() + after;
    }

    public void run() {
        command.run();
    }

    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTask o) {
        //Earlier time first; Long.compare cannot overflow, unlike (int) (this.time - o.time)
        return Long.compare(this.time, o.time);
    }
}
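  • System.currentTimeMillis() + after is an absolute timestamp (the moment the task becomes due), not "how long until it can be executed".
  • compareTo orders tasks by that timestamp, earliest first. Casting the difference of two long values to int, as in (int) (this.time - o.time), can overflow and flip the sign when the times are far apart; Long.compare(this.time, o.time) gives the same ordering without that risk.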
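The overflow risk of the narrowing cast can be checked with two timestamps that are 3,000,000,000 ms (roughly 35 days) apart. CompareDemo and its two helper methods are hypothetical names used only for this comparison.

```java
class CompareDemo {
    // The subtract-and-cast style: the long difference is truncated to 32 bits.
    static int castCompare(long a, long b) {
        return (int) (a - b);
    }

    // Long.compare returns a negative, zero or positive int without overflow.
    static int safeCompare(long a, long b) {
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        long earlier = 0L;
        long later = 3_000_000_000L; // does not fit in an int

        // "later" should compare greater than "earlier" (a positive result).
        System.out.println("cast:         " + castCompare(later, earlier)); // negative, wrong sign
        System.out.println("Long.compare: " + safeCompare(later, earlier)); // positive
    }
}
```

For the short delays in this article both versions order the tasks the same way; the difference only shows up once two timestamps are more than about 24.8 days (2^31 ms) apart.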
  1. Inside a MyTimer instance, the MyTask objects are organized in a PriorityBlockingQueue.
    schedule inserts MyTask objects into the queue one at a time.
class MyTimer {
    private final Object locker = new Object();
    //Core structure
    private final PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();

    public void schedule(Runnable command, long after) {
        MyTask myTask = new MyTask(command, after);
        //Insert exactly once, outside the lock: the scan thread may be blocked in take() while holding locker
        queue.put(myTask);
        synchronized (locker) {
            //Wake the scan thread so that it re-checks which task is earliest
            locker.notify();
        }
    }
}
  • A priority queue holds the pending tasks. This queue is accessed by multiple threads at the same time:
  1. schedule may be called from several threads, and each call adds an element to the queue.
  2. A dedicated thread is needed to execute the tasks in the queue.
  1. The MyTimer class contains a thread that keeps checking the head of the queue to see whether that task can be executed. ("Can be executed" means that the time set for the task has arrived.)
class MyTimer {
    // ... previous code unchanged
    public MyTimer() {
        //Start the scan thread here
        Thread t = new Thread(() -> {
            while (true) {
                //Keep taking the head of the queue and check whether its time has come:
                //if it has, execute the task; if not, put it back and wait
                try {
                    synchronized (locker) {
                        MyTask myTask = queue.take();
                        long curTime = System.currentTimeMillis();
                        if (myTask.getTime() > curTime) {
                            //The time is not up yet: put the task back in the queue
                            queue.put(myTask);
                            locker.wait(myTask.getTime() - curTime);
                        } else {
                            //The time is up: execute the task directly
                            myTask.run();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t.start();
    }
}
  • MyTask myTask = queue.take(); blocks if the queue is empty. If the queue is not empty: 1. take out the task -> 2. compare its time with the current time -> 3. if the time is not up, insert the task back into the queue, then repeat.
  • Without a wait, this loop would spin many times in a short period: the CPU would be kept busy without doing any real work ("busy waiting"). The thread therefore has to block until the head task is due. sleep is not suitable here, because a sleeping thread cannot be woken by schedule when a new, earlier task arrives. wait with a timeout also blocks the thread, and notify can wake it up early.
  • A new task that is earlier than the current head could be inserted between put and wait. Its notify would then arrive before the wait and be lost, and the thread would sleep past the new task's time. To prevent this, the whole take, put and wait sequence is wrapped in one synchronized block, so the notify in schedule can only run once the scan thread is already waiting.
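For comparison, here is a sketch of a different arrangement, not the design described above: a plain java.util.PriorityQueue guarded by the same lock, where the scan thread peeks at the head instead of taking it out and putting it back. SketchTimer, Entry and demo are hypothetical names, and the worker is made a daemon thread purely so the example can terminate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SketchTimer {
    private static final class Entry implements Comparable<Entry> {
        final Runnable command;
        final long time; // absolute due time in milliseconds

        Entry(Runnable command, long delayMs) {
            this.command = command;
            this.time = System.currentTimeMillis() + delayMs;
        }

        @Override
        public int compareTo(Entry o) {
            return Long.compare(time, o.time);
        }
    }

    private final Object locker = new Object();
    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    SketchTimer() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Entry due;
                    synchronized (locker) {
                        while (queue.isEmpty()) {
                            locker.wait(); // nothing scheduled yet
                        }
                        long remaining = queue.peek().time - System.currentTimeMillis();
                        if (remaining > 0) {
                            // Sleep until the head is due, or until schedule() notifies.
                            locker.wait(remaining);
                            continue; // re-check: the head may have changed
                        }
                        due = queue.poll();
                    }
                    due.command.run(); // run outside the lock so schedule() is not blocked
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void schedule(Runnable command, long delayMs) {
        synchronized (locker) {
            queue.offer(new Entry(command, delayMs));
            locker.notify(); // the new task may be earlier than the current head
        }
    }

    // Schedules three tasks out of order and returns the order in which they ran.
    static List<String> demo() {
        SketchTimer timer = new SketchTimer();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        timer.schedule(() -> { order.add("3333"); done.countDown(); }, 600);
        timer.schedule(() -> { order.add("2222"); done.countDown(); }, 400);
        timer.schedule(() -> { order.add("1111"); done.countDown(); }, 200);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }
}
```

Because insertion and notify happen under the same lock that the scan thread holds while it checks the head, a notify cannot slip in between the check and the wait.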

The complete implementation:

import java.util.concurrent.PriorityBlockingQueue;

//This class describes a task
class MyTask implements Comparable<MyTask> {
    //What to do
    private Runnable command;
    //When the task should be done
    private long time;

    public MyTask(Runnable command, long after) {
        this.command = command;
        //The time recorded here is an absolute timestamp, not "how long after it can be executed"
        this.time = System.currentTimeMillis() + after;
    }

    //Executing the task simply calls the Runnable's run method
    public void run() {
        command.run();
    }

    public long getTime() {
        return time;
    }

    @Override
    public int compareTo(MyTask o) {
        //Tasks with an earlier time come first, later ones after them
        //Long.compare avoids the overflow that (int) (this.time - o.time) can cause
        return Long.compare(this.time, o.time);
    }
}

//A timer class implemented by hand
class MyTimer {
    //Lock object used for blocking waits
    private final Object locker = new Object();

    //Priority queue that holds the pending tasks, earliest first
    private final PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();

    //command: the task to perform
    //after: how long to wait (in ms) before executing it
    public void schedule(Runnable command, long after) {
        MyTask myTask = new MyTask(command, after);
        //Insert exactly once, outside the lock: the scan thread may be blocked in take() while holding locker
        queue.put(myTask);

        synchronized (locker) {
            //Wake the scan thread so that it re-checks which task is earliest
            locker.notify();
        }
    }

    public MyTimer() {
        //Start the scan thread here
        Thread t = new Thread(() -> {
            while (true) {
                //Keep taking the head of the queue and check whether its time has come:
                //if it has, execute the task; if not, put it back and wait
                try {
                    synchronized (locker) {
                        MyTask myTask = queue.take();
                        long curTime = System.currentTimeMillis();
                        if (myTask.getTime() > curTime) {
                            //The time is not up yet: put the task back in the queue
                            queue.put(myTask);
                            locker.wait(myTask.getTime() - curTime);
                        } else {
                            //The time is up: execute the task directly
                            //(it runs while locker is held, so schedule() waits until it returns)
                            myTask.run();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t.start();
    }
}


public class Demo23 {
    public static void main(String[] args) {
        MyTimer myTimer = new MyTimer();

        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("3333");
            }
        }, 6000);
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("2222");
            }
        }, 4000);
        myTimer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("1111");
            }
        }, 2000);
    }
}

2. Thread pool

Processes are heavyweight: creating and destroying one is relatively expensive, because resources have to be requested and released. Threads improve on this by sharing one set of system resources within a process. Even so, when threads are created and released very frequently, the overhead of threads themselves becomes a problem, so further optimizations have appeared:

  • thread pool
  • coroutine (fiber), a lightweight thread

The idea behind a thread pool is to keep threads in a pool after they are created. When a thread is needed, it is taken from the pool instead of being created through the system. When the thread has finished its work, it is returned to the pool instead of being destroyed through the system.

Why is taking a thread from the pool faster than creating one through the system?

Taking a thread from the pool is a pure user-mode operation. Creating one through the system involves kernel-mode operations, which cost more and whose timing the program does not control.
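The reuse itself is easy to observe: if many tasks are submitted to a small fixed pool, only a handful of distinct threads ever run them. The sketch below uses the standard Executors.newFixedThreadPool; the class name ReuseDemo and the method distinctWorkers are made up for the illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReuseDemo {
    // Runs `tasks` small tasks on a pool of `poolSize` threads and
    // returns how many distinct threads actually executed them.
    static int distinctWorkers(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                names.add(Thread.currentThread().getName());
            });
        }
        pool.shutdown(); // accept no new tasks, finish the queued ones
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 100 tasks, but never more than 2 threads are created for them.
        System.out.println("distinct worker threads: " + distinctWorkers(2, 100));
    }
}
```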

1. Thread pool in the standard library

ExecutorService threadPool = Executors.newFixedThreadPool(10);
  • ExecutorService is the interface for a service that executes tasks ("executor" plus "service").
  • Fixed means that the number of threads is fixed.
  • Executors is a class, and newFixedThreadPool is one of its static methods. A static method that creates instances is called a "factory method", and the corresponding design pattern is the "factory pattern". Constructors are limited: their name must be the class name, and overloads must differ in parameter types or count. Factory methods can have different names, so two ways of creating an object can take exactly the same parameter types and count.
  • Executors.newFixedThreadPool(10) creates a thread pool with a fixed number of 10 threads.
  • The return type is ExecutorService.
  • A task is registered with the thread pool through ExecutorService.submit.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo24 {
    public static void main(String[] args) {
        //A thread pool with a fixed number of threads
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        //Executors.newCachedThreadPool(); //A thread pool whose number of threads grows on demand

        //Hand the tasks to the thread pool and let its threads complete them
        for (int i = 0; i < 100; i++) {
            threadPool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("hello");
                }
            });
        }

    }
}
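Two details of the standard API are worth a small sketch. submit also accepts a Callable and returns a Future for its result, and the worker threads created by the default thread factory are non-daemon threads, so the JVM keeps running until shutdown is called on the pool. SubmitDemo and computeInPool are illustrative names only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SubmitDemo {
    // Computes a value on a pool thread and waits for the result.
    static int computeInPool() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // A lambda that returns a value is treated as a Callable<Integer>.
            Future<Integer> result = pool.submit(() -> 6 * 7);
            // get blocks until the task has finished (here: at most 5 seconds).
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            // Without shutdown the idle worker threads keep the process alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeInPool());
    }
}
```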

Several ways for Executors to create thread pools:

  • newFixedThreadPool: Create a thread pool with a fixed number of threads
  • newCachedThreadPool: Create a thread pool with a dynamically growing number of threads.
  • newSingleThreadExecutor: Create a thread pool that only contains a single thread.
  • newScheduledThreadPool: Create a thread pool that executes commands after a given delay, or executes them periodically. It is an advanced version of Timer.
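As a rough illustration of the last item, the sketch below schedules one delayed task on a ScheduledExecutorService and measures how long it actually waited. ScheduledDemo and elapsedMillis are hypothetical names; schedule(Callable, long, TimeUnit) is the standard method.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ScheduledDemo {
    // Schedules a task after delayMs and returns how many ms passed before it ran.
    static long elapsedMillis(long delayMs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            ScheduledFuture<Long> future = scheduler.schedule(
                    () -> (System.nanoTime() - start) / 1_000_000,
                    delayMs, TimeUnit.MILLISECONDS);
            return future.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran after about " + elapsedMillis(200) + " ms");
    }
}
```

Unlike Timer, the delay is given together with a TimeUnit, and the pool can run several scheduled tasks in parallel when it has more than one thread.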

The factory methods in Executors are essentially wrappers around ThreadPoolExecutor.

ThreadPoolExecutor provides more optional parameters, which allow finer control over the thread pool's behavior. (Detailed explanation later)
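As a preview of those parameters, here is a sketch that builds a ThreadPoolExecutor directly with one thread, a queue of capacity one and the AbortPolicy rejection handler, then shows that a third task is rejected while the first is still running. The concrete values and the names CustomPoolDemo and thirdTaskRejected are choices made for this illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CustomPoolDemo {
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                    // corePoolSize
                1,                                    // maximumPoolSize
                0L, TimeUnit.MILLISECONDS,            // keep-alive time for extra threads
                new ArrayBlockingQueue<>(1),          // bounded task queue
                Executors.defaultThreadFactory(),     // how worker threads are created
                new ThreadPoolExecutor.AbortPolicy()  // reject by throwing an exception
        );
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the only worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker);   // runs on the single worker thread
            pool.execute(() -> { }); // waits in the queue
            pool.execute(() -> { }); // no free worker, queue full: rejected
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```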

2. Implement thread pool:

  • The core operation is submit, which adds a task to the thread pool.
  • A worker thread is described by a class (here, Thread). A task is described by a Runnable.
  • A BlockingQueue organizes all pending tasks.
  • What each worker thread does: keep taking tasks from the BlockingQueue and executing them.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MyThreadPool {
    //The task queue: every task the thread pool still has to complete is put here,
    //and the worker threads inside the pool are responsible for completing them
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    //Core method: insert a task into the thread pool
    public void submit(Runnable runnable) {
        try {
            queue.put(runnable);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //n sets how many threads the thread pool has
    public MyThreadPool(int n) {
        //The constructor creates the worker threads that execute the submitted tasks
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                //Keep working until this thread is interrupted
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Runnable runnable = queue.take();
                        runnable.run();
                    } catch (InterruptedException e) {
                        //take() clears the interrupt flag when it throws, so set it again to end the loop
                        Thread.currentThread().interrupt();
                    }
                }
            });
            t.start();
        }
    }
}

public class Demo25 {
    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(10);
        for (int i = 0; i < 100; i++) {
            myThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("hello");
                }
            });
        }
    }
}

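!Thread.currentThread().isInterrupted() checks the interrupt flag of the current thread: the worker keeps looping as long as the thread has not been interrupted and stops once it has. Note that a blocking call such as take() clears the flag when it throws InterruptedException, so the catch block has to set the flag again (or leave the loop) for the worker to end.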
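To show what the interrupt check is good for, here is a sketch of a pool that keeps references to its workers and interrupts them on request. StoppablePool, shutdownNow and demo are hypothetical names for this illustration; the hand-written MyThreadPool above has no such method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StoppablePool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    StoppablePool(int n) {
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        queue.take().run();
                    } catch (InterruptedException e) {
                        // take() cleared the flag when it threw; set it again so the loop ends.
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers.add(t);
            t.start();
        }
    }

    void submit(Runnable task) {
        queue.offer(task); // the queue is unbounded, so offer always succeeds
    }

    // Hypothetical stop method: interrupts every worker thread.
    void shutdownNow() {
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    // Runs five tasks, stops the pool, and reports whether all tasks ran
    // and every worker thread has terminated.
    static boolean demo() {
        StoppablePool pool = new StoppablePool(3);
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> { count.incrementAndGet(); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
            pool.shutdownNow();
            for (Thread t : pool.workers) {
                t.join(5000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Thread t : pool.workers) {
            if (t.isAlive()) {
                return false;
            }
        }
        return count.get() == 5;
    }
}
```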