What is SynchronousQueue
Some people say that it is a blocking queue with a capacity of 1. By reading the source code, we found that this explanation is not correct. The following is the explanation given by the author:
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one.
It is a blocking queue, each insertion operation must wait for another thread’s removal operation, and vice versa. A blocking queue with no capacity, doesn’t even have a capacity. It means that when a production thread puts a piece of data into the queue, it must wait for a consumer thread to take the data away, otherwise the production thread will be blocked there. Queues do not hold any data.
It has two internal implementations, one is TransferStack (implemented by stack, stack implemented by one-way linked list, first in last out (LIFO)), and the other is TransferQueue (implemented by linked list, first in first out (FIFO)). Below we focus on one of them, and the other reader can check the source code by himself.
Let’s take a look at the internal default implementation, that is, the implementation of TransferStack: the internal class of SynchronousQueue
Introduction to TransferStack core variables
/** indicates that the current node is a consumer thread */ static final int REQUEST = 0; /** Indicates that the current node is a producer thread */ static final int DATA = 1; /** Indicates that the current node is a matching node thread */ static final int FULFILLING = 2; /** The head node of the stack */ volatile SNode head;
Introduction to SNode node variables
volatile SNode next; // Pointer to the next node volatile SNode match; // Indicates the current node matching status, if the match points to itself, it means the current node is canceled volatile Thread waiter; // Thread information of the current node Object item; // data to be exchanged int mode; // Indicates the type of node, whether it is a producer, a consumer or a match
Core method transfer method
The put and take methods will eventually enter the transfer method. The source code of the put and take methods is as follows
public void put(E e) throws InterruptedException {<!-- --> if (e == null) throw new NullPointerException(); // If null is returned, it means that it is interrupted, and an interrupt exception is thrown directly // If everything is normal, the producer will always block here until the data is taken by the consumer E transfer = transferer. transfer(e, false, 0); if (transfer == null) {<!-- --> Thread. interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException {<!-- --> // If there is no data to be exchanged in the queue, the consumer blocks and waits until a producer submits data E e = transferer. transfer(null, false, 0); if (e != null) return e; Thread. interrupted(); throw new InterruptedException(); }
transfer source code
The for(;;) loop ensures that the node is inserted into the queue or the data exchange operation is completed normally.
1. When the head node is empty, that is, you are the first node to enter the queue, or the mode of the current node and the top of the stack (head node) belong to the same mode, then insert the current node into the queue, and then block waiting for the matching thread match with
2. The current thread and the top of the stack belong to different modes, and the top of the stack has not been matched and exchanged by other threads, then try to perform a matching operation. Through the source code, we find that the matching node (current node) will be pushed into the stack in FULFILLING mode. At this time, the stack The data is shown in Figure 1 below
3. When another matching thread comes in, if the head node is in the matching state, then the current thread tries to help it complete the matching, update the head node, and then reacquire the latest head node by itself to try the next round of matching operations
E transfer(E e, boolean timed, long nanos) {<!-- --> SNodes = null; int mode = (e == null) ? REQUEST : DATA; // If e is NULL, it means a consumer thread, otherwise it is a production thread for (;;) {<!-- --> SNode h = head; // The head node is empty or the new node is the same pattern as the head node, insert it into the stack if (h == null || h.mode == mode) {<!-- --> if (timed & amp; & amp; nanos <= 0) {<!-- --> // if timed out, pop it off the stack if (h != null & amp; & amp; h.isCancelled()) casHead(h, h. next); else return null; } // When the head node is null, create the head node and assign it to s, then CAS updates the head node else if (casHead(h, s = snode(s, e, h, mode))) {<!-- --> SNode m = awaitFulfill(s, timed, nanos);// Wait for the matching thread to match and take the data, and the node to be matched is blocked in this step if (m == s) {<!-- --> // equality means that the node has been cancelled, just remove it from the stack clean(s); return null; } if ((h = head) != null & amp; & amp; h. next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m. item : s. item); } } // Execution to the if judgment shows that the current node and the head node are in different modes, and it is judged whether the head node is the node that is performing the matching operation, // Judging by the FULFILLING flag // If the head node is not a matching node, try to match it else if (!isFulfilling(h.mode)) {<!-- --> if (h.isCancelled()) // the node has been canceled pop it off the stack casHead(h, h.next); // Update the stack top pointer // Push the current node onto the stack in FULFILLING mode, indicating that the current node needs to be matched with the original head node else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {<!-- --> for (;;) {<!-- --> // loop until matched or waiters disappear SNode m = s.next; // m is s's match // Indicates that the original head node has been matched by other matching threads, you need to re-acquire the latest head node to match again if (m == null) {<!-- --> casHead(s, null); // pop s out s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // The m node at this time is the original head node if (m. tryMatch(s)) {<!-- --> casHead(s, mn); // The match is successful Update the head node, skip the two nodes being matched return (E) ((mode == REQUEST) ? m. item : s. item); } else // lost match s.casNext(m, mn); // Matching failed, update the next node of s } } } // The head node is FULFILLING, helping it to complete the matching operation else {<!-- --> SNode m = h.next; // m is the matched node // m is null means that there is no matching node in the stack, pop the FULFILLING node from the stack // The next time you enter the loop, if the head node is empty, enter the queue and block waiting to be matched if (m == null) casHead(h, null); else {<!-- --> SNode mn = m.next; // Take out the next node of m to update the head if (m.tryMatch(h)) // try to help them match casHead(h, mn); // The match is successful, update the head node to mn, skip the first two matching nodes else // Matching failed, m has been matched by other threads, update the next of the head node to point to mn, and then re-execute the loop h.casNext(m, mn); // disconnect the next link of m } } } }
awaitFulfill method
Block and wait for the matching thread to match with it. Before entering the OS blocking waiting, try to perform a spin operation. If the node times out or is interrupted, directly cancel the node and exit the loop waiting. If it has not been matched during the spinning period, enter the OS blocking
SNode awaitFulfill(SNode s, boolean timed, long nanos) {<!-- --> // wait for timeout, calculate the absolute time final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread();// current thread int spins = (shouldSpin(s) ?//Is spin waiting required? 1. If s is the head node; 2. When the head node is empty; 3. The head node is being matched; (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) {<!-- --> if (w. isInterrupted()) s.tryCancel(); // If the node is interrupted, directly cancel the node, point the match to itself, and mark itself as canceled SNode m = s.match; if (m != null) // m may be itself (the node is canceled) or a matched node (the match is successful) return m; if (timed) {<!-- --> nanos = deadline - System.nanoTime(); if (nanos <= 0L) {<!-- --> s.tryCancel(); // timeout cancel node continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport. park(this); else if (nanos > spinForTimeoutThreshold) LockSupport. parkNanos(this, nanos); } }
tryMatch method
After the match is successful, point the match of the matched node to the s node, and wake up the matched node, as shown in the figure below:
boolean tryMatch(SNode s) {<!-- --> // match is not NULL means it has been matched // Point the match of the matched node (m) to the matching node (s) if (match == null & amp; & amp; UNSAFE. compareAndSwapObject(this, matchOffset, null, s)) {<!-- --> Thread w = waiter; if (w != null) {<!-- --> waiter = null; LockSupport. unpark(w); } return true; } return match == s; }
clean method
Remove the canceled node from the stack
void clean(SNode s) {<!-- --> s.item = null; s.waiter = null; SNode past = s. next; // Determine whether the next node is also canceled if (past != null & amp; & amp; past.isCancelled()) past = past. next; SNode p; // Traverse to find the uncancelled node, update the head to the uncancelled node while ((p = head) != null & amp; & amp; p != past & amp; & amp; p.isCancelled()) casHead(p, p. next); while (p != null & amp; & amp; p != past) {<!-- --> SNode n = p.next; if (n != null & & n.isCancelled()) p.casNext(n, n.next); else p = n; } }