Hello everyone, I am Ah Q!
After my article “20 pictures illustrating the principle of ReentrantLock locking and unlocking” was published, it sparked a lively discussion, and some readers messaged me privately: locking and unlocking is usually done directly with the synchronized keyword, which is easy to use, so why bother with ReentrantLock?
To clear up these doubts, let's make a simple comparison between the two:
Similarities
Both are reentrant locks: after the current thread has acquired the lock, it can acquire the same lock again without blocking, and the lock's hold counter is simply incremented by 1.
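To make the “+1” counter visible, here is a minimal sketch. The class name ReentrancyDemo and its helper methods are made up for illustration; getHoldCount() is the real ReentrantLock method that reports how many times the current thread holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Acquires the same lock `depth` times on the current thread and
    // returns the hold count observed at the deepest point.
    static int nestedHoldCount(ReentrantLock lock, int depth) {
        if (depth == 0) {
            return lock.getHoldCount();
        }
        lock.lock();               // re-entering never blocks the owner
        try {
            return nestedHoldCount(lock, depth - 1);
        } finally {
            lock.unlock();         // every lock() needs a matching unlock()
        }
    }

    // The same idea with synchronized: outer() already owns the class
    // monitor when it calls inner(), and it is let in again.
    static synchronized int outer() {
        return inner() + 1;
    }

    static synchronized int inner() {
        return 1;
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(nestedHoldCount(lock, 3)); // 3
        System.out.println(lock.getHoldCount());      // 0, all holds released
        System.out.println(outer());                  // 2
    }
}
```

Note that with ReentrantLock the counter has to be brought back to 0 by hand, which is why each lock() is paired with an unlock() in a finally block.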
Differences
- ReentrantLock is implemented at the API level (in the JDK, on top of AQS), while synchronized is implemented by the JVM;
- ReentrantLock can respond to interrupts while waiting for the lock (lockInterruptibly()), but synchronized cannot;
- ReentrantLock can be either a fair lock or an unfair lock, while synchronized can only be an unfair lock;
- ReentrantLock first tries to take the lock with a CAS on the lock state, an optimistic concurrency strategy, and also offers the non-blocking tryLock(), although a contended lock() still suspends the thread; synchronized blocks when contended, a pessimistic concurrency strategy;
- ReentrantLock can implement selective (multi-channel) notification with several Condition objects, while synchronized implements the waiting/notification mechanism through wait() and notify()/notifyAll(), with a single wait set per object (one-channel notification).
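Three of these differences can be checked with a few lines of code. The sketch below is only an illustration: the class LockFeaturesDemo and its method names are invented, while isFair(), tryLock(), lockInterruptibly() and hasQueuedThread() are real ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockFeaturesDemo {
    // Fairness is chosen in the constructor; the default is unfair.
    static boolean[] fairnessFlags() {
        return new boolean[] { new ReentrantLock(true).isFair(), new ReentrantLock().isFair() };
    }

    // Returns true if another thread's tryLock() fails while we hold the lock.
    static boolean tryLockFailsWhenHeld() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock();   // returns at once, never suspends
                if (acquired[0]) lock.unlock();
            });
            other.start();
            join(other);
        } finally {
            lock.unlock();
        }
        return !acquired[0];
    }

    // Returns true if a thread queued in lockInterruptibly() gives up when interrupted.
    static boolean queuedThreadCanBeInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] gaveUp = new boolean[1];
        lock.lock();
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock();
                } catch (InterruptedException e) {
                    gaveUp[0] = true;           // woke up without the lock
                }
            });
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) Thread.yield(); // wait until it is queued
            waiter.interrupt();
            join(waiter);
        } finally {
            lock.unlock();
        }
        return gaveUp[0];
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] fair = fairnessFlags();
        System.out.println(fair[0] + " " + fair[1]);        // true false
        System.out.println(tryLockFailsWhenHeld());         // true
        System.out.println(queuedThreadCanBeInterrupted()); // true
    }
}
```

A thread blocked on entering a synchronized block has no equivalent of the last case: it stays blocked until it gets the monitor.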
To sum up, ReentrantLock and synchronized suit different usage scenarios. Today we will talk about ReentrantLock's selective (multi-channel) notification feature.
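To see why a single wait set is limiting, here is a small sketch of three threads taking turns with synchronized only. Everything in it (the class MonitorTurnsDemo, the stage names A, B and C) is made up for illustration. Because wait() cannot say which stage it is waiting for, the code has to call notifyAll() and let every waiter re-check whose turn it is.

```java
import java.util.ArrayList;
import java.util.List;

class MonitorTurnsDemo {
    private int turn = 1;                              // which stage may run next
    private final List<String> log = new ArrayList<>();

    // Runs one stage when it is its turn, then hands over to `next`.
    private synchronized void stage(int me, int next, String name) throws InterruptedException {
        while (turn != me) {
            wait();        // all three stages wait in the same wait set
        }
        log.add(name);
        turn = next;
        notifyAll();       // cannot target one stage, so wake every waiter
    }

    // Runs stages A -> B -> C for the given number of rounds and returns the order.
    static List<String> run(int rounds) {
        MonitorTurnsDemo demo = new MonitorTurnsDemo();
        String[] names = {"A", "B", "C"};
        Thread[] threads = new Thread[3];
        for (int s = 0; s < 3; s++) {
            final int me = s + 1;
            final int next = (s + 1) % 3 + 1;
            final String name = names[s];
            threads[s] = new Thread(() -> {
                try {
                    for (int i = 0; i < rounds; i++) {
                        demo.stage(me, next, name);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[s].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return demo.log;
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // [A, B, C, A, B, C]
    }
}
```

The result is correct, but on every hand-over two threads are woken and one of them goes straight back to waiting. With one Condition per stage, only the thread that can actually proceed is signalled.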
Hands-on practice
Theory “on paper” without practice is just empty talk. Today we reverse the usual order and present the hands-on demo first.
Scenario
To attract more car owners, a gas station has installed an automatic car wash that provides a free wash for every refueled car. We stipulate that each car must follow the process “refueling -> car washing -> driving away”, and the next car may come in to refuel only after the previous car has left.
Code Implementation
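First create a lock object and three Condition objects:
    // Imports needed by the CarOperation class
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    /**
     * Flag that controls which thread is woken up:
     * 1 = refueling, 2 = car washing, 3 = driving away
     */
    private int flag = 1;

    /**
     * Create the lock object
     */
    private Lock lock = new ReentrantLock();

    /**
     * Waiting queues
     * c1 corresponds to refueling
     * c2 corresponds to car washing
     * c3 corresponds to driving away
     */
    Condition c1 = lock.newCondition();
    Condition c2 = lock.newCondition();
    Condition c3 = lock.newCondition();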
Then declare the refueling, washing and driving-away methods, which enforce the order: after refueling, the car goes to the car wash and then drives away from the gas station.
    /**
     * Car refueling
     */
    public void fuelUp(int num) {
        lock.lock();
        try {
            // Wait until it is the refueling step's turn
            while (flag != 1) {
                c1.await();
            }
            System.out.println("Car No." + num + " starts to refuel");
            flag = 2;
            // Wake up the car wash step only
            c2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Car wash
     */
    public void carWash(int num) {
        lock.lock();
        try {
            while (flag != 2) {
                c2.await();
            }
            System.out.println("Car No." + num + " starts to be washed");
            flag = 3;
            // Wake up the driving-away step only
            c3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Drive away
     */
    public void drive(int num) {
        lock.lock();
        try {
            while (flag != 3) {
                c3.await();
            }
            System.out.println("Car No." + num + " has left the gas station");
            flag = 1;
            // Let the next car refuel
            c1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
Here, await is the waiting method and signal is the wake-up method.
Finally, let's define the main method to simulate 3 cars arriving at the gas station at the same time:
    public static void main(String[] args) {
        CarOperation carOperation = new CarOperation();
        // Car refueling
        new Thread(() -> {
            for (int i = 1; i < 4; i++) {
                carOperation.fuelUp(i);
            }
        }, "fuelUp").start();

        // Car wash
        new Thread(() -> {
            for (int i = 1; i < 4; i++) {
                carOperation.carWash(i);
            }
        }, "carWash").start();

        // Drive away
        new Thread(() -> {
            for (int i = 1; i < 4; i++) {
                carOperation.drive(i);
            }
        }, "drive").start();
    }
Pretty smooth to use, isn't it? To deepen everyone's understanding of Condition, let's walk through how Condition works with some diagrams~
Illustration
As you have seen, the demo above revolves entirely around Condition, so what is Condition? Condition is an interface that defines the methods for making a thread wait and for waking it up.
The lock.newCondition() call in the code actually invokes the newCondition method of the Sync class, and the ConditionObject it returns is the implementation class of Condition.
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
ConditionObject is an inner class of AQS (AbstractQueuedSynchronizer), so it cannot be instantiated on its own and has to be obtained through a lock such as ReentrantLock.
ConditionObject
ConditionObject internally maintains a FIFO singly linked queue based on Node, which we call the waiting queue. firstWaiter points to the first node, lastWaiter points to the tail node, nextWaiter in each Node points to the next element in the queue, and the waitStatus of every node in the waiting queue is -2 (Node.CONDITION).
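The queue itself is private, but ReentrantLock exposes its length through getWaitQueueLength(Condition), which must be called while holding the lock. The following sketch (the class WaitQueueLengthDemo and its method are invented for illustration) parks a few threads on one Condition, reads the length, and reads it again after signalAll().

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class WaitQueueLengthDemo {
    // Parks n threads on one Condition and returns
    // {queue length before signalAll, queue length after signalAll}.
    static int[] lengthsAroundSignalAll(int n) {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        boolean[] released = new boolean[1];   // guarded by lock
        Thread[] waiters = new Thread[n];

        for (int i = 0; i < n; i++) {
            waiters[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!released[0]) {
                        c.awaitUninterruptibly();  // one node per waiting thread
                    }
                } finally {
                    lock.unlock();
                }
            });
            waiters[i].start();
        }

        int[] lengths = new int[2];
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                int before = lock.getWaitQueueLength(c);
                if (before == n) {             // all n nodes are queued
                    released[0] = true;
                    c.signalAll();             // moves every node to the AQS queue
                    lengths[0] = before;
                    lengths[1] = lock.getWaitQueueLength(c);
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }

        for (Thread t : waiters) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return lengths;
    }

    public static void main(String[] args) {
        int[] lengths = lengthsAroundSignalAll(3);
        System.out.println(lengths[0] + " -> " + lengths[1]); // 3 -> 0
    }
}
```

The second reading is already 0 although the main thread still holds the lock: signalAll() only moves the nodes to the synchronization queue, and the woken threads compete for the lock afterwards.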
After understanding the data structure of ConditionObject, let's illustrate the waiting/wake-up mechanism of ReentrantLock from the perspective of the source code.
await
First find the source code of await in the AQS class:
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Wrap the current thread in a node and append it to the tail of the waiting queue
        Node node = addConditionWaiter();
        // Fully release the lock and remember the state so it can be restored later
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Check whether this node is on the synchronization queue. If not, the thread is not
        // yet eligible to compete for the lock, so keep waiting until the node is found there
        while (!isOnSyncQueue(node)) {
            // The node is still in the waiting queue: suspend the current thread
            LockSupport.park(this);
            // If an interrupt occurred, leave the loop and stop waiting
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // After being woken up, the node must be on the AQS queue.
        // acquireQueued keeps blocking until the lock is obtained and returns
        // true if the thread was interrupted while acquiring, false otherwise.
        // If it was interrupted here and the wait was not ended by an interrupt
        // (not THROW_IE), set the interrupt mode to re-interrupt
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            // Remove every node whose status is not CONDITION from the condition queue
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
If the thread has already been interrupted when await is called, the interrupt flag is cleared and an InterruptedException is thrown.
View addConditionWaiter
This method wraps the current thread in a node and appends it to the tail of the waiting queue.
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            // Remove the nodes that are no longer waiting from the waiting queue
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // The tail node is null, so the queue is empty
        if (t == null)
            // Point the first node at node
            firstWaiter = node;
        else
            // Point the nextWaiter of the tail node at node
            t.nextWaiter = node;
        // node becomes the new tail node
        lastWaiter = node;
        return node;
    }
First, t is pointed at the tail node. If the tail node is not null and its waitStatus!=-2, the nodes that are no longer waiting are removed from the waiting queue, and t is pointed at the new tail node.
Then the current thread is wrapped in a node whose waitStatus is -2 and appended to the end of the waiting queue.
If the tail node is null, the queue is empty, so both the head and the tail point to the new node.
If the tail node is not null, there are other nodes in the queue, so the nextWaiter of the current tail node is pointed at the new node, and the new node becomes the tail node.
Next, let's take a look at the unlinkCancelledWaiters() method, which removes the nodes that are no longer waiting from the waiting queue.
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        // trail is the last valid node seen before t
        Node trail = null;
        while (t != null) {
            // next is the successor of t
            Node next = t.nextWaiter;
            // If the waitStatus of t is not -2, the node is no longer valid
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                // t has no valid predecessor: next becomes the first node
                if (trail == null)
                    firstWaiter = next;
                else
                    // Otherwise link the predecessor directly to next
                    trail.nextWaiter = next;
                // If next is null, the predecessor becomes the tail node
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
t is the current node, trail is the predecessor of t, and next is the successor of t.
The while loop walks the waiting queue from the first node looking for nodes with waitStatus!=-2, and sets the nextWaiter of each such node to null.
If the predecessor of the current node is null, the current node is the first node, so next becomes the first node;
otherwise, the nextWaiter of the predecessor is pointed at the successor.
If the successor is null, the predecessor becomes the tail node.
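The pointer juggling is easier to follow on a concrete queue. Below is a single-threaded toy model written only for this walkthrough: ToyConditionQueue is not a JDK class, it just copies the two methods above onto a stripped-down Node so the result can be printed. The constants -2 and 1 match Node.CONDITION and Node.CANCELLED.

```java
import java.util.ArrayList;
import java.util.List;

class ToyConditionQueue {
    static final int CONDITION = -2;  // same value as Node.CONDITION
    static final int CANCELLED = 1;   // same value as Node.CANCELLED

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Mirrors addConditionWaiter: clean up if the tail is stale, then append.
    Node add(String name) {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != CONDITION) {
            unlinkCancelled();
            t = lastWaiter;
        }
        Node node = new Node(name);
        if (t == null) firstWaiter = node;
        else t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Mirrors unlinkCancelledWaiters: drop every node whose status is not CONDITION.
    void unlinkCancelled() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) out.add(n.name);
        return out;
    }

    // Builds A -> B -> C, cancels A and C, then appends D.
    static List<String> demo() {
        ToyConditionQueue q = new ToyConditionQueue();
        Node a = q.add("A");
        q.add("B");
        Node c = q.add("C");
        a.waitStatus = CANCELLED;
        c.waitStatus = CANCELLED;
        q.add("D");   // the tail C is stale, so the cleanup runs first
        return q.names();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [B, D]
    }
}
```

Both the cancelled head A and the cancelled tail C are gone, and D was linked behind B, the new tail found after the cleanup. The real methods need no extra synchronization for the same reason this toy does not: they only run while the calling thread holds the lock.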
View fullyRelease
As the name suggests, this method completely releases the lock resource.
    final int fullyRelease(Node node) {
        // true while the release has not succeeded; set to false once it has
        boolean failed = true;
        try {
            // Get the current state of the lock
            int savedState = getState();
            // If the lock is released successfully
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                // If releasing the lock failed, mark the node as cancelled
                node.waitStatus = Node.CANCELLED;
        }
    }
The key part is the release method. As discussed before, once release succeeds, the current thread has released the lock resource. Because the whole state is passed to release, a lock that was re-entered several times is released in one go.
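The effect of fullyRelease can be observed from the outside. In this sketch (FullyReleaseDemo and its methods are names invented for illustration), a thread holds the lock twice and then waits. The main thread can only get the lock if both holds were released, and after the wait the hold count is back at 2. The sketch uses awaitUninterruptibly() instead of await() only to avoid the try/catch.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {
    // Returns {hold count before waiting, hold count after waiting}
    // as seen by the waiting thread.
    static int[] holdCountsAroundAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        boolean[] ready = new boolean[1];      // guarded by lock
        int[] counts = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                       // hold the lock twice, state == 2
            try {
                counts[0] = lock.getHoldCount();
                while (!ready[0]) {
                    c.awaitUninterruptibly();  // releases BOTH holds while waiting
                }
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        boolean signalled = false;
        while (!signalled) {
            lock.lock();                       // succeeds only if the waiter holds nothing
            try {
                if (lock.hasWaiters(c)) {
                    ready[0] = true;
                    c.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counts;
    }

    // Waiting without holding the lock makes the release fail.
    static boolean awaitWithoutLockThrows() {
        Condition c = new ReentrantLock().newCondition();
        try {
            c.awaitUninterruptibly();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] counts = holdCountsAroundAwait();
        System.out.println(counts[0] + " " + counts[1]); // 2 2
        System.out.println(awaitWithoutLockThrows());    // true
    }
}
```

The second method shows the failure branch: a thread that does not own the lock cannot release it, so the wait ends with an IllegalMonitorStateException.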
View isOnSyncQueue
This method determines whether the Node of the current thread is in the synchronization queue (the synchronization queue is the AQS queue). Here it is necessary to show you the relationship diagram between the synchronization queue and the waiting queue.
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null)
            return true;
        // node.next is null, so search from the tail
        return findNodeFromTail(node);
    }
If the current node has waitStatus=-2, it is still in the waiting queue, so false is returned. If its prev is null, it has not been linked into the AQS queue yet either (enq sets prev before the node is attached to the tail), so false is returned as well.
If neither is the case and the node has a next node, it must already be in the AQS queue, so true is returned directly.
Looking further down, if the next of the current node is null, the node may be the tail node, or its next may not have been assigned yet, so the queue has to be traversed from back to front.
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            // Start with the tail node, then check each node in the queue going backwards
            if (t == node)
                return true;
            // t is null: we went past the head without finding node,
            // so it is not in the AQS queue
            if (t == null)
                return false;
            t = t.prev;
        }
    }
During traversal, if a node in the queue equals the current node, true is returned; if the traversal runs past the head without finding it, false is returned.
Back in the while loop of await: if isOnSyncQueue returns false, the node is not in the synchronization queue, so the loop body is entered and the thread is suspended.
Supplementary note
Ah Q's understanding is that a suspended thread is woken up in two situations: either signal/signalAll is called, or the thread receives an interrupt signal, in which case it wakes up and an interrupt exception is thrown.
View checkInterruptWhileWaiting (the tricky part)
This method checks whether the current thread has been interrupted. If no interrupt occurred it returns 0; if an interrupt occurred it returns 1 (REINTERRUPT) or -1 (THROW_IE).
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
Let's see how the transferAfterCancelledWait method distinguishes between 1 and -1:
    final boolean transferAfterCancelledWait(Node node) {
        // Try to CAS the waitStatus of node from CONDITION to 0
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // Move node from the waiting queue to the AQS queue
            enq(node);
            return true;
        }
        // The CAS failed: wait until node is in the AQS queue,
        // yielding to other threads in the meantime
        while (!isOnSyncQueue(node))
            Thread.yield();
        // node is now in the AQS queue
        return false;
    }
When does the CAS operation succeed, and when does it fail?
When the thread is woken up by an interrupt signal while it is still waiting, the waitStatus of node is still -2, so the CAS succeeds and node is transferred from the waiting queue to the AQS queue.
When the thread is first woken up via signal and then receives an interrupt signal, signal has already changed the waitStatus of node from -2 to 0, so the CAS fails.
Example
You can run the example below with a breakpoint set in transferAfterCancelledWait; I believe everything will become clear.
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class CarOperation {
        // Create a reentrant lock
        private Lock lock = new ReentrantLock();
        // Declare the waiting queue
        Condition c1 = lock.newCondition();

        /*
         * Wait operation
         */
        public void await() {
            lock.lock();
            try {
                System.out.println("Start blocking");
                c1.await();
                System.out.println("Continue to execute after waking up");
            } catch (InterruptedException e) {
                System.out.println("Woken up, but an exception was thrown");
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        /*
         * Wake-up operation
         */
        public void signal() {
            lock.lock();
            try {
                c1.signal();
                System.out.println("Waking up......");
            } finally {
                lock.unlock();
            }
        }
    }
Interrupt test
    public static void main(String[] args) {
        CarOperation carOperation = new CarOperation();
        Thread t1 = new Thread(() -> {
            // Wait, suspending the thread
            carOperation.await();
        });
        t1.start();
        try {
            // Simulate other threads taking their turn with the resource
            Thread.sleep(10000);
            // Send the interrupt signal to the thread
            t1.interrupt();
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        }
    }
Wake-up-then-interrupt test
    public static void main(String[] args) {
        CarOperation carOperation = new CarOperation();
        Thread t1 = new Thread(() -> {
            carOperation.await();
        });
        t1.start();
        try {
            Thread.sleep(10000);
            // Wake up the thread first
            carOperation.signal();
            // Then interrupt it
            t1.interrupt();
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        }
    }
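If you would rather not wait 10 seconds or attach a debugger, the two tests can also be written so that they report the outcome themselves. This is only a sketch: the class AwaitInterruptDemo and the strings it returns are my own naming, borrowed from the THROW_IE and REINTERRUPT constants. The main thread performs signal and interrupt while it still holds the lock, so the order of the two calls cannot be disturbed by the waiter.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitInterruptDemo {
    // Runs one waiter. If signalFirst is true, it is signalled before it is interrupted.
    // Returns "THROW_IE" if await() threw InterruptedException,
    // "REINTERRUPT" if await() returned normally with the interrupt flag set,
    // and "NONE" otherwise.
    static String run(boolean signalFirst) {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        String[] outcome = new String[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                c.await();
                outcome[0] = Thread.currentThread().isInterrupted() ? "REINTERRUPT" : "NONE";
            } catch (InterruptedException e) {
                outcome[0] = "THROW_IE";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(c)) {      // the waiter's node is in the waiting queue
                    if (signalFirst) {
                        c.signal();            // waitStatus goes from -2 to 0 here
                    }
                    waiter.interrupt();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // THROW_IE
        System.out.println(run(true));  // REINTERRUPT
    }
}
```

In the first run the interrupt arrives while the node is still in the waiting queue, so await() throws. In the second run the node has already been moved by signal, so await() returns normally and only the interrupt flag tells the caller what happened.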
View reportInterruptAfterWait
    // Either throw an exception, or re-interrupt
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
That is everything there is to await, so let's make a brief summary first.
Summary
- Wrap the current thread in a node and append it to the tail of the waiting queue;
- Completely release the lock resource, that is, remove its synchronization queue node from the head of the synchronization queue;
- If the current node is not in the synchronization queue, suspend the current thread;
- Loop until the thread is interrupted, or is woken up after being moved to the synchronization queue;
- Block the current node until it acquires the lock resource.
If you have any questions, feel free to message Ah Q privately!
signal
Next, let's take a look at the wake-up process:
    public final void signal() {
        // Check that the current thread holds the lock; if not, throw an exception
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            // The actual wake-up process
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            // Make the node after the head the new head of the waiting queue
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // Detach the old head from the waiting queue
            first.nextWaiter = null;
            // Transfer the old head from the waiting queue to the AQS queue;
            // if the transfer fails, continue with the next node
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
First the head node of the waiting queue is taken out of the waiting queue, then the transferForSignal method is executed to transfer it:
    final boolean transferForSignal(Node node) {
        // Set the waitStatus of node to 0. If this fails, node is no longer
        // in the waiting queue, so return false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Move node from the waiting queue to the AQS queue; enq returns node's predecessor
        Node p = enq(node);
        // Get the status of the predecessor
        int ws = p.waitStatus;
        // If the predecessor is cancelled, or setting its status to SIGNAL fails,
        // wake up the thread of node directly
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // Wake up the thread of node
            LockSupport.unpark(node.thread);
        return true;
    }
The head node of the waiting queue is transferred from the waiting queue to the AQS queue. If the transfer fails, the node has been cancelled, so false is returned directly, and first is pointed at the new head node to try the transfer again. If the transfer succeeds, the state of the predecessor node decides whether the thread of the node is woken up directly.
How about that? Isn't the wake-up logic super simple? Let's make a brief summary here as well.
Summary
Starting from the head of the waiting queue, try to wake up the head node. If that node has been cancelled, try to wake up the next node.
When the wake-up operation is performed on a node, the node is first transferred to the synchronization queue. If the predecessor node is cancelled, or setting the predecessor's state to SIGNAL fails, the thread of the current node is woken up immediately; otherwise no wake-up is performed, and the thread will be woken up later when its predecessor releases the lock.
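The FIFO behaviour described in this summary can be checked with a small experiment. The sketch below uses invented names (SignalOrderDemo, wakeOrder, the permits counter). It starts the waiters one at a time so that their order in the waiting queue is known, then calls signal() once per waiter and records the order in which they continue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOrderDemo {
    // Queues n waiters in a known order and returns the order in which they resume.
    static List<Integer> wakeOrder(int n) {
        ReentrantLock lock = new ReentrantLock();
        Condition c = lock.newCondition();
        int[] permits = new int[1];               // guarded by lock
        List<Integer> order = new ArrayList<>();  // guarded by lock
        Thread[] waiters = new Thread[n];

        for (int i = 0; i < n; i++) {
            final int id = i;
            waiters[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (permits[0] == 0) {
                        c.awaitUninterruptibly();
                    }
                    permits[0]--;
                    order.add(id);
                } finally {
                    lock.unlock();
                }
            });
            waiters[i].start();
            // Do not start the next waiter until this one is in the waiting queue
            boolean queued = false;
            while (!queued) {
                lock.lock();
                try {
                    queued = lock.getWaitQueueLength(c) == i + 1;
                } finally {
                    lock.unlock();
                }
                Thread.yield();
            }
        }

        lock.lock();
        try {
            permits[0] = n;
            for (int i = 0; i < n; i++) {
                c.signal();   // each call moves the current head to the AQS queue
            }
        } finally {
            lock.unlock();
        }

        for (Thread t : waiters) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    // signal() checks isHeldExclusively() first, so calling it without the lock fails.
    static boolean signalWithoutLockThrows() {
        Condition c = new ReentrantLock().newCondition();
        try {
            c.signal();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(wakeOrder(3));             // [0, 1, 2]
        System.out.println(signalWithoutLockThrows()); // true
    }
}
```

None of the waiters runs while the main thread is still inside its loop of signal() calls. They are only moved to the synchronization queue, and each one continues when the lock is handed to it, in the same order.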
That's all for today, see you next time.