-
AbstractQueuedSynchronizer (AQS) is the base class; its subclass is Sync, and Sync in turn has two implementations: NonfairSync and FairSync.
-
lock()
-
.lock() of the NonfairSync object
-
final void lock() {<!-- --> //state0 represents not locked //1 represents locking if (compareAndSetState(0, 1)) //The first cas lock is successful setExclusiveOwnerThread(Thread.currentThread()); //Set the thread in the lock to the current thread else acquire(1); //The first time cas locking fails, enter acquire(1) of aqs }
-
//AQS method public final void acquire(int arg) {<!-- --> //The first time arg=1 comes in here if (!tryAcquire(arg) & amp; & amp; //The following method returns true if the lock is successful, here!tryAcquire(arg) returns false acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
-
//NofairSync method protected final boolean tryAcquire(int acquires) {<!-- --> return nonfairTryAcquire(acquires); } --- final boolean nonfairTryAcquire(int acquires) {<!-- --> final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {<!-- --> if (compareAndSetState(0, acquires)) {<!-- --> //Try cas lock for the second time and return successfully (it may also be that you wake up in AQS and try to lock later, the following acquireQueued(final Node node, int arg) method) setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {<!-- --> //state is 1 and the lock holding thread is the current thread, which means reentrancy int nextc = c + acquires; //reentrancy count plus 1 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; //reentry successful return } return false; //The second cas lock failed, the above!tryAcquire(arg) is true }
-
private Node addWaiter(Node mode) {<!-- --> \t Node node = new Node(Thread.currentThread(), mode); /*Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; //null in EXCLUSIVE state this.thread = thread; //Current thread }*/ \t\t // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {<!-- --> //The tail node is not empty, try to set the current node as the tail node node.prev = pred; if (compareAndSetTail(pred, node)) {<!-- --> pred.next = node; return node; } } enq(node);//When it comes out, ensure that the current node has been added to AQS. Assuming that only one thread is waiting at this time, there should be a sentinel node and the current node in the queue (the sentinel node is null and the current node is EXCLUSIVE) return node;//Return the current node } private Node enq(final Node node) {<!-- --> for (;;) {<!-- --> Node t = tail; if (t == null) {<!-- --> // Must initialize if (compareAndSetHead(new Node())) //In the addWaiter method, if the tail node is empty, it means that the AQS queue is empty. Try to do one thing: //First add an empty node (sentinel node) to the current AQS, and then enter the loop. The next time it must be in else. tail = head; } else {<!-- --> node.prev = t; if (compareAndSetTail(t, node)) {<!-- --> //There are already other nodes in the AQS queue (maybe there is only one sentinel, or there may be a long queue), add the current node to the queue tail t.next = node; return t; } } } }
//AQS method //The second time cas locking fails, enter acquireQueued(addWaiter(Node.EXCLUSIVE), arg) final boolean acquireQueued(final Node node, int arg) {<!-- -->//node is the current thread node, arg=1 boolean failed = true; try {<!-- --> boolean interrupted = false; for (;;) {<!-- --> final Node p = node.predecessor(); //Get the predecessor node of the current node (it is a sentinel when there is only one blocking node) if (p == head & amp; & amp; tryAcquire(arg)) {<!-- --> //If the previous node is a sentinel & amp; & amp; if the lock attempt is successful, leave the queue setHead(node); p.next = null; // help GC failed = false; return interrupted; //Return whether it was interrupted } if (shouldParkAfterFailedAcquire(p, node) & amp; & amp; //Determine whether it should be blocked parkAndCheckInterrupt()) //Use LockSupport.park() to block here interrupted = true; //If park() in parkAndCheckInterrupt() is found to be interrupted after it ends, set the interrupt mark to true } } finally {<!-- --> if (failed) //Failed for a special reason (if the lock is successfully acquired in the first if above, the following cancelAcquire(node) will not be executed) cancelAcquire(node); } } //Check and update the status of failed nodes. If the thread is blocked, returns true. This is the primary signal control in all acquisition loops. requires pred == node.prev private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {<!-- --> int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can park safely. */ //When the status of the current driver node is SIGNAL, it means that the current thread can park return true; if (ws > 0) {<!-- --> /* * Predecessor was canceled. Skip over predecessors and * indicate retry. */ //When the predecessor node waitStatus(ws) is greater than 0, it means it has been canceled and jumps directly to a node not greater than 0. 
do {<!-- --> node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {<!-- --> /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //Try to set the predecessor node status to SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //How to actually block the thread private final boolean parkAndCheckInterrupt() {<!-- --> LockSupport.park(this); return Thread.interrupted(); }
//Node status constants within AQS /** Marker to indicate a node is waiting in shared mode */ //Mark to indicate that the node is waiting in shared mode static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ //Mark indicating that the node is waiting in exclusive mode static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has canceled */ //waitStatus value indicates that the thread has been canceled static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ //waitStatus value indicates that subsequent threads need to be unparked static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ //waitStatus value indicates that the thread is waiting for a certain condition static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ //waitStatus value, indicating that the next obtained one should be propagated unconditionally static final int PROPAGATE = -3;
-
-
unlock()
-
.release(1) of the AQS object
-
public final boolean release(int arg) {<!-- --> if (tryRelease(arg)) {<!-- --> //Try to release the lock Node h = head; if (h != null & amp; & amp; h.waitStatus != 0) unparkSuccessor(h); //Wake up in AQS return true; } return false; }
-
protected final boolean tryRelease(int releases) {<!-- --> //releases=1 int c = getState() - releases; //Reentrancy may occur if (Thread.currentThread() != getExclusiveOwnerThread()) //It is not the lock of the current thread and throws an exception throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {<!-- --> //No reentrancy, release free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
-
private void unparkSuccessor(Node node) {<!-- --> /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signaling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if canceled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) {<!-- --> s = null; for (Node t = tail; t != null & amp; & amp; t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }