Diagrams and examples: a thorough walkthrough of how Condition works

Hello everyone, I am Ah Q!

After "20 pictures illustrating the principle of ReentrantLock locking and unlocking" was published, it triggered a heated discussion, and some friends messaged me privately: locking and unlocking is usually done directly with the synchronized keyword, which is easy to use, so why use ReentrantLock?

In order to solve the doubts of the friends, let’s make a simple comparison between the two:

Similarities

Both are reentrant locks: after a thread has acquired the lock, it can acquire the same lock again without blocking, and the lock's hold counter is simply incremented by 1.
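
To make the "+1" counter concrete, here is a minimal sketch using ReentrantLock.getHoldCount(). The class name ReentrancyDemo and the method nestedHoldCount are made up for this illustration and are not part of the original demo.

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReentrancyDemo {
    /** Locks twice on the same thread and returns the hold count seen while both holds are active. */
    static int nestedHoldCount(ReentrantLock lock) {
        lock.lock();
        try {
            lock.lock(); // same thread: succeeds immediately, the counter goes from 1 to 2
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock(); // counter back to 1
            }
        } finally {
            lock.unlock(); // counter back to 0, the lock is free again
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println("hold count inside nested lock: " + nestedHoldCount(lock));
        System.out.println("still locked afterwards: " + lock.isLocked());
    }
}
```

Every lock() needs a matching unlock(); the lock only becomes available to other threads when the counter returns to 0.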

Differences

  1. ReentrantLock is implemented at the API level (in the JDK, on top of AQS), while synchronized is implemented by the JVM;
  2. ReentrantLock can respond to interrupts while waiting for the lock (via lockInterruptibly()), but a thread blocked on synchronized cannot;
  3. ReentrantLock can be created as either a fair lock or an unfair lock, while synchronized can only be an unfair lock;
  4. ReentrantLock first tries to take the lock with a CAS on the AQS state and only suspends the thread when that fails, and it also offers the non-blocking tryLock(); synchronized has no non-blocking or timed way to acquire the monitor;
  5. ReentrantLock can be combined with several Condition objects to notify selected groups of waiting threads, while synchronized has a single wait set per monitor, driven by wait() and notify()/notifyAll();

To sum up, ReentrantLock covers usage scenarios that synchronized does not. Today we will talk about its selective notification feature.
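
Differences 2 and 3 can be tried out with a few lines. The sketch below is only an illustration (the class InterruptibleLockDemo and its method are invented here): it creates a fair lock and shows that a thread queued in lockInterruptibly() gives up when it is interrupted.

```java
import java.util.concurrent.locks.ReentrantLock;

final class InterruptibleLockDemo {
    /** Returns true if a thread waiting in lockInterruptibly() was released by an interrupt. */
    static boolean interruptWhileWaitingForLock() {
        ReentrantLock lock = new ReentrantLock(true); // true = fair lock
        boolean[] interrupted = {false};
        lock.lock(); // the calling thread keeps the lock, so the worker has to queue
        try {
            Thread worker = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock();
                } catch (InterruptedException e) {
                    interrupted[0] = true; // stopped waiting for the lock
                }
            });
            worker.start();
            while (!lock.hasQueuedThread(worker)) {
                Thread.sleep(1); // wait until the worker is in the lock's queue
            }
            worker.interrupt();
            worker.join(); // join makes the worker's write to interrupted[0] visible
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println("worker gave up on interrupt: " + interruptWhileWaitingForLock());
    }
}
```

A thread blocked on a synchronized block in the same situation would simply keep waiting until the monitor is released.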

Hands-on

Theory without practice is just talk. Today we do things the other way round and start with the hands-on demo.

Scenario

To attract more car owners, a gas station has installed an automatic car wash that offers a free wash to every car that refuels. We stipulate that each car must follow the process "refueling -> car washing -> driving away", and the next car may only come in to refuel after the previous car has left.

Code Implementation

First create a lock object and three Condition objects

/**
 * Flag to control thread wakeup
 */
private int flag = 1;

/**
 * Create lock object
 */
private Lock lock = new ReentrantLock();

/**
 * waiting queue
 * c1 corresponds to refueling
 * c2 corresponds to car washing
 * c3 corresponds to driving
 */
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();

Then declare the refueling, washing and driving-away methods. Each one waits for its turn, does its work, and then signals the next step: refueling wakes up the car wash, and the car wash wakes up the drive-away step.

/**
 * Car refueling
 */
public void fuelUp(int num) {
    lock.lock();
    try {
        // Wait until it is the refueling step's turn
        while (flag != 1) {
            c1.await();
        }
        System.out.println("Car No." + num + " starts to refuel");
        flag = 2;
        // Wake up the car-wash step
        c2.signal();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

/**
 * car wash
 */
public void carWash(int num) {
    lock.lock();
    try {
        // Wait until it is the car-wash step's turn
        while (flag != 2) {
            c2.await();
        }
        System.out.println("Car No." + num + " starts washing");
        flag = 3;
        // Wake up the drive-away step
        c3.signal();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

/**
 * drive away
 */
public void drive(int num) {
    lock.lock();
    try {
        // Wait until it is the drive-away step's turn
        while (flag != 3) {
            c3.await();
        }
        System.out.println("Car No." + num + " has left the gas station");
        flag = 1;
        // Wake up the refueling step for the next car
        c1.signal();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

Among them, await is the waiting method, and signal is the wake-up method.

Finally, let's define the main method to simulate 3 cars arriving at the gas station at the same time

public static void main(String[] args) {
    CarOperation carOperation = new CarOperation();
    // car refueling
    new Thread(() -> {
        for (int i = 1; i < 4; i++) {
            carOperation.fuelUp(i);
        }
    }, "fuelUp").start();

    // car wash
    new Thread(() -> {
        for (int i = 1; i < 4; i++) {
            carOperation.carWash(i);
        }
    }, "carWash").start();

    // drive away
    new Thread(() -> {
        for (int i = 1; i < 4; i++) {
            carOperation.drive(i);
        }
    }, "drive").start();
}
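
For readers who want to run the whole thing in one file, here is a condensed variant of the same idea. It is a sketch, not the code above: the class name GasStationDemo, the stages array and the recorded log are all made up for this illustration, and the three conditions are indexed 0 to 2 instead of being named c1 to c3. Recording the steps in a list makes the resulting order easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class GasStationDemo {
    private final Lock lock = new ReentrantLock();
    // One Condition per step: 0 = refuel, 1 = wash, 2 = leave
    private final Condition[] stages = {lock.newCondition(), lock.newCondition(), lock.newCondition()};
    private final String[] names = {"refuel", "wash", "leave"};
    private final List<String> log = new ArrayList<>(); // only modified while holding the lock
    private int flag = 0; // index of the step that is allowed to run next

    private void step(int stage, int car) {
        lock.lock();
        try {
            while (flag != stage) {
                stages[stage].awaitUninterruptibly(); // not our turn yet
            }
            log.add(names[stage] + " " + car);
            flag = (stage + 1) % 3;
            stages[flag].signal(); // wake up only the thread responsible for the next step
        } finally {
            lock.unlock();
        }
    }

    /** Runs the three step threads for the given number of cars and returns the recorded order. */
    static List<String> run(int cars) {
        GasStationDemo station = new GasStationDemo();
        Thread[] workers = new Thread[3];
        for (int s = 0; s < 3; s++) {
            final int stage = s;
            workers[s] = new Thread(() -> {
                for (int car = 1; car <= cars; car++) {
                    station.step(stage, car);
                }
            }, station.names[s]);
            workers[s].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return station.log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Because each step can only run when flag points at it, the log always comes out as refuel, wash, leave for car 1, then the same for car 2, and so on, no matter which thread starts first.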

Easy to use, isn't it? To deepen everyone's understanding of Condition, let's walk through how it works with diagrams.

Illustration

As you have seen, the demo above revolves around Condition, so what is Condition? It is an interface that defines the methods for making threads wait and waking them up.

The lock.newCondition() call in the code actually invokes the newCondition method of the Sync class, and ConditionObject is the implementation class of Condition.

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject is an inner class of AQS, so it cannot be created on its own and has to be obtained through a lock such as ReentrantLock.
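
One visible consequence of this binding is that a Condition can only be used by the thread that currently holds the lock it came from. The following sketch (class and method names are invented for the example) calls signal() without holding the lock and observes the IllegalMonitorStateException.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class ConditionOwnershipDemo {
    /** Returns true if signal() without holding the lock is rejected. */
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the current thread does not hold the lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("signal without lock rejected: " + signalWithoutLockThrows());
    }
}
```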

ConditionObject

ConditionObject internally maintains a FIFO singly linked queue of Node objects, which we call the waiting queue. firstWaiter points to the first node, lastWaiter points to the tail node, nextWaiter in each Node points to the next element in the queue, and the waitStatus of every node in the waiting queue is -2 (Node.CONDITION).
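
The shape of this queue can be modelled in a few lines. The class below is a toy model written for this article, not the JDK source: ToyConditionQueue, its Node and the describe method are hypothetical, and there is no locking or thread handling in it. It only mirrors the three pointers described above.

```java
final class ToyConditionQueue {
    static final int CONDITION = -2; // same value as Node.CONDITION in AQS

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter; // single link: there is no pointer back to the previous node

        Node(String name) {
            this.name = name;
        }
    }

    Node firstWaiter;
    Node lastWaiter;

    /** Appends a waiter at the tail of the queue. */
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node; // empty queue: the new node is also the head
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    /** Lists the waiters in FIFO order. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyConditionQueue queue = new ToyConditionQueue();
        queue.add("t1");
        queue.add("t2");
        queue.add("t3");
        System.out.println(queue.describe());
    }
}
```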

After understanding the data structure of ConditionObject, let’s illustrate the waiting/wakeup mechanism of ReentrantLock from the perspective of source code.

await

First find the source code of await in the ConditionObject class inside AQS

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Wrap the current thread in a node and append it to the tail of the waiting queue
    Node node = addConditionWaiter();
    // Release the lock completely and remember the state so it can be restored later
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // While the node is not on the synchronization queue, the thread is not yet
    // eligible to compete for the lock, so it keeps waiting
    while (!isOnSyncQueue(node)) {
        // The node is still in the waiting queue: suspend the current thread
        LockSupport.park(this);
        // If the thread was interrupted, leave the loop and stop waiting
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // At this point the node is on the AQS queue. acquireQueued blocks until the
    // lock is reacquired and returns true if the thread was interrupted meanwhile.
    // An interrupt that did not cancel the wait is recorded as REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // Remove all nodes whose status is not CONDITION from the waiting queue
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

If the thread has already been interrupted when await is called, the interrupt flag is cleared and an InterruptedException is thrown.

If the thread is interrupted, the interrupt flag is cleared and an exception is thrown.

View addConditionWaiter

The function of this method is to encapsulate the current thread into a node and add it to the tail of the waiting queue

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        // Remove nodes that are no longer waiting from the waiting queue
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // The tail node is empty
    if (t == null)
        // Point the first node to node
        firstWaiter = node;
    else
        // Point the nextWaiter of the tail node to node
        t.nextWaiter = node;
    // The tail node now points to node
    lastWaiter = node;
    return node;
}

First point t to the tail node. If the tail node is not empty and its waitStatus != -2, remove the nodes that are no longer waiting from the waiting queue and point t to the new tail node.

Encapsulate the current thread into a node whose waitStatus is -2 and append it to the end of the waiting queue.

If the tail node is empty, the queue is empty, and both the head and tail nodes point to the current node.

If the tail node is not empty, it proves that there are other nodes in the queue, then point the nextWaiter of the current tail node to the current node, and set the current node as the tail node.

Then let’s take a look at the unlinkCancelledWaiters() method – remove the nodes that are not in the waiting state from the waiting queue.

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // trail is the predecessor node of t
    Node trail = null;
    while (t != null) {
        // next is the successor node of t
        Node next = t.nextWaiter;
        // If the waitStatus of t is not -2, the node is invalid
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // If the predecessor of t is empty, point the first node to next
            if (trail == null)
                firstWaiter = next;
            else
                // Otherwise point the predecessor's nextWaiter to next
                trail.nextWaiter = next;
            // If next is null, point the tail node to the predecessor of t
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

t is the current node, trail is the predecessor node of t, and next is the successor node of t.

The while loop walks the waiting queue from the first node, looking for nodes with waitStatus != -2; when it finds one, it sets that node's nextWaiter to null.

If the predecessor node of the current node is empty, it means that the current node is the first node, then set next as the first node;

If it is not empty, point the nextWaiter of the predecessor node to the successor node.

If the successor node is empty, directly set the predecessor node as the tail node.
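
The pointer juggling is easier to follow on a small example. The sketch below copies the loop onto a toy list; ToyUnlinkDemo, its Node class and the helper methods are invented for this illustration and leave out everything related to threads.

```java
final class ToyUnlinkDemo {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        final int waitStatus;
        Node nextWaiter;

        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    Node firstWaiter;
    Node lastWaiter;

    void add(String name, int waitStatus) {
        Node node = new Node(name, waitStatus);
        if (lastWaiter == null) firstWaiter = node;
        else lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    /** Same traversal as unlinkCancelledWaiters: drop every node that is not in CONDITION state. */
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node seen that is still valid
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyUnlinkDemo queue = new ToyUnlinkDemo();
        queue.add("a", CANCELLED);
        queue.add("b", CONDITION);
        queue.add("c", CANCELLED);
        queue.unlinkCancelledWaiters();
        System.out.println(queue.describe());
    }
}
```

With a cancelled head and a cancelled tail, both branches are exercised: removing "a" moves firstWaiter forward, and removing "c" moves lastWaiter back to "b".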

View fullyRelease

It can be almost understood from the name that the function of this method is to completely release the lock resource.

final int fullyRelease(Node node) {
    // failed stays true until the lock has been released successfully
    boolean failed = true;
    try {
        // Get the current state of the lock (the hold count)
        int savedState = getState();
        // If the lock is released successfully
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // If releasing the lock fails, mark the node as cancelled
            node.waitStatus = Node.CANCELLED;
    }
}

The key is the release method: if release succeeds, the current thread has given up the lock. Because the whole saved state is passed in, every reentrant hold is released in one go, and the same value is used later to restore the hold count.
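
The effect of fullyRelease can be observed from outside with getHoldCount(). In this sketch (FullReleaseDemo and its run method are hypothetical names) the main thread holds the lock twice, waits on a condition, and a second thread is able to take the lock in the meantime.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class FullReleaseDemo {
    /** Returns {hold count before await, 1 if another thread got the lock meanwhile, hold count after await}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] otherGotLock = {false}; // only touched while holding the lock

        Thread other = new Thread(() -> {
            lock.lock(); // succeeds only once the waiting thread has released both holds
            try {
                otherGotLock[0] = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        });

        int before;
        int after;
        lock.lock();
        lock.lock(); // reentrant: the hold count is now 2
        try {
            before = lock.getHoldCount();
            other.start();
            while (!otherGotLock[0]) {
                condition.awaitUninterruptibly(); // gives up both holds while waiting
            }
            after = lock.getHoldCount(); // both holds are restored when await returns
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return new int[] {before, otherGotLock[0] ? 1 : 0, after};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```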

View isOnSyncQueue

Determine whether the Node where the current thread is located is in the synchronization queue (the synchronization queue is the AQS queue). Here it is necessary to show you the relationship diagram between the synchronization queue and the waiting queue.

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null)
        return true;
    // The next of node is null
    return findNodeFromTail(node);
}

If the waitStatus of the current node is -2, it is still in the waiting queue, so return false. A node that has been added to the AQS queue always has a predecessor; if prev is null, the node is either not enqueued yet or it is the head node, which does not take part in lock competition, so return false as well.

If the current node is neither in the waiting queue nor the head node of AQS and it has a next node, it is definitely in the AQS queue, so return true directly.

Otherwise, if the next of the current node is null, the node may be the tail node, or its next may simply not have been assigned yet, so the queue has to be traversed from back to front.

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        // Compare with the tail node first, then with each earlier node in turn
        if (t == node)
            return true;
        // t is null: we walked past the head without finding node, so it is not in the AQS queue
        if (t == null)
            return false;
        t = t.prev;
    }
}

During the traversal, if a node in the queue is the current node, return true; if the head has been passed without finding it, return false.

We go back to the while loop of await, if it returns false, it means that the node is not in the synchronization queue, enter the loop and suspend the thread.

Supplementary note

Ah Q's understanding is that a waiting thread can be woken up in two ways: one is a call to signal/signalAll; the other is a thread interrupt, which wakes the thread up and makes it throw an InterruptedException.

View checkInterruptWhileWaiting (the tricky part)

This method checks whether the current thread has been interrupted. If no interrupt occurred it returns 0; if an interrupt occurred it returns -1 (THROW_IE) or 1 (REINTERRUPT).

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

Let’s see how the transferAfterCancelledWait method distinguishes between 1 and -1

final boolean transferAfterCancelledWait(Node node) {
    // CAS: try to change node's waitStatus from CONDITION to 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // Move node from the waiting queue to the AQS queue
        enq(node);
        return true;
    }
    // The CAS failed, so a signal got there first. Until the signalling thread has
    // finished enqueuing the node, give way to other threads with yield
    while (!isOnSyncQueue(node))
        Thread.yield();
    // The node is now in the AQS queue: return false
    return false;
}

Under what circumstances will the cas operation succeed? Under what circumstances will it fail?

When the thread receives an interrupt signal while it is still waiting, it is woken up. At this point the waitStatus of node is still -2, so the CAS succeeds, and node is transferred from the waiting queue to the AQS queue.

When the thread is woken up via signal first and receives the interrupt signal afterwards, signal has already changed the waitStatus of node from -2 to 0, so the CAS fails.

Example

You can use the example below and set a breakpoint in transferAfterCancelledWait; I believe it will become clear.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CarOperation {
    // Create a reentrant lock
    private Lock lock = new ReentrantLock();

    // Declare the waiting queue
    Condition c1 = lock.newCondition();

    /*
     * Wait operation
     */
    public void await() {
        lock.lock();
        try {
            System.out.println("Start blocking");
            c1.await();
            System.out.println("Continue to execute after waking up");
        } catch (InterruptedException e) {
            System.out.println("Woken up, but an exception was thrown");
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /*
     * Wake-up operation
     */
    public void signal() {
        lock.lock();
        try {
            c1.signal();
            System.out.println("waking up......");
        } finally {
            lock.unlock();
        }
    }
}

Interrupt test

public static void main(String[] args) {
    CarOperation carOperation = new CarOperation();
    Thread t1 = new Thread(() -> {
        // Wait, suspending the thread
        carOperation.await();
    });
    t1.start();
    try {
        // Simulate other threads doing work while t1 waits
        Thread.sleep(10000);
        // Send a thread interrupt signal
        t1.interrupt();
    } catch (InterruptedException exception) {
        exception.printStackTrace();
    }
}

Signal-then-interrupt test

public static void main(String[] args) {
    CarOperation carOperation = new CarOperation();
    Thread t1 = new Thread(() -> {
        carOperation.await();
    });
    t1.start();
    try {
        Thread.sleep(10000);
        // Wake up the thread first
        carOperation.signal();
        // Then interrupt it
        t1.interrupt();
    } catch (InterruptedException exception) {
        exception.printStackTrace();
    }
}
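
The two tests above rely on a 10-second sleep and on reading the console. As an alternative, here is a sketch that runs both orderings without sleeping for a fixed time and reports what the waiting thread saw. The class InterruptOrderDemo and the outcome strings are made up for this example; it uses ReentrantLock.hasWaiters to be sure the thread is already in the waiting queue, and it keeps holding the lock while signalling and interrupting so the order of the two events is fixed.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class InterruptOrderDemo {
    /** Interrupts a waiting thread, optionally signalling it first, and returns what the waiter observed. */
    static String run(boolean signalFirst) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        String[] outcome = new String[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                outcome[0] = Thread.currentThread().isInterrupted()
                        ? "returned, interrupt flag set"
                        : "returned";
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            boolean done = false;
            while (!done) {
                lock.lock();
                try {
                    // hasWaiters is true once the waiter's node is in the waiting queue
                    if (lock.hasWaiters(condition)) {
                        if (signalFirst) {
                            condition.signal(); // node moves to the AQS queue first
                        }
                        waiter.interrupt();
                        done = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!done) {
                    Thread.sleep(1);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt only:        " + run(false));
        System.out.println("signal then interrupt: " + run(true));
    }
}
```

With the interrupt alone, the CAS in transferAfterCancelledWait succeeds and await throws (THROW_IE). When the signal comes first, the CAS fails, await returns normally, and the interrupt is only re-asserted on the thread (REINTERRUPT).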

View reportInterruptAfterWait

// Either throw an exception or re-interrupt the thread
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

The above is the whole content of await, let’s make a brief summary first.

Summary

  • Wrap the current thread in a node and add it to the tail of the waiting queue;
  • Completely release the lock resource, that is, remove the thread's node from the head of the synchronization queue;
  • If the current node is not in the synchronization queue, suspend the current thread;
  • Keep looping until the thread is interrupted or is woken up and moved to the synchronization queue;
  • Block the current node until it reacquires the lock resource.

If you have any questions, feel free to message Ah Q!

signal

Next, let's take a look at the wake-up process

public final void signal() {
    // Check that the current thread holds the lock; if not, throw an exception
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // The actual wake-up process
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // Make the node after the head the new head; if there is none, the queue is empty
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Detach the old head node from the waiting queue
        first.nextWaiter = null;
        // Transfer the old head from the waiting queue to the AQS queue;
        // if the transfer fails, try the next node
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

First take the head node of the waiting queue out of the waiting queue

Then execute the transferForSignal method to transfer

final boolean transferForSignal(Node node) {
    // Change node's waitStatus from CONDITION to 0. If this fails, the node has
    // been cancelled and is no longer waiting, so return false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // Move node from the waiting queue to the AQS queue; enq returns node's predecessor
    Node p = enq(node);
    // Get the status of the predecessor node
    int ws = p.waitStatus;
    // If the predecessor is cancelled, or setting it to SIGNAL fails,
    // wake up node's thread right away so that it can resynchronize
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // Wake up the thread of the current node
        LockSupport.unpark(node.thread);
    return true;
}

The head node of the waiting queue is transferred to the AQS queue. If the transfer fails, the node has been cancelled: false is returned, first is pointed at the new head node, and the transfer is attempted again. If the transfer succeeds, the state of the predecessor node decides whether the current node's thread is woken up immediately.

How about it? Isn't the wake-up logic simple? Let's wrap it up with a short summary as well.

Summary

Starting from the head of the waiting queue, try to wake up the head node. If that node has been cancelled, try the next node.

When a node is woken up, it is first transferred to the synchronization queue. If its predecessor there is cancelled, or setting the predecessor's state to SIGNAL fails, the thread of the current node is woken up immediately; otherwise no wake-up is performed and the thread is unparked later, when its predecessor releases the lock.
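
The move from one queue to the other can be watched through two public ReentrantLock methods, hasWaiters(condition) and hasQueuedThread(thread). The sketch below is an illustration under that assumption; SignalTransferDemo and its result array are names invented here. The signalling thread keeps the lock while it inspects both queues, so the signalled thread cannot run ahead.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class SignalTransferDemo {
    /** Returns {in waiting queue before signal, in waiting queue after signal, in synchronization queue after signal}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean[] result = new boolean[3];
        try {
            boolean done = false;
            while (!done) {
                lock.lock();
                try {
                    if (lock.hasWaiters(condition)) {
                        result[0] = true;                         // waiter sits in the waiting queue
                        condition.signal();                       // transfer it
                        result[1] = lock.hasWaiters(condition);   // waiting queue is empty again
                        result[2] = lock.hasQueuedThread(waiter); // waiter now queues for the lock
                        done = true;
                    }
                } finally {
                    lock.unlock(); // only now can the waiter reacquire the lock and return
                }
                if (!done) {
                    Thread.sleep(1);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

Note that signal() by itself does not let the waiting thread continue: the thread still has to win the lock back, which is only possible after the signalling thread calls unlock().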

That's all for today, see you next time.