[Java] Scheduled tasks – Timer/TimerTask source code analysis

1. Background and usage

Server-side systems routinely need scheduled tasks: sending a meeting reminder half an hour in advance, or running asynchronous jobs at a fixed time or at regular intervals. How can such a scheduling system be implemented? The Timer class that ships with the JDK is a handy tool: a few simple API calls are enough to schedule a task.

Now let’s take a look at how java.util.Timer implements such timing functions.

First, let's look at a usage demo:

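Timer timer = new Timer();
TimerTask task = new TimerTask() {
    @Override
    public void run() {
        System.out.println("executing now!");
    }
};

// A TimerTask instance can be scheduled only once: scheduling it again throws
// IllegalStateException. Pick ONE of the calls below, or create a new task per call.

// Delay 1s, then print once
timer.schedule(task, 1000);
// Delay 1s, then print every 1s in fixed-delay mode
// timer.schedule(task, 1000, 1000);
// Delay 1s, then print every 1s in fixed-rate mode
// timer.scheduleAtFixedRate(task, 1000, 1000);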

Basic usage:

  1. Create a Timer object

  2. Create a TimerTask object and implement the run method here

  3. Pass the TimerTask object as a parameter into the schedule method of the Timer object for scheduling execution.
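The three steps can be combined into a small runnable program. This is only a sketch: the class name TimerBasicUsage, the helper runOnce, and the CountDownLatch used to wait for the task are choices made for this illustration and are not part of the original demo.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimerBasicUsage {

    /** Schedules a one-time task and reports whether it ran within the timeout. */
    static boolean runOnce(long delayMillis, long timeoutMillis) {
        // Step 1: create the Timer (as a daemon so it cannot keep the JVM alive).
        Timer timer = new Timer("demo-timer", true);
        CountDownLatch done = new CountDownLatch(1);

        // Step 2: create the TimerTask and implement run().
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                System.out.println("executing now!");
                done.countDown();
            }
        };

        // Step 3: hand the task to the Timer for scheduling.
        timer.schedule(task, delayMillis);

        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(); // stop the worker thread once we are finished
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + runOnce(100, 2000));
    }
}
```

The task body runs on the Timer's worker thread, not on the calling thread, which is why the caller has to wait on the latch.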

The API for adding tasks is as follows:

  • API for one-time tasks

    // Run once after the specified delay
    // (the period of 0 passed to sched marks a one-time task)
   public void schedule(TimerTask task, long delay) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        sched(task, System.currentTimeMillis() + delay, 0);
    }
    
    //Run at specified time point
    public void schedule(TimerTask task, Date time) {
        sched(task, time.getTime(), 0);
    }
  • API for periodic tasks:

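    // Run after the specified delay, then repeat with the specified period.
    // schedule uses fixed-delay mode: each run is timed from the actual execution time of the previous run.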
    public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis() + delay, -period);
    }

    // Run at a specified time point, and then run at a specified period
    public void schedule(TimerTask task, Date firstTime, long period) {
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, firstTime.getTime(), -period);
    }

    // Run after the specified delay, then repeat with the specified period.
    // scheduleAtFixedRate uses fixed-rate mode: each run is timed from the scheduled execution time of the previous run.
    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis() + delay, period);
    }

    public void scheduleAtFixedRate(TimerTask task, Date firstTime,
                                    long period) {
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, firstTime.getTime(), period);
    }

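As you can see, every API method delegates to the internal sched method. Its time parameter is the absolute time of the next execution, either computed as the current time plus the delay or taken from the given Date. The period parameter encodes the mode: 0 means a one-time task, a negative value means fixed-delay, and a positive value means fixed-rate.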

2. Implementation Principle

So let’s take a look at how scheduling is implemented internally in Timer.

2.1. Internal structure

Let’s first look at the components of Timer:

public class Timer {

    //task queue
    private final TaskQueue queue = new TaskQueue();

    //Worker thread, loop to get tasks
    private final TimerThread thread = new TimerThread(queue);

    private final Object threadReaper = new Object() {
        protected void finalize() throws Throwable {
            synchronized(queue) {
                thread.newTasksMayBeScheduled = false;
                queue.notify(); // In case queue is empty.
            }
        }
    };

    // Serial number used to generate the worker thread's name (static, so the worker threads of multiple Timers can be told apart)
    private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);
    
}

Timer has three important components: TimerTask, TaskQueue, and TimerThread.

  • TimerTask: the task to be executed.

  • TaskQueue: the task queue; once added, TimerTasks are kept ordered by execution time.

  • TimerThread: the worker thread, which actually executes the TimerTasks in a loop.

So how does the whole Timer run once a task has been added? The following sections walk through the flow.

2.2. Worker thread

  • Create a task and call the schedule method
public void schedule(TimerTask task, Date firstTime, long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), -period);
}
  • Call the sched method internally

//The input parameters of the sched method are the task task, execution time, and execution cycle
private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    //Prevent overflow
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    // Lock the queue to avoid concurrent enqueueing
    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already canceled.");
        
        // Lock the task to avoid concurrent modifications
        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or canceled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
        //Enqueue task
        queue.add(task);
        /* If the task is the current first task in the queue, wake up the worker thread
           This is because after the worker thread finishes processing the previous task, it will sleep until the execution time of the next task.
           If a task with an earlier nextExecutionTime jumps to the front, you need to wake up the worker thread immediately to check.
           Avoid delayed task execution
        */
        if (queue.getMin() == task)
            queue.notify();
    }
}

The queue and the task are both locked here so that concurrent calls cannot add or modify a TimerTask at the same time. Apart from that, the logic of sched is simple: it fills in the task's fields and adds the task to the queue, which keeps itself ordered by nextExecutionTime in ascending order (how the ordering is implemented is covered later).
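The two IllegalStateException checks in sched can be observed from the outside. The sketch below uses only the public java.util.Timer API; the class name SchedStateChecks and its helper methods are made up for this illustration.

```java
import java.util.Timer;
import java.util.TimerTask;

class SchedStateChecks {

    private static TimerTask noOp() {
        return new TimerTask() {
            @Override
            public void run() { }
        };
    }

    /** Returns true if scheduling the same TimerTask twice is rejected. */
    static boolean rejectsReusedTask() {
        Timer timer = new Timer(true);
        try {
            TimerTask task = noOp();
            timer.schedule(task, 60_000); // state goes from VIRGIN to SCHEDULED
            timer.schedule(task, 60_000); // state is no longer VIRGIN, so sched throws
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } finally {
            timer.cancel();
        }
    }

    /** Returns true if scheduling on a cancelled Timer is rejected. */
    static boolean rejectsCancelledTimer() {
        Timer timer = new Timer(true);
        timer.cancel(); // sets newTasksMayBeScheduled to false
        try {
            timer.schedule(noOp(), 60_000);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("reused task rejected: " + rejectsReusedTask());
        System.out.println("cancelled timer rejected: " + rejectsCancelledTimer());
    }
}
```

Both checks happen inside the synchronized blocks, so a task can never be enqueued twice even when several threads call schedule concurrently.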

  • The mainLoop of the worker thread
public void run() {
    try {
        mainLoop();
    } finally {
        synchronized(queue) {
            newTasksMayBeScheduled = false;
            queue.clear();
        }
    }
}

/**
 * Main logic of working thread, cyclic execution
 */
private void mainLoop() {
    while (true) {
        try {
            TimerTask task;
            boolean taskFired; // Mark whether the task should be executed
            synchronized(queue) {
                // If the queue is empty and newTasksMayBeScheduled is true, wait for tasks to join.
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                
                // If the queue is empty and newTasksMayBeScheduled is false, it means that the thread should exit at this time
                if (queue.isEmpty())
                    break;

                // The queue is not empty, try to get the task from the queue (the task with the earliest target execution time)
                long currentTime, executionTime;
                task = queue.getMin();
                synchronized(task.lock) {
                    //Verify task status
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    
                    // Current time >= target execution time, indicating that the task is executable, set taskFired = true
                    if (taskFired = (executionTime<=currentTime)) {
                        if (task.period == 0) { // period == 0 indicates that it is a non-periodic task and should be removed from the queue first.
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else { // Periodic tasks will reset their execution time based on period and then add them to the queue.
                            queue.rescheduleMin(
                              task.period<0 ? currentTime - task.period
                                            : executionTime + task.period);
                        }
                    }
                }
                if (!taskFired) // The task does not need to be executed, then wait
                    queue.wait(executionTime - currentTime);
            }
            if (taskFired) // If the task needs to be executed, call the run method of the task. What is executed here is actually the logic of the run method when the caller creates the task.
                task.run();
        } catch(InterruptedException e) {
        }
    }
}

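As can be seen from the source code of mainLoop, the basic process is as follows:

  1. While the queue is empty and new tasks may still be scheduled, wait on the queue. If the queue is empty and no new tasks can arrive, exit the loop.

  2. Look at the task at the head of the queue, which has the earliest execution time. If it has been cancelled, remove it and start over.

  3. If its execution time has arrived, remove it from the queue (one-time task) or reschedule it (periodic task), then call its run method outside the queue lock.

  4. Otherwise, wait until the execution time is reached or until the queue is notified.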
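The same loop can be condensed into a much smaller sketch. MiniTimer below is a hypothetical class written for this article, not JDK code: it supports one-time tasks only, swaps the hand-written TaskQueue for java.util.PriorityQueue, and leaves out task states and per-task locks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Simplified "worker thread + priority queue" timer; one-time tasks only. */
class MiniTimer {

    private static final class Entry {
        final long executionTime;
        final Runnable action;
        Entry(long executionTime, Runnable action) {
            this.executionTime = executionTime;
            this.action = action;
        }
    }

    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.executionTime));
    private boolean newTasksMayBeScheduled = true; // guarded by the queue lock

    MiniTimer() {
        Thread worker = new Thread(this::mainLoop, "mini-timer");
        worker.setDaemon(true);
        worker.start();
    }

    void schedule(Runnable action, long delayMillis) {
        Entry entry = new Entry(System.currentTimeMillis() + delayMillis, action);
        synchronized (queue) {
            if (!newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");
            queue.add(entry);
            if (queue.peek() == entry)
                queue.notify(); // new earliest task: wake the worker so it re-checks
        }
    }

    void cancel() {
        synchronized (queue) {
            newTasksMayBeScheduled = false;
            queue.clear();
            queue.notify(); // in case the worker is waiting on an empty queue
        }
    }

    private void mainLoop() {
        while (true) {
            try {
                Runnable due = null;
                synchronized (queue) {
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // cancelled and nothing left to run
                    long now = System.currentTimeMillis();
                    Entry head = queue.peek();
                    if (head.executionTime <= now) {
                        queue.poll();
                        due = head.action;
                    } else {
                        queue.wait(head.executionTime - now); // sleep until due or notified
                    }
                }
                if (due != null)
                    due.run(); // outside the lock, so schedule() is never blocked by a task
            } catch (InterruptedException ignored) {
                // like java.util.Timer, ignore the interrupt and re-check the queue
            }
        }
    }

    /** Schedules three tasks out of order and returns the order in which they ran. */
    static List<String> demo() {
        MiniTimer timer = new MiniTimer();
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        timer.schedule(() -> { order.add("c"); done.countDown(); }, 300);
        timer.schedule(() -> { order.add("a"); done.countDown(); }, 100);
        timer.schedule(() -> { order.add("b"); done.countDown(); }, 200);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.cancel();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single worker thread runs every task, a slow task delays all tasks queued behind it. The same holds for java.util.Timer.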

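For a periodic task, the next execution time is calculated before the task runs. There are two ways to calculate it, corresponding to the two API families above:

  • schedule (fixed-delay): period is stored as a negative value, and the next execution time is currentTime - period, that is, the time at which this execution actually fired plus the period.

  • scheduleAtFixedRate (fixed-rate): period is stored as a positive value, and the next execution time is executionTime + period, that is, the time this execution was scheduled for plus the period.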

queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period);
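A small worked example shows how the two branches of this expression differ when a task fires late. The class NextExecutionTime and the sample timestamps are invented for illustration; only the conditional expression itself comes from the Timer source.

```java
class NextExecutionTime {

    /**
     * Mirrors the expression passed to rescheduleMin.
     * period < 0: fixed-delay, measured from the time the task actually fired.
     * period > 0: fixed-rate, measured from the time the task was scheduled to fire.
     */
    static long next(long period, long currentTime, long executionTime) {
        return period < 0 ? currentTime - period : executionTime + period;
    }

    public static void main(String[] args) {
        long executionTime = 1_000; // when the task was supposed to fire
        long currentTime = 1_300;   // when it actually fired (300 ms late)

        // schedule(...) stores -period, scheduleAtFixedRate(...) stores +period
        System.out.println("fixed-delay: " + next(-1_000, currentTime, executionTime)); // fixed-delay: 2300
        System.out.println("fixed-rate:  " + next(1_000, currentTime, executionTime));  // fixed-rate:  2000
    }
}
```

In fixed-delay mode the 300 ms lateness carries over to every later run, while fixed-rate mode keeps the original schedule and lets later runs catch up.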

2.3. Priority Queue

Whenever a task is added to or removed from the queue, or its execution time is changed, the queue reorders itself so that the task with the earliest execution time is always at the head. How is this achieved?

The source code of TaskQueue makes it clear:

class TaskQueue {

    private TimerTask[] queue = new TimerTask[128];

    private int size = 0;

    int size() {
        return size;
    }
    
    void add(TimerTask task) {
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    TimerTask getMin() {
        return queue[1];
    }

    TimerTask get(int i) {
        return queue[i];
    }

    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null; // Drop extra reference to prevent memory leak
        fixDown(1);
    }

    void quickRemove(int i) {
        assert i <= size;

        queue[i] = queue[size];
        queue[size--] = null; // Drop extra ref to prevent memory leak
    }

    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }

    boolean isEmpty() {
        return size==0;
    }

    void clear() {
        for (int i=1; i<=size; i++)
            queue[i] = null;

        size = 0;
    }

    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j + 1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }
}

As the code shows, TaskQueue is an array-based min-heap (a balanced binary heap) ordered by nextExecutionTime, so the task with the earliest execution time is always at the top of the heap (index 1; index 0 is unused). The task the worker thread inspects on each pass is therefore always the one that is due first. The backing array starts with 128 slots and doubles in length whenever it fills up.
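The heap mechanics are easier to follow on plain numbers. LongMinHeap below is a hypothetical, stripped-down version written for this article: it stores long timestamps instead of TimerTask objects and starts with 4 slots instead of 128, but it uses the same 1-based layout and the same fixUp/fixDown logic.

```java
import java.util.Arrays;

class LongMinHeap {

    private long[] heap = new long[4]; // slot 0 is unused, as in TaskQueue
    private int size = 0;

    void add(long value) {
        if (size + 1 == heap.length)
            heap = Arrays.copyOf(heap, 2 * heap.length); // double the capacity when full
        heap[++size] = value;
        fixUp(size);
    }

    long removeMin() {
        if (size == 0)
            throw new IllegalStateException("heap is empty");
        long min = heap[1];
        heap[1] = heap[size--]; // move the last element to the root
        fixDown(1);
        return min;
    }

    /** Moves the element at index k up until its parent (k / 2) is not larger. */
    private void fixUp(int k) {
        while (k > 1) {
            int parent = k >> 1;
            if (heap[parent] <= heap[k])
                break;
            swap(parent, k);
            k = parent;
        }
    }

    /** Moves the element at index k down until both children (2k, 2k + 1) are not smaller. */
    private void fixDown(int k) {
        int child;
        while ((child = k << 1) <= size) {
            if (child < size && heap[child] > heap[child + 1])
                child++; // pick the smaller of the two children
            if (heap[k] <= heap[child])
                break;
            swap(k, child);
            k = child;
        }
    }

    private void swap(int i, int j) {
        long tmp = heap[i];
        heap[i] = heap[j];
        heap[j] = tmp;
    }

    /** Adds all values, then removes them one by one in ascending order. */
    static long[] drainSorted(long... times) {
        LongMinHeap h = new LongMinHeap();
        for (long t : times)
            h.add(t);
        long[] out = new long[times.length];
        for (int i = 0; i < out.length; i++)
            out[i] = h.removeMin();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(drainSorted(5, 1, 4, 2, 3))); // [1, 2, 3, 4, 5]
    }
}
```

Both add and removeMin touch at most one element per level of the tree, so each costs O(log n), while reading the earliest element is O(1).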

2.4. Other methods

A TimerTask has four states:

  • VIRGIN: The task has just been created and has not been scheduled yet.

  • SCHEDULED: The task has been scheduled and placed in the queue.

  • EXECUTED: A non-periodic task that has already been executed or is currently being executed.

  • CANCELLED: The task has been cancelled.
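The state constants are not public, but the return value of TimerTask.cancel() reveals which state a task was in: it returns true only when it prevents a pending execution. The class TaskStateDemo below is a sketch written for this article.

```java
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;

class TaskStateDemo {

    private static TimerTask noOp() {
        return new TimerTask() {
            @Override
            public void run() { }
        };
    }

    /** Returns what cancel() reports for a task in the VIRGIN, SCHEDULED and CANCELLED states. */
    static boolean[] cancelResults() {
        Timer timer = new Timer(true);
        try {
            TimerTask neverScheduled = noOp();
            boolean fromVirgin = neverScheduled.cancel(); // VIRGIN: nothing to prevent

            TimerTask pending = noOp();
            timer.schedule(pending, 60_000);
            boolean fromScheduled = pending.cancel();     // SCHEDULED: a pending run is prevented
            boolean fromCancelled = pending.cancel();     // CANCELLED: already cancelled

            return new boolean[] { fromVirgin, fromScheduled, fromCancelled };
        } finally {
            timer.cancel();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cancelResults())); // [false, true, false]
    }
}
```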

Timer also provides cancel and purge methods:

  • cancel: clears all tasks in the queue and lets the worker thread exit.

  • purge: removes from the queue all tasks that have been cancelled.
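The sketch below shows purge in action using only the public API; the class name CancelAndPurge and the delays are arbitrary choices for this illustration. The task that is kept is deliberately the earliest one, so it stays at the head of the queue and the worker thread never discards the cancelled tasks on its own before purge runs.

```java
import java.util.Timer;
import java.util.TimerTask;

class CancelAndPurge {

    private static TimerTask noOp() {
        return new TimerTask() {
            @Override
            public void run() { }
        };
    }

    /** Cancels two of three pending tasks and returns how many entries purge() removed. */
    static int purgeCancelled() {
        Timer timer = new Timer(true);
        try {
            TimerTask keep = noOp();
            TimerTask dropA = noOp();
            TimerTask dropB = noOp();

            timer.schedule(keep, 30_000);  // earliest task, stays at the head of the queue
            timer.schedule(dropA, 60_000);
            timer.schedule(dropB, 60_000);

            dropA.cancel(); // only marks the task as cancelled; it is still in the queue
            dropB.cancel();

            return timer.purge(); // removes the cancelled entries from the queue
        } finally {
            timer.cancel(); // clears the queue and lets the worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println("purged: " + purgeCancelled()); // purged: 2
    }
}
```

Calling purge is rarely necessary; it mainly helps programs that cancel large numbers of long-delay tasks and want the memory back early.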

2.5. Common application implementation

Java's Timer is often used to implement asynchronous task systems and also appears in open-source projects, for example in the asynchronous logic behind delayed messages and consumption retries in the RocketMQ message queue.

public void start() {
    if (started.compareAndSet(false, true)) {
        super.load();
        // Created a new timer
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // ...
        }
        
        //Called Timer's scheduleAtFixedRate method
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

The code above is from ScheduleMessageService, RocketMQ's delayed-message delivery service, which uses Timer to run its asynchronous scheduled tasks.

3. Summary

Whether you are implementing simple asynchronous logic or building a more complex task system, Java's Timer is a convenient and practical utility class. Its implementation also shows the basic recipe for a timing system: a worker thread running a loop, plus a priority queue. That recipe is a useful starting point when designing similar systems ourselves.

Personal summary of design highlights:

  1. Locks on the queue and on each task prevent concurrency problems when TimerTasks are added to the TaskQueue or modified at the same time.

  2. The newTasksMayBeScheduled flag controls whether the worker thread keeps waiting for work on the TaskQueue or exits.

  3. A min-heap keeps the TaskQueue ordered by execution time.

  4. The worker thread runs the tasks in the TaskQueue in a loop, and also checks and advances each task's state.
