AQS Condition queue source code analysis

Usage of Condition

The previous article analyzed how ReentrantLock acquires a lock, how waiting threads join the queue, and how the lock is released. This article looks at condition queues. In AbstractQueuedSynchronizer, a Condition represents a condition queue, and threads can communicate through it: a thread is suspended and then woken up once some condition is met. The common producer-consumer model, for example, can be implemented with condition queues. First, let’s look at simple usage of a condition queue based on ReentrantLock.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionTest {
    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    void awaitTest() {
        // Acquire the lock before try, so finally never unlocks a lock that was not acquired
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + ": suspending, waiting to be awakened---");
            condition.await();
            System.out.println(Thread.currentThread().getName() + ": awakened, continuing execution---");
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    void signalTest() {
        lock.lock();
        try {
            condition.signal();
            System.out.println(Thread.currentThread().getName() + ": waking up a thread in the condition queue");
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionTest tests = new ConditionTest();
        // Note: a signal issued before the matching await is lost, so thread start order matters in this demo
        new Thread(tests::awaitTest).start();
        new Thread(tests::awaitTest).start();
        new Thread(tests::signalTest).start();
        new Thread(tests::signalTest).start();
    }
}

When using a Condition, the corresponding lock must be held first: await and signal both depend on the ReentrantLock, and whether a thread is suspending itself or waking another, it must hold the lock. Semantically this is very similar to wait, notify, and notifyAll in Object, which can only be called while holding the monitor lock.
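
To make the lock requirement concrete, here is a minimal sketch of what happens when await or signal is called without holding the lock. The class and method names (LockRequiredDemo, signalWithoutLockFails, awaitWithoutLockFails) are made up for illustration; only ReentrantLock and Condition are JDK types.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and Condition are real JDK APIs.
class LockRequiredDemo {
    /** Returns true if signal() without holding the lock throws IllegalMonitorStateException. */
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is NOT held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Returns true if await() without holding the lock throws IllegalMonitorStateException. */
    static boolean awaitWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.await(); // the lock is NOT held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Both calls fail fast instead of blocking, which mirrors what Object.wait and Object.notify do outside a synchronized block.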
Let’s take a look at the ConditionObject class, which lock.newCondition() creates.

Condition condition = lock.newCondition();

final ConditionObject newCondition() {
    return new ConditionObject();
}

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }

    public final void signal() {
        ...
    }
    public final void signalAll() {
        ...
    }
    public final void await() throws InterruptedException {
        ...
    }
    ...
}
  1. firstWaiter and lastWaiter in ConditionObject are the first and last nodes of the condition queue. Together with the nextWaiter field in Node, they form a singly linked list, which is what we call the condition queue.
  2. Multiple ConditionObject instances can be created from one lock, so multiple condition queues can exist at the same time.
  3. The nodes of the condition queue and the blocking queue are both Node instances. When a thread calls condition.signal, the head node of the condition queue is transferred to the tail of the blocking queue, where its thread waits to reacquire the lock.
  4. After calling condition.await, the thread wraps itself in a Node, joins the tail of the condition queue, and suspends itself.
  5. The difference between the condition queue and the blocking queue: we introduced the blocking queue when analyzing ReentrantLock. A thread that fails to grab the lock enters the blocking queue and waits until its predecessor node wakes it. The condition queue, by contrast, presupposes that the thread already holds the lock. When the thread executes await, it enters the condition queue and suspends, and it stays suspended until another thread calls signal (or it is interrupted).
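
As an illustration of points 2 to 4, the following is a minimal producer-consumer sketch that uses two condition queues on one lock. BoundedBuffer and its method names are hypothetical and chosen only for this example; it is one common way to structure such a buffer, not code taken from the JDK.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer; class and method names are illustrative only.
class BoundedBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    // Two ConditionObjects on one lock: two independent condition queues.
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();   // producer joins the notFull condition queue
            items.add(value);
            notEmpty.signal();     // transfer one waiting consumer to the blocking queue
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();  // consumer joins the notEmpty condition queue
            int value = items.remove();
            notFull.signal();      // transfer one waiting producer to the blocking queue
            return value;
        } finally {
            lock.unlock();
        }
    }

    /** Produces 1..count on a helper thread, consumes on the caller, returns the sum consumed. */
    static int transferSum(int count) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

Each await sits in a while loop that rechecks the condition, because a woken thread must reacquire the lock first and the state may have changed by then.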

Joining the condition queue

First look at the await method

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //Node joins the condition queue
    Node node = addConditionWaiter();
    //Fully release the lock before suspending, and remember the hold count
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //While the node is not yet in the blocking queue, suspend the thread. signal is what transfers the node from the condition queue to the blocking queue.
    while (!isOnSyncQueue(node)) {
        //The thread suspends here, waiting to be awakened
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}


private Node addConditionWaiter() {
    Node t = lastWaiter;
    //If lastWaiter's status is not Node.CONDITION, that node has been cancelled.
    if (t != null && t.waitStatus != Node.CONDITION) {
        // Sweep the whole condition queue from the head, unlink every cancelled node, then re-read the tail.
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //The new node joins the queue as the new tail. Its initial state is Node.CONDITION.
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
 * Traverse the condition queue and unlink every node that is no longer waiting (t.waitStatus != Node.CONDITION)
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null; // last node seen that is still waiting
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
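
The linked-list bookkeeping above can be tried in isolation. The following is a toy, single-threaded model of the condition queue, not the real AQS classes: ToyConditionQueue, ToyNode, and names() are made up for this sketch, while the two status constants use the same values as Node.CONDITION and Node.CANCELLED.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded model of the condition queue; NOT the real AQS implementation.
class ToyConditionQueue {
    static final int CONDITION = -2; // same value as Node.CONDITION
    static final int CANCELLED = 1;  // same value as Node.CANCELLED

    static final class ToyNode {
        final String name;
        int waitStatus = CONDITION;
        ToyNode nextWaiter;
        ToyNode(String name) { this.name = name; }
    }

    ToyNode firstWaiter;
    ToyNode lastWaiter;

    // Mirrors addConditionWaiter: clean up if the tail is cancelled, then append.
    ToyNode addConditionWaiter(String name) {
        ToyNode t = lastWaiter;
        if (t != null && t.waitStatus != CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        ToyNode node = new ToyNode(name);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Mirrors unlinkCancelledWaiters: one pass from the head, keeping only waiting nodes.
    void unlinkCancelledWaiters() {
        ToyNode t = firstWaiter;
        ToyNode trail = null;
        while (t != null) {
            ToyNode next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Helper for inspection: names of the nodes currently linked, head to tail.
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (ToyNode n = firstWaiter; n != null; n = n.nextWaiter)
            result.add(n.name);
        return result;
    }
}
```

For example, after adding A, B, and C, marking B and C as CANCELLED, and then adding D, the queue contains only A and D: the cancelled tail triggers a full sweep before the new node is appended.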

Thread wake-up

while (!isOnSyncQueue(node)) {
    //The thread suspends here, waiting to be awakened
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

From the code above, we know that the thread suspends at LockSupport.park(this) and waits to be awakened. To make the rest easier to follow, let’s first look at the method that wakes the thread up.

/**
 * The wake-up operation actually transfers a node from the condition queue to the blocking queue
 */
public final void signal() {
    //The thread executing signal must hold the exclusive lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Walk the condition queue from front to back to find the first node that can be transferred. These are plain singly-linked-list operations.
 */
private void doSignal(Node first) {
    do {
        //The first node is about to leave the queue. If it has no nextWaiter, the queue becomes empty, so clear lastWaiter.
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //dequeue
        first.nextWaiter = null;
    //Loop: if transferring this node fails (it was cancelled), try the next one, and so on.
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * This method transfers the node to the blocking queue
 */
final boolean transferForSignal(Node node) {
    /*
     * Set waitStatus from CONDITION to 0. If the CAS fails, the node has already been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Append the node to the tail of the blocking queue. The returned p is its predecessor in the blocking queue.
     */
    Node p = enq(node);
    int ws = p.waitStatus;

    /*
     * ws > 0: the predecessor has been cancelled, so wake up the node's thread.
     * ws <= 0: try to set the predecessor's status to Node.SIGNAL; if that CAS fails, wake up the node's thread as well.
     */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

Pay attention to the condition ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL). Normally the predecessor is still waiting in the queue with ws <= 0 and the CAS to Node.SIGNAL succeeds, so signal does not unpark the thread at all; the thread is woken later, when its predecessor releases the lock. So when does signal wake the thread directly? Only when the predecessor has been cancelled or the CAS on the predecessor’s status fails. In that case the thread is unparked so that it can fix up its own position in the blocking queue inside acquireQueued.
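
The transfer can be observed from outside through ReentrantLock’s monitoring methods getWaitQueueLength(Condition) and getQueueLength(). The sketch below is an illustration under simple assumptions (one waiter, one signaller); SignalTransferDemo and observe are hypothetical names, and getQueueLength is documented as an estimate intended for monitoring rather than for synchronization control.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; it only uses public ReentrantLock monitoring methods.
class SignalTransferDemo {
    /** Returns {condition queue length before signal, after signal, blocking queue length after signal}. */
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch waiterHasLock = new CountDownLatch(1);
        boolean[] signalled = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHasLock.countDown();
                while (!signalled[0])
                    condition.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        int[] result = new int[3];
        try {
            waiterHasLock.await();
            // The waiter holds the lock until its await releases it, so once this
            // thread owns the lock the waiter's node is already in the condition queue.
            lock.lock();
            try {
                result[0] = lock.getWaitQueueLength(condition);
                signalled[0] = true;
                condition.signal();
                result[1] = lock.getWaitQueueLength(condition);
                // The waiter cannot reacquire the lock yet, so its node sits in the blocking queue.
                result[2] = lock.getQueueLength();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```

While the signalling thread still holds the lock, the waiter has left the condition queue but cannot run yet; it is queued for the lock like any other contending thread.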

Check interrupt status

while (!isOnSyncQueue(node)) {
    //The thread suspends here, waiting to be awakened
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

The awakened thread resumes from LockSupport.park in the await method and then executes checkInterruptWhileWaiting, because it may have been woken either by a signal or by an interruption. The while loop exits in one of two situations:
1. The thread was interrupted. We must distinguish whether the interruption happened during await or after signal; in both cases the node ends up transferred from the condition queue to the blocking queue.
2. No interruption occurred. The node was transferred to the blocking queue by signal, so isOnSyncQueue returns true and the loop exits.

/**
 * Check the interrupt status:
 * 1. If the thread was not interrupted, return 0.
 * 2. If the thread was interrupted during await, return THROW_IE: an exception must be thrown.
 * 3. If the thread was interrupted after signal, return REINTERRUPT: the interrupt status must be set again.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

/**
 * This method is only called after an interruption, to determine whether it happened during await or after signal.
 */
final boolean transferAfterCancelledWait(Node node) {
    //The signal method sets waitStatus to 0
    //If this CAS succeeds, waitStatus was still Node.CONDITION, so the interruption occurred during await.
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //Even though it was interrupted, the node is still transferred to the blocking queue.
        enq(node);
        return true;
    }
    /*
     * The interruption occurred after signal; spin until signal has finished transferring the node to the blocking queue
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

Therefore, even if a thread in the condition queue is interrupted, whether during await or after signal, its node is transferred to the blocking queue.
A condition queue node reaches the blocking queue in one of two ways: normally, signal transfers it; or an interruption occurs, the thread wakes up, finds that it has not been signaled, and enqueues itself in the blocking queue, recording the interruption in interruptMode. The code that follows responds to that recorded status.

Get exclusive lock

Let’s continue with the code of the await method. After the previous analysis of the while loop, the node has entered the blocking queue and is ready to acquire the lock.

public final void await() throws InterruptedException {
    ...
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued loops until the lock is acquired, parking the thread between attempts.
    //When it returns, the thread holds the lock again.
    //Its return value says whether the thread was interrupted while waiting for the lock. If so, and the interruption was not during await, record REINTERRUPT.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //signal already sets node.nextWaiter to null when it dequeues the node.
    //But if the interruption occurred during await, signal never processed this node and it is still linked in the condition queue, so unlink it here.
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        //Handle the interruption according to interruptMode
        reportInterruptAfterWait(interruptMode);
}

Handling interrupt status

There are two kinds of interruption to handle:
THROW_IE: the thread was interrupted during await, so an exception must be thrown. If you do not want to respond to interrupts during await, use the awaitUninterruptibly method.
REINTERRUPT: no interruption occurred during await, but the thread was interrupted after signal, so the interrupt status is set again.

private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
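
The THROW_IE path can be exercised with a small experiment: interrupt a thread that is waiting in await and check, inside its catch block, whether it holds the lock again. This is only a sketch; AwaitInterruptDemo and interruptedWaiterHoldsLock are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for the THROW_IE case.
class AwaitInterruptDemo {
    /** Returns true if the interrupted waiter saw InterruptedException while holding the lock. */
    static boolean interruptedWaiterHoldsLock() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch waiterHasLock = new CountDownLatch(1);
        AtomicBoolean heldLockInCatch = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHasLock.countDown();
                while (true)            // nobody ever signals; only the interrupt ends this wait
                    condition.await();
            } catch (InterruptedException e) {
                // The exception surfaces only after await has reacquired the lock.
                heldLockInCatch.set(lock.isHeldByCurrentThread());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            waiterHasLock.await();
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return heldLockInCatch.get();
    }
}
```

Because the lock is held again when the exception is thrown, the usual finally block with lock.unlock() stays correct even on the interrupted path.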

Not responding to interrupts

The awaitUninterruptibly method differs from await in that it does not respond to interrupts. Even if an interrupt occurs, the thread stays in the condition queue and keeps waiting to be transferred to the blocking queue. After acquiring the lock, it only sets the interrupt status again.

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //No break here: just remember that an interrupt happened and keep waiting
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        //Set the interrupt flag again
        selfInterrupt();
}
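
A short experiment along these lines: interrupt a thread that is waiting in awaitUninterruptibly, signal it afterwards, and check its interrupt flag once it returns. UninterruptibleDemo and interruptFlagPreserved are hypothetical names used only for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for awaitUninterruptibly.
class UninterruptibleDemo {
    /** Returns true if the waiter's interrupt flag was set when awaitUninterruptibly returned. */
    static boolean interruptFlagPreserved() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch waiterHasLock = new CountDownLatch(1);
        boolean[] signalled = {false}; // guarded by lock
        AtomicBoolean flagAfterWait = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHasLock.countDown();
                while (!signalled[0])
                    condition.awaitUninterruptibly(); // the interrupt does not end this wait
                flagAfterWait.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            waiterHasLock.await();
            waiter.interrupt();       // recorded by the waiter, but no exception is thrown
            lock.lock();              // succeeds only after the waiter has released the lock
            try {
                signalled[0] = true;
                condition.signal();   // this is what actually ends the wait
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return flagAfterWait.get();
    }
}
```

The interrupt is not lost: it is remembered in the local interrupted variable and replayed through selfInterrupt once the lock has been reacquired.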

Response to interruption

Let’s continue along the thread of interruption. If a thread is interrupted during await, Thread.interrupted() clears its interrupt flag and the event is recorded in interruptMode; the node is still transferred to the blocking queue to wait for the exclusive lock. What happens to an interruption that arrives while the thread is waiting for the lock? The answer lies in acquireQueued: it returns a boolean, and true means the thread was interrupted while waiting.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //Reaching this point means the thread was interrupted while waiting for the lock.
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    //Returns and clears the interrupt status
    return Thread.interrupted();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

After the thread wakes from LockSupport.park(this), it only checks the interrupt status. If it was interrupted, it sets interrupted = true and enters the next iteration of the for loop, until it obtains the lock and returns interrupted. So as long as an interruption occurred while the thread was waiting for the lock, interrupted is true.
When acquire sees a return value of true, it calls selfInterrupt to set the interrupt status again; how to respond is left to the caller. In other words, acquire itself does not respond to interrupts. For example, thread A is waiting for the lock and thread B interrupts it: thread A does not respond immediately and keeps waiting for the lock, but the interrupt status is recorded so that the caller can handle it later.
AQS also provides acquireInterruptibly, which does respond to interruption. In the same scenario, once thread A is interrupted it abandons the lock attempt and throws InterruptedException immediately.
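
At the ReentrantLock level, lock() is built on acquire and lockInterruptibly() on acquireInterruptibly, so the difference can be seen with a thread that is blocked on a held lock and then interrupted. The sketch below uses hypothetical names (LockInterruptDemo, blockedThreadOutcome) and returns a short description of what the blocked thread observed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class comparing lock() with lockInterruptibly().
class LockInterruptDemo {
    /** Blocks a thread on a held lock, interrupts it, and reports what that thread observed. */
    static String blockedThreadOutcome(boolean interruptibly) {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("none");

        Thread blocked = new Thread(() -> {
            started.countDown();
            try {
                if (interruptibly)
                    lock.lockInterruptibly(); // responds to the interrupt with an exception
                else
                    lock.lock();              // keeps waiting; the interrupt is only recorded
            } catch (InterruptedException e) {
                outcome.set("InterruptedException");
                return;
            }
            try {
                outcome.set(Thread.currentThread().isInterrupted()
                        ? "acquired, flag set" : "acquired, flag clear");
            } finally {
                lock.unlock();
            }
        });

        lock.lock(); // the caller owns the lock, so the other thread cannot acquire it yet
        try {
            blocked.start();
            started.await();
            blocked.interrupt();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        try {
            blocked.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }
}
```

With interruptibly set to true the blocked thread gives up and reports the exception; with false it eventually acquires the lock and finds its interrupt flag set.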

Timeout mechanism

ConditionObject also provides await methods with a timeout. Their logic is similar, so let’s pick one to analyze.

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    //Compute the deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //The timeout has elapsed
        if (nanosTimeout <= 0L) {
            //Transfer the node to the blocking queue. If the transfer succeeds, true is returned: the wait ended by timeout.
            //If false is returned, signal has already been called and the node has been transferred, so this is not a timeout.
            timedout = transferAfterCancelledWait(node);
            break;
        }
        //spinForTimeoutThreshold = 1000L nanoseconds (1 microsecond). If less time than that remains, spin instead of calling parkNanos.
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //Time remaining until the deadline
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

The timed await uses parkNanos to sleep for at most the remaining time. When the thread wakes up, it checks whether signal has been called. If not, and the deadline has passed, the wait has timed out and the thread transfers its own node to the blocking queue.
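
The return value of the timed await can be checked with a small sketch: one call with no signaller, which should time out, and one call where a helper thread signals. TimedAwaitDemo and awaitResult are hypothetical names, and the timeout values are arbitrary.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for await(long, TimeUnit).
class TimedAwaitDemo {
    /** Waits up to timeoutMillis; when withSignaller is true, a helper thread signals the waiter. */
    static boolean awaitResult(long timeoutMillis, boolean withSignaller) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            if (withSignaller) {
                Thread signaller = new Thread(() -> {
                    lock.lock(); // blocks until the await below has released the lock
                    try {
                        condition.signal();
                    } finally {
                        lock.unlock();
                    }
                });
                signaller.start();
            }
            // false: the wait ended by timeout; true: the node was transferred by signal
            return condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }
}
```

Calling awaitResult(50, false) is expected to return false after roughly 50 ms, while awaitResult(10000, true) is expected to return true well before the timeout, because the signaller can only take the lock once the waiter is already in the condition queue.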

Summary:

  1. The Condition queue is a singly linked list. A thread that calls await is wrapped in a node, appended to the tail of the list, and suspended.
  2. A call to signal actually transfers a condition queue node to the blocking queue. Whether or not an interruption occurs, the node ends up in the blocking queue, where its thread blocks until it reacquires the lock.
  3. The await method responds to interruptions by default. If a thread in the condition queue is interrupted during await, the interruption is recorded and InterruptedException is thrown after the lock has been reacquired. The lock method does not respond to interrupts; to respond to them, use lockInterruptibly.