Java priority task queue practice

Basic understanding of queues

Before talking about queues, let’s define two terms: a Task is a unit of work, and a TaskExecutor is the thing that executes Tasks.

The queue we are going to talk about today works exactly like the service windows of an organization. When a task comes in, a TaskExecutor immediately starts executing it. When there are no tasks, the TaskExecutor blocks and waits. When there are more tasks than executors, the tasks have to line up. There can be several TaskExecutors, and certain Tasks can be designated to run earlier or later than the rest.

To sum up, we arrive at the following mapping: the queue is the organization, a TaskExecutor is a service window, and a Task is a person who has come to get something done.

Normal queue

Of course, many organizations have no priority window (for military personnel, for example), so there are also queues without priority. We will implement a non-priority queue first.

In real life the organization exists first, then its windows, and then the people arrive. Code has to be written in the opposite order: the queue contains TaskExecutors, so the TaskExecutor class must exist before the queue, and by the same reasoning the Task type must exist before the TaskExecutor.

So we start with an interface for Task, the person who has something to do. I made it an interface so that people with all kinds of business can come in:

// A person who has something to do.
public interface ITask {
    // What exactly gets done is left to the implementer: each task decides for itself.
    void run();
}

Next, write the TaskExecutor class, the window that executes Tasks. Read the comments carefully; they help a lot with understanding:

import java.util.concurrent.BlockingQueue;

// A service window.
public class TaskExecutor extends Thread {

    // The line in front of the window; everyone in it is waiting to be served.
    private BlockingQueue<ITask> taskQueue;

    // Whether this window is open. volatile because quit() is called from another thread.
    private volatile boolean isRunning = true;

    public TaskExecutor(BlockingQueue<ITask> taskQueue) {
        this.taskQueue = taskQueue;
    }

    // Get off work.
    public void quit() {
        isRunning = false;
        interrupt();
    }

    @Override
    public void run() {
        while (isRunning) { // Keep serving as long as the window is open.
            ITask iTask;
            try {
                iTask = taskQueue.take(); // Call the next person; wait if nobody is in line.
            } catch (InterruptedException e) {
                if (!isRunning) {
                    // Interrupted because we are off duty: restore the interrupt flag and close the window.
                    interrupt();
                    break; // Leave the loop; nothing below runs.
                }
                // Interrupted but still on duty: go back to waiting.
                continue;
            }

            // Serve this person.
            iTask.run();
        }
    }
}

Here is a brief explanation of BlockingQueue#take(). While the queue is empty, the method blocks. As soon as an item enters the queue, it returns that item. ServerSocket.accept() behaves the same way, which is why we call take() on a separate Thread: otherwise it would block the calling thread (which may be the main thread on Android).
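
The blocking behaviour is easy to observe in isolation. The following is a minimal sketch, not part of the article's code: the class name TakeBlocksDemo and the method measureWaitMillis are made up for illustration. A consumer thread calls take() on an empty LinkedBlockingQueue, and the main thread adds an item only after a delay.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the name is not part of the article's code.
class TakeBlocksDemo {

    // Starts a consumer that blocks in take(), then adds one item after delayMillis.
    // Returns roughly how long the consumer was blocked, in milliseconds.
    static long measureWaitMillis(long delayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        long[] waited = new long[1];

        Thread consumer = new Thread(() -> {
            long start = System.nanoTime();
            try {
                queue.take(); // blocks here while the queue is empty
            } catch (InterruptedException e) {
                return; // interrupted before anything arrived
            }
            waited[0] = (System.nanoTime() - start) / 1_000_000;
        });
        consumer.setDaemon(true); // do not keep the JVM alive if the demo is cut short
        consumer.start();

        try {
            Thread.sleep(delayMillis); // the queue stays empty during this time
            queue.put("first person");
            consumer.join(); // after join(), the consumer's write to waited[0] is visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waited[0];
    }

    public static void main(String[] args) {
        System.out.println("take() was blocked for about " + measureWaitMillis(300) + " ms");
    }
}
```

The reported time should be close to the delay, which shows that take() returned only once an item was available.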

Now that we have the people and the windows, we wrap them in a queue class, the organization, which manages the windows:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// An organization.
public class TaskQueue {

    // The organization has one line, made up of people waiting to be served.
    private BlockingQueue<ITask> mTaskQueue;
    // Its windows.
    private TaskExecutor[] mTaskExecutors;

    // Whoever creates the queue must say how many windows it has.
    public TaskQueue(int size) {
        mTaskQueue = new LinkedBlockingQueue<>();
        mTaskExecutors = new TaskExecutor[size];
    }

    // Start work.
    public void start() {
        stop();
        // Open every window and let it start serving.
        for (int i = 0; i < mTaskExecutors.length; i++) {
            mTaskExecutors[i] = new TaskExecutor(mTaskQueue);
            mTaskExecutors[i].start();
        }
    }

    // Send every window home.
    public void stop() {
        if (mTaskExecutors != null) {
            for (TaskExecutor taskExecutor : mTaskExecutors) {
                if (taskExecutor != null) taskExecutor.quit();
            }
        }
    }

    // Open the door so that people can come in.
    public <T extends ITask> int add(T task) {
        if (!mTaskQueue.contains(task)) {
            mTaskQueue.add(task);
        }
        // Return the number of people in line, so that people outside can see how many are waiting.
        return mTaskQueue.size();
    }
}
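
stop() works because quit() interrupts each window, and a thread blocked in take() reacts to an interrupt by throwing InterruptedException. Here is a small sketch of just that mechanism. QuitDemo and interruptWakesBlockedTake are hypothetical names used only for this illustration, and a plain Thread stands in for TaskExecutor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; a plain Thread stands in for the article's TaskExecutor.
class QuitDemo {

    // Returns true if a thread blocked in take() was woken by interrupt() and then exited.
    static boolean interruptWakesBlockedTake() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread window = new Thread(() -> {
            try {
                queue.take().run(); // nobody ever arrives, so this blocks
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // woken up by interrupt(), not by a task
            }
        });
        window.start();

        try {
            Thread.sleep(100);  // give the window time to block in take()
            window.interrupt(); // what quit() does after clearing isRunning
            window.join(2000);  // wait for the window thread to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get() && !window.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("window closed by interrupt: " + interruptWakesBlockedTake());
    }
}
```

Without the interrupt, a window waiting on an empty queue would never notice that isRunning had changed.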

We now have the organization, the windows, and the notion of a person with business to do. Next we send in someone with a specific job. Since ITask above is only an interface, we need a class that implements it and does something concrete:

// A task that prints its own id.
public class PrintTask implements ITask {

    private int id;

    public PrintTask(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        // To simulate a window that takes time to serve each person, pause for two seconds.
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ignored) {
        }

        System.out.println("My id is: " + id);
    }
}

Let’s run our simulated virtual world once:

public class Main {

    public static void main(String... args) {
        // Only one window is open for now.
        TaskQueue taskQueue = new TaskQueue(1);
        taskQueue.start();

        for (int i = 0; i < 10; i++) {
            PrintTask task = new PrintTask(i);
            taskQueue.add(task);
        }
    }

}

That’s right, the tasks are printed in exactly the order they were queued:

My id is: 0
My id is: 1
My id is: 2
My id is: 3
My id is: 4
My id is: 5
My id is: 6
My id is: 7
My id is: 8
My id is: 9

Above, only one window was open. Now let’s open a few more:

public class Main {

    public static void main(String... args) {
        // Open three windows.
        TaskQueue taskQueue = new TaskQueue(3);
        taskQueue.start(); // The organization starts working.

        for (int i = 0; i < 10; i++) {
            // Create 10 people with something to do and send them into the organization.
            PrintTask task = new PrintTask(i);
            taskQueue.add(task);
        }
    }
}

Note that we opened three windows at initialization, so internally things proceed as follows:

When the doors open, the first person goes to the first window, the second to the second window, and the third to the third window. The fourth person has to wait. As soon as any of the three windows finishes, the fourth person goes to that window, the fifth person waits for the next free window, and so on. This gives us several windows working concurrently on one shared queue. Because the windows run in parallel, the printed order is no longer guaranteed to be strictly 0 to 9.
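
To see which window serves which person, the same idea can be reduced to a few lines. This is only a sketch under stated assumptions: ThreeWindowsDemo and serve are hypothetical names, plain threads replace TaskExecutor, the tasks are just integers, and the service time is shortened to 50 ms.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; plain threads stand in for the article's TaskExecutor.
class ThreeWindowsDemo {

    // Runs taskCount short tasks on windowCount worker threads that share one queue.
    // Returns a map from task id to the name of the thread that served it.
    static Map<Integer, String> serve(int windowCount, int taskCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Map<Integer, String> servedBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(taskCount);

        Thread[] windows = new Thread[windowCount];
        for (int i = 0; i < windowCount; i++) {
            windows[i] = new Thread(() -> {
                try {
                    while (true) {
                        int id = queue.take(); // whichever window is free takes the next person
                        Thread.sleep(50);      // simulate the service time
                        servedBy.put(id, Thread.currentThread().getName());
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // interrupted: this window closes
                }
            }, "window-" + (i + 1));
            windows[i].start();
        }

        for (int id = 0; id < taskCount; id++) {
            queue.add(id);
        }

        try {
            done.await(); // wait until everyone has been served
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Thread window : windows) {
            window.interrupt(); // close all windows
        }
        return servedBy;
    }

    public static void main(String[] args) {
        serve(3, 10).forEach((id, window) -> System.out.println("task " + id + " -> " + window));
    }
}
```

Which window ends up with which task varies from run to run; the only guarantee is that every task is served exactly once.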

That is the ordinary queue; other features can be built on top of it. Next I will add priorities to it, which is the special window mentioned above: military priority!

Priority queue

While we are waiting in line, a soldier arrives. How do we decide whether this person may be served first? We need to know whether they have priority, and how much.

So we need to give each Task a flag, its priority. I use an enum to represent it:

public enum Priority {
    LOW, // Lowest level.
    DEFAULT, // Default level.
    HIGH, // Higher than the default level.
    Immediately // Execute immediately.
}

Here there are four levels: low, default, high, and immediate. The level has to be attached to the person being served, that is, the Task:

public interface ITask {

    void run();

    void setPriority(Priority priority);

    Priority getPriority();
}

The priority can be set and read back.

Next we replace the LinkedBlockingQueue above with a PriorityBlockingQueue, because it orders its elements by priority automatically. Its elements must be comparable: our Task has to implement the Comparable interface, whose compareTo(E) method compares two Tasks. First, the queue changes its implementation:

// An organization.
public class TaskQueue {

    // The organization has one line, made up of people waiting to be served.
    private BlockingQueue<ITask> mTaskQueue;
    // Its windows.
    private TaskExecutor[] mTaskExecutors;

    // Whoever creates the queue must say how many windows it has.
    public TaskQueue(int size) {
        mTaskQueue = new PriorityBlockingQueue<>();
        mTaskExecutors = new TaskExecutor[size];
    }
...

Then the ITask interface extends the Comparable interface:

public interface ITask extends Comparable<ITask> {

    void run();

    void setPriority(Priority priority);

    Priority getPriority();
}
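
How PriorityBlockingQueue uses compareTo can be seen with a much simpler element type. The sketch below is illustrative only: PriorityOrderDemo, Ticket, and drainOrder are hypothetical names, and a ticket simply sorts by its number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the article's code.
class PriorityOrderDemo {

    // Hypothetical element type: a ticket that sorts by its number, smallest first.
    static final class Ticket implements Comparable<Ticket> {
        final int number;

        Ticket(int number) {
            this.number = number;
        }

        @Override
        public int compareTo(Ticket other) {
            return Integer.compare(number, other.number);
        }
    }

    // Adds the tickets in the given order and returns the order in which the queue hands them out.
    static List<Integer> drainOrder(int... numbers) {
        PriorityBlockingQueue<Ticket> queue = new PriorityBlockingQueue<>();
        for (int n : numbers) {
            queue.add(new Ticket(n));
        }
        List<Integer> order = new ArrayList<>();
        Ticket next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            order.add(next.number);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(5, 1, 3)); // prints [1, 3, 5]
    }
}
```

The queue always hands out the element that compareTo ranks smallest, regardless of insertion order. That is the hook the Task priority will use.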

With setPriority(Priority), getPriority(), and Comparable’s compareTo(E), every Task would have to implement all three methods, which is tedious. So we put them in an abstract BasicTask:

public abstract class BasicTask implements ITask {

    // Default priority.
    private Priority priority = Priority.DEFAULT;

    @Override
    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    @Override
    public Priority getPriority() {
        return priority;
    }

    // Compare priorities.
    @Override
    public int compareTo(ITask another) {
        final Priority me = this.getPriority();
        final Priority it = another.getPriority();
        return me == it ? [...] : it.ordinal() - me.ordinal();
    }
}

The rest is self-explanatory, but the compareTo(E) method may be puzzling at first, so let’s go through it:

The E passed to compareTo(E) is another Task. If the current Task should come before the other Task, return a negative number. If it should come after, return a positive number. If the two rank equally, return 0.

Pay special attention here. When the two Tasks have different priorities, we call Priority.ordinal() and subtract the current Task’s ordinal from the other Task’s ordinal. How should we read that? First we need to understand ordinal(). Every enum constant in Java has this method, and it returns the constant’s zero-based position in the enum declaration. So the Priority enum we wrote can be thought of like this (pseudocode, not valid Java):

public enum Priority {
    0, // LOW
    1, // DEFAULT
    2, // HIGH
    3  // Immediately
}
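
A quick way to check what ordinal() returns is to print it. In this sketch the enum is copied from the article, while OrdinalDemo and compareByPriority are hypothetical names added for the demonstration.

```java
// Hypothetical demo class; only the enum constants come from the article.
class OrdinalDemo {

    // Same constants, in the same order, as the article's Priority enum.
    enum Priority { LOW, DEFAULT, HIGH, Immediately }

    // The value compareTo returns when the two priorities differ.
    static int compareByPriority(Priority me, Priority it) {
        return it.ordinal() - me.ordinal();
    }

    public static void main(String[] args) {
        for (Priority p : Priority.values()) {
            System.out.println(p + " -> " + p.ordinal());
        }
        // LOW compared with HIGH: 2 - 0 = 2, positive, so LOW goes behind HIGH.
        System.out.println(compareByPriority(Priority.LOW, Priority.HIGH));
    }
}
```

The loop prints LOW -> 0, DEFAULT -> 1, HIGH -> 2, and Immediately -> 3, followed by 2 for the comparison.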

Continuing: if the current Task has a relatively low priority and the Task passed to compareTo(E) has a relatively high one, the expression returns a positive number, so PriorityBlockingQueue places the current Task behind the other one. If the roles are swapped, the sign of the result is swapped too.

But notice the [...] in me == it ? [...] : it.ordinal() - me.ordinal();. What is it? A piece of code is missing there, hahaha (why is this author so silly). That piece decides what happens when the priorities are equal: whoever joined the queue first should be in front. But what value should we return, and how do we know which Task joined first? This is where clever me comes in: I give each Task a sequence number that records when it joined the queue, and the problem is solved. So we modify the ITask interface and add two methods:

public interface ITask extends Comparable<ITask> {

    void run();

    void setPriority(Priority priority);

    Priority getPriority();

    void setSequence(int sequence);

    int getSequence();
}

We use setSequence(int) to record the order in which a Task was added to the queue. Since every Task needs setSequence(int) and getSequence(), we implement both in BasicTask:

public abstract class BasicTask implements ITask {

    // Default priority.
    private Priority priority = Priority.DEFAULT;
    private int sequence;

    @Override
    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    @Override
    public Priority getPriority() {
        return priority;
    }

    @Override
    public void setSequence(int sequence) {
        this.sequence = sequence;
    }

    @Override
    public int getSequence() {
        return sequence;
    }

    // Compare priorities; fall back to the sequence when they are equal.
    @Override
    public int compareTo(ITask another) {
        final Priority me = this.getPriority();
        final Priority it = another.getPriority();
        return me == it ? this.getSequence() - another.getSequence() :
            it.ordinal() - me.ordinal();
    }
}

See, the [...] from before has become this.getSequence() - another.getSequence(). It has to agree logically with it.ordinal() - me.ordinal(). As said above, if the current Task has a relatively low priority and the Task passed to compareTo(E) has a relatively high one, the returned value is positive, so PriorityBlockingQueue places the current Task behind the other one; if the roles are swapped, the sign is swapped as well.

Note that the operands are in the opposite order in the two branches. With different priorities, a larger ordinal must come first, so we compute it.ordinal() - me.ordinal(). With equal priorities, a smaller sequence number must come first, so we compute this.getSequence() - another.getSequence(). In both branches a negative result puts the current Task in front and a positive result puts it behind, which is exactly the contract described above.
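
One way to double-check the two branches is to write the same rule a second time as a Comparator and confirm that both produce the same order. This is a sketch, not the article's implementation: CompareDemo, Entry, and sortedNames are hypothetical names, and Entry is a stripped-down stand-in for BasicTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class; Entry is a stripped-down stand-in for BasicTask.
class CompareDemo {

    enum Priority { LOW, DEFAULT, HIGH, Immediately }

    static final class Entry implements Comparable<Entry> {
        final String name;
        final Priority priority;
        final int sequence;

        Entry(String name, Priority priority, int sequence) {
            this.name = name;
            this.priority = priority;
            this.sequence = sequence;
        }

        // Same rule as the article's compareTo.
        @Override
        public int compareTo(Entry another) {
            return priority == another.priority
                    ? sequence - another.sequence
                    : another.priority.ordinal() - priority.ordinal();
        }
    }

    // The same ordering spelled out: higher priority first, then lower sequence first.
    static final Comparator<Entry> HIGH_PRIORITY_THEN_EARLY_SEQUENCE =
            Comparator.comparing((Entry e) -> e.priority).reversed()
                    .thenComparingInt(e -> e.sequence);

    // Sorts a fixed sample and returns the names in service order.
    static List<String> sortedNames(boolean useComparator) {
        List<Entry> entries = new ArrayList<>(Arrays.asList(
                new Entry("a", Priority.DEFAULT, 1),
                new Entry("b", Priority.LOW, 2),
                new Entry("c", Priority.Immediately, 3),
                new Entry("d", Priority.DEFAULT, 4),
                new Entry("e", Priority.HIGH, 5)));
        if (useComparator) {
            entries.sort(HIGH_PRIORITY_THEN_EARLY_SEQUENCE);
        } else {
            Collections.sort(entries); // uses compareTo
        }
        List<String> names = new ArrayList<>();
        for (Entry e : entries) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedNames(false)); // prints [c, e, a, d, b]
        System.out.println(sortedNames(true));  // prints [c, e, a, d, b]
    }
}
```

The two DEFAULT entries, a and d, keep their arrival order, while c and e jump ahead and b drops to the end.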

The next step is to assign the sequence to each Task, so we make a small change to the <T extends ITask> int add(T) method in TaskQueue:

import java.util.concurrent.atomic.AtomicInteger;

public class TaskQueue {

    // Hands out a growing sequence number to every task that joins the queue.
    private AtomicInteger mAtomicInteger = new AtomicInteger();

    ...

    public TaskQueue(int size) {
        ...
    }

    public void start() {
        ...
    }

    public void stop() {
        ...
    }

    public <T extends ITask> int add(T task) {
        if (!mTaskQueue.contains(task)) {
            task.setSequence(mAtomicInteger.incrementAndGet()); // Pay attention to this line.
            mTaskQueue.add(task);
        }
        return mTaskQueue.size();
    }
}

Here we use the AtomicInteger class. Its incrementAndGet() method atomically adds 1 and returns the new value, which is equivalent to:

mAtomicInteger.addAndGet(1);

Please look up its other uses yourself; I won’t go into detail here.
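
For readers who want to see it in action, here is a small sketch. SequenceDemo and countWithThreads are hypothetical names. It shows the two equivalent calls and then lets several threads draw numbers from one counter at the same time.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's code.
class SequenceDemo {

    // Lets several threads draw numbers from one counter and returns the final count.
    static int countWithThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    counter.incrementAndGet(); // atomic: no increment is lost
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        AtomicInteger sequence = new AtomicInteger();   // starts at 0
        System.out.println(sequence.incrementAndGet()); // prints 1
        System.out.println(sequence.addAndGet(1));      // prints 2
        System.out.println(countWithThreads(4, 1000));  // prints 4000
    }
}
```

A plain int field incremented with ++ from several threads could lose updates, which is why the queue uses AtomicInteger when add(T) may be called from more than one thread.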

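At this point our priority queue is complete. Let’s test it (PrintTask now has to extend BasicTask instead of implementing ITask directly, so that it inherits the priority and sequence methods):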

public static void main(String... args) {
    // Open a single window, which makes the priority effect more obvious.
    TaskQueue taskQueue = new TaskQueue(1);
    taskQueue.start(); // The organization starts working.

    // To show the priority effect, queue three tasks up front so that later tasks have something to overtake.
    taskQueue.add(new PrintTask(110));
    taskQueue.add(new PrintTask(112));
    taskQueue.add(new PrintTask(122));

    for (int i = 0; i < 10; i++) { // Starting from person 0.
        PrintTask task = new PrintTask(i);
        if (1 == i) {
            task.setPriority(Priority.LOW); // The second person to enter is served last.
        } else if (8 == i) {
            task.setPriority(Priority.HIGH); // The ninth person to enter is served second.
        } else if (9 == i) {
            task.setPriority(Priority.Immediately); // The tenth person to enter is served first.
        }
        // Everyone else is served in the order they entered.
        taskQueue.add(task);
    }
}

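Yes, this is the effect we see (assuming all tasks were queued before the window took its first one; if the window has already picked up 110 by then, 110 is printed first):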

My id is: 9
My id is: 8
My id is: 110
My id is: 112
My id is: 122
My id is: 0
My id is: 2
My id is: 3
My id is: 4
My id is: 5
My id is: 6
My id is: 7
My id is: 1
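
The order above depends on every task being in the queue before the window takes its first one. To take that timing out of the picture, the following sketch fills a PriorityBlockingQueue with the same 13 entries first and only then drains it. ReplayDemo, Entry, and serviceOrder are hypothetical names; Entry is a reduced stand-in for PrintTask that carries only an id, a priority, and a sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; Entry is a reduced stand-in for the article's PrintTask.
class ReplayDemo {

    enum Priority { LOW, DEFAULT, HIGH, Immediately }

    static final class Entry implements Comparable<Entry> {
        final int id;
        final Priority priority;
        final int sequence;

        Entry(int id, Priority priority, int sequence) {
            this.id = id;
            this.priority = priority;
            this.sequence = sequence;
        }

        // Same rule as the article's compareTo.
        @Override
        public int compareTo(Entry another) {
            return priority == another.priority
                    ? sequence - another.sequence
                    : another.priority.ordinal() - priority.ordinal();
        }
    }

    // Queues the same 13 entries as the test above, then drains the queue.
    static List<Integer> serviceOrder() {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();
        AtomicInteger sequence = new AtomicInteger();

        for (int id : new int[] {110, 112, 122}) {
            queue.add(new Entry(id, Priority.DEFAULT, sequence.incrementAndGet()));
        }
        for (int i = 0; i < 10; i++) {
            Priority priority = Priority.DEFAULT;
            if (i == 1) {
                priority = Priority.LOW;
            } else if (i == 8) {
                priority = Priority.HIGH;
            } else if (i == 9) {
                priority = Priority.Immediately;
            }
            queue.add(new Entry(i, priority, sequence.incrementAndGet()));
        }

        List<Integer> order = new ArrayList<>();
        Entry next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            order.add(next.id);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(serviceOrder());
        // prints [9, 8, 110, 112, 122, 0, 2, 3, 4, 5, 6, 7, 1]
    }
}
```

The result matches the output shown above, which confirms that the comparison rule alone accounts for the order once timing is removed.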