ScheduledThreadPool principle code

ScheduledThreadPool is a thread pool for running delayed and periodic tasks. It is built on top of ThreadPoolExecutor and adds scheduling capabilities. The following simplified code illustrates the principle behind its implementation:

/**
 * Simplified illustration of a scheduled thread pool built on {@link ThreadPoolExecutor}.
 *
 * <p>Scheduled tasks are wrapped in {@link ScheduledFutureTask} and stored in a
 * {@link DelayedWorkQueue}, ordered by the time at which they become due. A single
 * dispatcher task, started lazily inside the pool, blocks on that queue and hands every
 * task that becomes due to the underlying pool for execution. Periodic tasks compute
 * their next run time after each successful run and put themselves back into the queue.
 *
 * <p>The dispatcher occupies one pool thread for as long as the pool is running.
 * {@link #shutdown()} stops the dispatcher and cancels every task that is still waiting
 * in the delay queue.
 */
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    // Upper bound for a single delay/period, so that differences between trigger times
    // never overflow a long when they are compared
    private static final long MAX_DELAY_NANOS = Long.MAX_VALUE >> 1;

    // Scheduled task queue: tasks waiting for their execution time, earliest first
    private final DelayedWorkQueue delayedQueue = new DelayedWorkQueue();

    // Guards creation and cancellation of the dispatcher
    private final Object dispatcherLock = new Object();

    // Scheduled task thread: the long-running task that moves due tasks from the delay
    // queue into the pool; null until the first task is scheduled
    private FutureTask<Void> dispatcher;

    /**
     * Creates a ScheduledThreadPoolExecutor.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new SynchronousQueue<Runnable>());
    }

    /**
     * Implements ScheduledExecutorService.schedule for a Runnable: runs the command once
     * after the given delay.
     *
     * @param command the task to run
     * @param delay   time to wait before running; negative values are treated as zero
     * @param unit    unit of {@code delay}
     * @return a future that completes (with {@code null}) when the task has run
     * @throws NullPointerException if {@code command} or {@code unit} is null
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        // Encapsulate the task into ScheduledFutureTask
        ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit));
        delayedExecute(task);
        return task;
    }

    /**
     * Implements ScheduledExecutorService.schedule for a Callable: computes a value once
     * after the given delay.
     *
     * @param callable the computation to run
     * @param delay    time to wait before running; negative values are treated as zero
     * @param unit     unit of {@code delay}
     * @return a future holding the computed value
     * @throws NullPointerException if {@code callable} or {@code unit} is null
     */
    @Override
    public <V> ScheduledFuture<V> schedule(java.util.concurrent.Callable<V> callable,
                                           long delay, TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        ScheduledFutureTask<V> task =
                new ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
        delayedExecute(task);
        return task;
    }

    /**
     * Implements ScheduledExecutorService.scheduleAtFixedRate: run times are
     * {@code initialDelay}, {@code initialDelay + period}, {@code initialDelay + 2 * period}, ...
     * The series ends when the task is cancelled, throws, or the pool is shut down.
     *
     * @throws NullPointerException     if {@code command} or {@code unit} is null
     * @throws IllegalArgumentException if {@code period <= 0}
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        final long periodNanos = boundedNanos(period, unit);
        // Encapsulate the task into ScheduledFutureTask
        ScheduledFutureTask<Void> task = new ScheduledFutureTask<Void>(command,
                null, triggerTime(initialDelay, unit)) {
            @Override
            public void run() {
                // runAndReset() leaves the future re-runnable; it returns false when the
                // task was cancelled or threw, which ends the periodic series
                if (runAndReset()) {
                    // Fixed rate: the next run is measured from the previous scheduled time
                    setNextRunTime(getNextRunTime() + periodNanos);
                    enqueue(this);
                }
            }
        };
        delayedExecute(task);
        return task;
    }

    /**
     * Implements ScheduledExecutorService.scheduleWithFixedDelay: each run starts
     * {@code delay} after the previous run finished. The series ends when the task is
     * cancelled, throws, or the pool is shut down.
     *
     * @throws NullPointerException     if {@code command} or {@code unit} is null
     * @throws IllegalArgumentException if {@code delay <= 0}
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        final long delayNanos = boundedNanos(delay, unit);
        // Encapsulate the task into ScheduledFutureTask
        ScheduledFutureTask<Void> task = new ScheduledFutureTask<Void>(command,
                null, triggerTime(initialDelay, unit)) {
            @Override
            public void run() {
                if (runAndReset()) {
                    // Fixed delay: the next run is measured from the end of this run
                    setNextRunTime(System.nanoTime() + delayNanos);
                    enqueue(this);
                }
            }
        };
        delayedExecute(task);
        return task;
    }

    // Converts a duration to nanoseconds, clamped to [0, MAX_DELAY_NANOS]
    private static long boundedNanos(long duration, TimeUnit unit) {
        return Math.min(unit.toNanos(Math.max(duration, 0L)), MAX_DELAY_NANOS);
    }

    // Absolute System.nanoTime() value at which a task with the given delay becomes due
    private static long triggerTime(long delay, TimeUnit unit) {
        return System.nanoTime() + boundedNanos(delay, unit);
    }

    // Entry point for newly scheduled tasks: hands the task to the rejection handler
    // when the pool is already shut down, otherwise queues it
    private void delayedExecute(ScheduledFutureTask<?> task) {
        if (isShutdown()) {
            getRejectedExecutionHandler().rejectedExecution(task, this);
            return;
        }
        enqueue(task);
    }

    // Add to scheduled task queue and make sure the dispatcher is running. Also used by
    // periodic tasks to re-queue themselves from a worker thread, so a shutdown is
    // answered by cancelling the task instead of throwing.
    private void enqueue(ScheduledFutureTask<?> task) {
        delayedQueue.add(task);
        if (isShutdown()) {
            // If remove() fails the dispatcher already took the task and will cancel it
            if (delayedQueue.remove(task))
                task.cancel(false);
            return;
        }
        scheduleFromNow();
    }

    // Start the scheduled task thread (the dispatcher) unless it already exists or the
    // pool has been shut down
    private void scheduleFromNow() {
        synchronized (dispatcherLock) {
            if (dispatcher != null || isShutdown())
                return;
            FutureTask<Void> loop = new FutureTask<Void>(new Runnable() {
                @Override
                public void run() {
                    dispatchDueTasks();
                }
            }, null);
            super.execute(loop);
            // Recorded only after the pool accepted it, so a failed start can be retried
            dispatcher = loop;
        }
    }

    // Body of the dispatcher: waits for the earliest task to become due and executes it,
    // until the pool is shut down or the dispatcher thread is interrupted
    private void dispatchDueTasks() {
        try {
            while (!isShutdown())
                executeTask(delayedQueue.take());
        } catch (InterruptedException stop) {
            Thread.currentThread().interrupt();
        }
    }

    // Execute task: hand a due task to the pool, or cancel it when the pool refuses it
    private void executeTask(ScheduledFutureTask<?> task) {
        if (!isShutdown()) {
            try {
                super.execute(task);
                return;
            } catch (java.util.concurrent.RejectedExecutionException ignored) {
                // The pool refused the task (for example a shutdown racing with the
                // dispatch); fall through and cancel so waiters on the future are released
            }
        }
        task.cancel(false);
    }

    /**
     * Stops the scheduled task thread, cancels every task still waiting in the delay
     * queue, and shuts the underlying pool down. Tasks already handed to the pool are
     * allowed to finish. Safe to call when nothing was ever scheduled.
     */
    @Override
    public void shutdown() {
        // Flip the pool state first so that concurrent schedule() calls see the shutdown
        super.shutdown();
        synchronized (dispatcherLock) {
            if (dispatcher != null)
                dispatcher.cancel(true); // interrupt the blocking take()
        }
        // The queue iterator works on a snapshot, so removing while iterating is safe
        for (Runnable pending : delayedQueue) {
            if (delayedQueue.remove(pending))
                ((ScheduledFutureTask<?>) pending).cancel(false);
        }
    }

    // Scheduled task packaging class: a FutureTask that knows when it is due and how it
    // is ordered relative to other scheduled tasks
    static class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {

        // Source of creation-order numbers used to break ties between equal run times
        private static final java.util.concurrent.atomic.AtomicLong SEQUENCER =
                new java.util.concurrent.atomic.AtomicLong();

        // Creation-order number of this task
        private final long sequence = SEQUENCER.getAndIncrement();

        // Next execution time, as a System.nanoTime() value; must only be changed while
        // the task is not inside a queue, because queues order tasks by it
        private volatile long nextRunTime;

        // Constructor for Runnable tasks
        public ScheduledFutureTask(Runnable runnable, V result, long nextRunTime) {
            super(runnable, result);
            this.nextRunTime = nextRunTime;
        }

        // Constructor for Callable tasks
        public ScheduledFutureTask(java.util.concurrent.Callable<V> callable, long nextRunTime) {
            super(callable);
            this.nextRunTime = nextRunTime;
        }

        // Get the next execution time
        public long getNextRunTime() {
            return nextRunTime;
        }

        // Set the next execution time
        public void setNextRunTime(long nextRunTime) {
            this.nextRunTime = nextRunTime;
        }

        // Remaining time until the task is due; zero or negative once it is due
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(nextRunTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Compare next execution time; tasks due at the same instant keep creation order
        @Override
        public int compareTo(Delayed delayed) {
            if (delayed == this)
                return 0;
            if (delayed instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) delayed;
                // Compare by difference: nanoTime values may wrap around, differences do not
                long diff = nextRunTime - that.nextRunTime;
                if (diff < 0)
                    return -1;
                if (diff > 0)
                    return 1;
                return Long.compare(sequence, that.sequence);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), delayed.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Scheduled task queue: an unbounded, thread-safe queue of ScheduledFutureTask whose
    // removal operations (take/poll) only return tasks that are already due
    private static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {

        // task queue; the delay queue provides the ordering, locking and blocking
        private final java.util.concurrent.DelayQueue<ScheduledFutureTask<?>> taskQueue =
                new java.util.concurrent.DelayQueue<ScheduledFutureTask<?>>();

        // Implement AbstractQueue.iterator
        @Override
        public Iterator<Runnable> iterator() {
            final Iterator<ScheduledFutureTask<?>> tasks = taskQueue.iterator();
            return new Iterator<Runnable>() {
                @Override
                public boolean hasNext() { return tasks.hasNext(); }
                @Override
                public Runnable next() { return tasks.next(); }
                @Override
                public void remove() { tasks.remove(); }
            };
        }

        // Implement AbstractCollection.size
        @Override
        public int size() {
            return taskQueue.size();
        }

        // Implement BlockingQueue.remainingCapacity
        @Override
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }

        // Implement BlockingQueue.offer; returns false for anything that is not a
        // ScheduledFutureTask
        @Override
        public boolean offer(Runnable o) {
            if (o == null)
                throw new NullPointerException();
            if (!(o instanceof ScheduledFutureTask))
                return false;
            return taskQueue.offer((ScheduledFutureTask<?>) o);
        }

        // Implement BlockingQueue.take: blocks until the earliest task is due
        @Override
        public ScheduledFutureTask<?> take() throws InterruptedException {
            return taskQueue.take();
        }

        // Implement BlockingQueue.poll with timeout: waits at most the given time for a
        // task to become due
        @Override
        public ScheduledFutureTask<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
            return taskQueue.poll(timeout, unit);
        }

        // Implement BlockingQueue.put; never blocks because the queue is unbounded
        @Override
        public void put(Runnable o) throws InterruptedException {
            if (!offer(o))
                throw new ClassCastException("only ScheduledFutureTask elements are supported");
        }

        // Implement BlockingQueue.offer with timeout; never blocks because the queue is
        // unbounded
        @Override
        public boolean offer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(o);
        }

        // Implement Queue.peek: the earliest task, whether or not it is due yet
        @Override
        public ScheduledFutureTask<?> peek() {
            return taskQueue.peek();
        }

        // Implement Queue.poll: the earliest task if it is already due, otherwise null
        @Override
        public ScheduledFutureTask<?> poll() {
            return taskQueue.poll();
        }

        // Implement BlockingQueue.remove
        @Override
        public boolean remove(Object o) {
            return taskQueue.remove(o);
        }

        // Implement BlockingQueue.drainTo: moves every task that is already due
        @Override
        public int drainTo(java.util.Collection<? super Runnable> c) {
            if (c == this)
                throw new IllegalArgumentException();
            return taskQueue.drainTo(c);
        }

        // Implement BlockingQueue.drainTo with a limit on the number of moved tasks
        @Override
        public int drainTo(java.util.Collection<? super Runnable> c, int maxElements) {
            if (c == this)
                throw new IllegalArgumentException();
            return taskQueue.drainTo(c, maxElements);
        }

        // Implement AbstractQueue.clear
        @Override
        public void clear() {
            taskQueue.clear();
        }

        // Implement AbstractCollection.isEmpty
        @Override
        public boolean isEmpty() {
            return taskQueue.isEmpty();
        }
    }

}

ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so the thread-pool part of its behavior is inherited from ThreadPoolExecutor. It uses DelayedWorkQueue as its scheduled-task queue. DelayedWorkQueue extends AbstractQueue and implements the BlockingQueue interface; it supports delay-based task scheduling and can handle periodically recurring tasks. ScheduledFutureTask is the wrapper class for scheduled tasks. It implements the ScheduledFuture interface and encapsulates the Runnable task, its next execution time, and the ordering logic used to compare it with other scheduled tasks.