ScheduledThreadPoolExecutor is a thread pool for running delayed and periodic tasks. It is built on top of ThreadPoolExecutor and adds scheduling-specific behaviour. The following simplified code illustrates the principle behind its implementation:
/**
 * Simplified, illustrative scheduled thread pool built on top of {@link ThreadPoolExecutor}.
 *
 * <p>Pending tasks are kept in a {@link DelayedWorkQueue} ordered by their next run time. A single
 * "timer" task, running on a pool thread, sleeps until the earliest pending task is due and then
 * hands every due task to the underlying pool. Whenever a task that is due earlier than the
 * current timer's wake-up time is queued, the timer is cancelled and replaced.
 *
 * <p>Thread-safety: the timer reference and all scheduling decisions are guarded by
 * {@code schedulerLock}; the queue additionally synchronizes on itself. The lock order is always
 * {@code schedulerLock} first, then the queue monitor.
 */
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {

    /** Upper bound for delays and periods so that differences of trigger times cannot overflow. */
    private static final long MAX_DELAY_NANOS = Long.MAX_VALUE >> 1;

    /** Guards {@link #timerTask} and every "inspect the queue, then dispatch or re-arm" step. */
    private final Object schedulerLock = new Object();

    /** Scheduled tasks that are not yet due, ordered by next run time. */
    private final DelayedWorkQueue delayedQueue = new DelayedWorkQueue();

    /** Timer currently waiting for the head of {@link #delayedQueue}; {@code null} if none. */
    private ScheduledFutureTask<?> timerTask;

    /**
     * Creates a scheduled pool.
     *
     * @param corePoolSize number of threads to keep in the pool even when idle
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new SynchronousQueue<Runnable>());
    }

    /**
     * Schedules a one-shot task.
     *
     * @param command task to run
     * @param delay time to wait before running; non-positive values mean "as soon as possible"
     * @param unit unit of {@code delay}
     * @return a future that completes (with {@code null}) once the task has run
     * @throws NullPointerException if {@code command} or {@code unit} is null
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit));
        delayedExecute(task);
        return task;
    }

    /**
     * Schedules a one-shot, value-returning task. Required by {@link ScheduledExecutorService}.
     *
     * @param callable task to run
     * @param delay time to wait before running; non-positive values mean "as soon as possible"
     * @param unit unit of {@code delay}
     * @return a future yielding the callable's result
     * @throws NullPointerException if {@code callable} or {@code unit} is null
     */
    @Override
    public <V> ScheduledFuture<V> schedule(java.util.concurrent.Callable<V> callable, long delay, TimeUnit unit) {
        if (callable == null || unit == null) {
            throw new NullPointerException();
        }
        ScheduledFutureTask<V> task = new ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
        delayedExecute(task);
        return task;
    }

    /**
     * Schedules a periodic task whose runs are spaced {@code period} apart, measured from the
     * scheduled start of the previous run.
     *
     * @throws NullPointerException if {@code command} or {@code unit} is null
     * @throws IllegalArgumentException if {@code period <= 0}
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (period <= 0) {
            throw new IllegalArgumentException();
        }
        ScheduledFutureTask<Void> task =
                periodicTask(command, triggerTime(initialDelay, unit), unit.toNanos(period), true);
        delayedExecute(task);
        return task;
    }

    /**
     * Schedules a periodic task whose next run starts {@code delay} after the previous run ended.
     *
     * @throws NullPointerException if {@code command} or {@code unit} is null
     * @throws IllegalArgumentException if {@code delay <= 0}
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (delay <= 0) {
            throw new IllegalArgumentException();
        }
        ScheduledFutureTask<Void> task =
                periodicTask(command, triggerTime(initialDelay, unit), unit.toNanos(delay), false);
        delayedExecute(task);
        return task;
    }

    /**
     * Initiates an orderly shutdown. Tasks already handed to the pool keep running; the timer is
     * stopped and every task still waiting for its delay is cancelled, so callers blocked on
     * those futures are released instead of waiting forever.
     */
    @Override
    public void shutdown() {
        // Flip the pool state first so that concurrent schedule() calls observe the shutdown.
        super.shutdown();
        synchronized (schedulerLock) {
            if (timerTask != null) {
                // Interrupt the sleeping timer; otherwise its worker would keep the pool alive.
                timerTask.cancel(true);
                timerTask = null;
            }
            for (ScheduledFutureTask<?> pending = delayedQueue.poll();
                    pending != null;
                    pending = delayedQueue.poll()) {
                pending.cancel(false);
            }
        }
    }

    /** Converts a relative delay into an absolute {@link System#nanoTime()} based trigger time. */
    private static long triggerTime(long delay, TimeUnit unit) {
        long nanos = unit.toNanos(delay);
        if (nanos < 0) {
            nanos = 0;
        } else if (nanos > MAX_DELAY_NANOS) {
            nanos = MAX_DELAY_NANOS;
        }
        return System.nanoTime() + nanos;
    }

    /** Builds a self-rescheduling task; fixed-rate advances from the scheduled time, fixed-delay from now. */
    private ScheduledFutureTask<Void> periodicTask(
            Runnable command, long firstRunTime, long rawPeriodNanos, final boolean fixedRate) {
        final long periodNanos = Math.min(rawPeriodNanos, MAX_DELAY_NANOS);
        return new ScheduledFutureTask<Void>(command, null, firstRunTime) {
            @Override
            public void run() {
                // runAndReset() keeps the future re-runnable; it returns false when the task was
                // cancelled or threw, in which case the task must not be scheduled again.
                if (!runAndReset()) {
                    return;
                }
                long base = fixedRate ? getNextRunTime() : System.nanoTime();
                setNextRunTime(base + periodNanos);
                enqueue(this);
            }
        };
    }

    /** Entry point for newly submitted tasks: rejects after shutdown, otherwise queues the task. */
    private void delayedExecute(ScheduledFutureTask<?> task) {
        if (isShutdown()) {
            getRejectedExecutionHandler().rejectedExecution(task, this);
            return;
        }
        enqueue(task);
    }

    /** Adds a task to the delay queue and makes sure a timer covers the (possibly new) head. */
    private void enqueue(ScheduledFutureTask<?> task) {
        synchronized (schedulerLock) {
            if (isShutdown()) {
                // Lost a race with shutdown(): nobody will ever dispatch this task.
                task.cancel(false);
                return;
            }
            delayedQueue.add(task);
            scheduleFromNow();
        }
    }

    /**
     * Dispatches every task that is already due and (re-)arms the timer for the earliest task
     * that is not. Safe to call at any time from any thread.
     */
    private void scheduleFromNow() {
        synchronized (schedulerLock) {
            if (isShutdown()) {
                return;
            }
            for (;;) {
                ScheduledFutureTask<?> head = delayedQueue.peek();
                if (head == null) {
                    break;
                }
                long remaining = head.getNextRunTime() - System.nanoTime();
                if (remaining <= 0) {
                    // Due: take it out of the queue before running so it is dispatched only once.
                    if (delayedQueue.remove(head)) {
                        executeTask(head);
                    }
                    continue;
                }
                boolean timerUsable = timerTask != null
                        && !timerTask.isDone()
                        && timerTask.getNextRunTime() - head.getNextRunTime() <= 0;
                if (!timerUsable) {
                    if (timerTask != null) {
                        timerTask.cancel(true);
                    }
                    timerTask = newTimer(head.getNextRunTime());
                    execute(timerTask);
                }
                break;
            }
        }
    }

    /** Creates a task that sleeps on a pool thread until {@code wakeUpTime} and then re-scans the queue. */
    private ScheduledFutureTask<Void> newTimer(final long wakeUpTime) {
        Runnable waiter = new Runnable() {
            @Override
            public void run() {
                long remaining = wakeUpTime - System.nanoTime();
                if (remaining > 0) {
                    try {
                        TimeUnit.NANOSECONDS.sleep(remaining);
                    } catch (InterruptedException interrupted) {
                        // Woken early (cancelled or pool stopping); preserve the interrupt status.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        return new ScheduledFutureTask<Void>(waiter, null, wakeUpTime) {
            @Override
            public void run() {
                // FutureTask.run() records the running thread, which is what lets cancel(true)
                // interrupt the sleep above.
                super.run();
                if (!isCancelled()) {
                    timerFired(this);
                }
            }
        };
    }

    /** Called by a timer after its wait ended without being cancelled. */
    private void timerFired(ScheduledFutureTask<?> timer) {
        synchronized (schedulerLock) {
            if (timerTask == timer) {
                timerTask = null;
            }
            scheduleFromNow();
        }
    }

    /** Hands a due task to the underlying pool, or cancels it if the pool no longer accepts work. */
    private void executeTask(ScheduledFutureTask<?> task) {
        if (isShutdown()) {
            task.cancel(false);
            return;
        }
        try {
            execute(task);
        } catch (java.util.concurrent.RejectedExecutionException rejected) {
            // The task has already left the queue; cancel it so waiters are not left hanging.
            task.cancel(false);
        }
    }

    /**
     * A {@link FutureTask} that also carries the absolute time (in {@link System#nanoTime()}
     * units) at which it should run next.
     */
    static class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {

        /** Next run time; written by the worker that ran the task, read by scheduling threads. */
        private volatile long nextRunTime;

        /**
         * @param runnable task body
         * @param result value returned by {@code get()} on success
         * @param nextRunTime absolute trigger time in {@link System#nanoTime()} units
         */
        public ScheduledFutureTask(Runnable runnable, V result, long nextRunTime) {
            super(runnable, result);
            this.nextRunTime = nextRunTime;
        }

        /**
         * @param callable task body
         * @param nextRunTime absolute trigger time in {@link System#nanoTime()} units
         */
        public ScheduledFutureTask(java.util.concurrent.Callable<V> callable, long nextRunTime) {
            super(callable);
            this.nextRunTime = nextRunTime;
        }

        /** Returns the absolute time of the next run. */
        public long getNextRunTime() {
            return nextRunTime;
        }

        /** Sets the absolute time of the next run; only call while the task is not queued. */
        public void setNextRunTime(long nextRunTime) {
            this.nextRunTime = nextRunTime;
        }

        /** Returns the time remaining until the next run, in the requested unit. */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(nextRunTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        /**
         * Orders tasks by next run time. Ties between distinct tasks are broken by hash code so
         * that {@code a.compareTo(b)} and {@code b.compareTo(a)} always have opposite signs.
         */
        @Override
        public int compareTo(Delayed delayed) {
            if (delayed == this) {
                return 0;
            }
            if (delayed instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) delayed;
                // Subtract rather than compare directly: nanoTime values may wrap around.
                long diff = nextRunTime - that.nextRunTime;
                if (diff != 0) {
                    return diff < 0 ? -1 : 1;
                }
                return Integer.compare(hashCode(), that.hashCode());
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), delayed.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /**
     * Unbounded, thread-safe queue of {@link ScheduledFutureTask}s ordered by next run time.
     * It is a passive container: deciding when to dispatch is left to the enclosing executor.
     * All state is guarded by the queue's own monitor.
     */
    private static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

        /** Backing heap; the head is the task with the earliest next run time. */
        private final PriorityQueue<ScheduledFutureTask<?>> taskQueue =
                new PriorityQueue<ScheduledFutureTask<?>>();

        /** Returns an iterator over a snapshot of the queue, in no particular order. */
        @Override
        public Iterator<Runnable> iterator() {
            final Runnable[] snapshot;
            synchronized (this) {
                snapshot = taskQueue.toArray(new Runnable[0]);
            }
            return new Iterator<Runnable>() {
                private int cursor;
                private Runnable last;

                @Override
                public boolean hasNext() {
                    return cursor < snapshot.length;
                }

                @Override
                public Runnable next() {
                    if (cursor >= snapshot.length) {
                        throw new java.util.NoSuchElementException();
                    }
                    last = snapshot[cursor++];
                    return last;
                }

                @Override
                public void remove() {
                    if (last == null) {
                        throw new IllegalStateException();
                    }
                    DelayedWorkQueue.this.remove(last);
                    last = null;
                }
            };
        }

        @Override
        public synchronized int size() {
            return taskQueue.size();
        }

        @Override
        public synchronized boolean isEmpty() {
            return taskQueue.isEmpty();
        }

        /** The queue is unbounded. */
        @Override
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }

        /**
         * Inserts a task.
         *
         * @return {@code false} if {@code o} is not a {@link ScheduledFutureTask}
         * @throws NullPointerException if {@code o} is null
         */
        @Override
        public boolean offer(Runnable o) {
            if (o == null) {
                throw new NullPointerException();
            }
            if (!(o instanceof ScheduledFutureTask)) {
                return false;
            }
            synchronized (this) {
                taskQueue.offer((ScheduledFutureTask<?>) o);
                notifyAll(); // wake threads blocked in take() / timed poll()
            }
            return true;
        }

        /** Never blocks because the queue is unbounded. */
        @Override
        public boolean offer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(o);
        }

        /**
         * Never blocks because the queue is unbounded.
         *
         * @throws ClassCastException if {@code o} is not a {@link ScheduledFutureTask}
         */
        @Override
        public void put(Runnable o) throws InterruptedException {
            if (!offer(o)) {
                throw new ClassCastException("only ScheduledFutureTask instances can be queued");
            }
        }

        /** Removes and returns the head, waiting until an element is available. */
        @Override
        public synchronized ScheduledFutureTask<?> take() throws InterruptedException {
            while (taskQueue.isEmpty()) {
                wait();
            }
            return taskQueue.poll();
        }

        /** Removes and returns the head, waiting up to the timeout; {@code null} on timeout. */
        @Override
        public synchronized ScheduledFutureTask<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (taskQueue.isEmpty()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return taskQueue.poll();
        }

        /** Returns the head without removing it, or {@code null} if the queue is empty. */
        @Override
        public synchronized ScheduledFutureTask<?> peek() {
            return taskQueue.peek();
        }

        /** Removes and returns the head, or {@code null} if the queue is empty. */
        @Override
        public synchronized ScheduledFutureTask<?> poll() {
            return taskQueue.poll();
        }

        @Override
        public synchronized boolean remove(Object o) {
            return taskQueue.remove(o);
        }

        @Override
        public synchronized void clear() {
            taskQueue.clear();
        }

        @Override
        public int drainTo(java.util.Collection<? super Runnable> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }

        /** Moves up to {@code maxElements} tasks, earliest first, into {@code c}. */
        @Override
        public synchronized int drainTo(java.util.Collection<? super Runnable> c, int maxElements) {
            if (c == null) {
                throw new NullPointerException();
            }
            if (c == this) {
                throw new IllegalArgumentException();
            }
            int moved = 0;
            while (moved < maxElements && !taskQueue.isEmpty()) {
                c.add(taskQueue.poll());
                moved++;
            }
            return moved;
        }
    }
}
ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so its thread-pool machinery is inherited unchanged from ThreadPoolExecutor. It uses DelayedWorkQueue as its queue of scheduled tasks. DelayedWorkQueue extends AbstractQueue and implements the BlockingQueue interface; it keeps tasks ordered by their next execution time, which is what supports delay-based scheduling and periodically recurring tasks. ScheduledFutureTask is the wrapper class for scheduled tasks. It implements the ScheduledFuture interface and encapsulates the Runnable task, its next execution time, and the comparison used to order it against other scheduled tasks.