1 Brief description of SynchronousQueue
SynchronousQueue does not store data; what it stores is the waiting producers or consumers themselves. When a producer is placed in the SynchronousQueue, it may block (depending on which method was called), and it ends in one of three ways:
- A consumer arrives to match it during the blocking period, and the producer hands its message over to that consumer.
- The producer waits until its timeout expires, or the method called does not allow blocking at all, and the operation fails.
- The producer's thread is interrupted while it is blocked, and it leaves the queue immediately.
Consumers behave the same way as producers.
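The three outcomes listed above can be observed from the consumer side with a small experiment. This is only a sketch: the class name `ConsumerOutcomes` and its helper methods are made up for illustration, and the 5-second timeout is an arbitrary generous value.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class ConsumerOutcomes {
    // Outcome 2: poll() does not block, so with no producer waiting it fails at once.
    static String pollWithoutProducer() {
        return new SynchronousQueue<String>().poll();
    }

    // Outcome 1: a producer shows up while the consumer waits, and the message is handed over.
    static String pollWithProducer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("Hello!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return queue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Outcome 3: the consumer is interrupted while blocked in take() and leaves with an exception.
    static boolean takeInterrupted() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean[] sawInterrupt = {false};
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;
            }
        });
        consumer.start();
        try {
            // Wait until the consumer is parked inside take() before interrupting it.
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("poll, no producer:   " + pollWithoutProducer());
        System.out.println("poll, with producer: " + pollWithProducer());
        System.out.println("take interrupted:    " + takeInterrupted());
    }
}
```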
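Producer: put(e) blocks until a consumer arrives, offer(e) never blocks, and offer(e, timeout, unit) blocks for at most the given time.
Consumer: take(), poll() and poll(timeout, unit) mirror the producer methods above.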
Sample code:
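import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<String>();
        String msg = "Hello!";

        // Producer: wait up to 1 second for a consumer to take the message
        new Thread(() -> {
            boolean b = false;
            try {
                b = synchronousQueue.offer(msg, 1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(b);
        }).start();

        // Give the producer time to enter the queue
        Thread.sleep(100);

        // Consumer: a non-blocking poll succeeds because the producer is already waiting
        new Thread(() -> {
            System.out.println(synchronousQueue.poll());
        }).start();
    }
}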
Core inner classes:
abstract static class Transferer<E> {
    // Core method that every producer and consumer operation calls to hand data over
    abstract E transfer(E e, boolean timed, long nanos);
}
When a producer calls this method, it passes its element e as the first argument; when a consumer calls it, the first argument is null.
There are two implementations of Transferer:
- TransferStack
- TransferQueue
Which implementation is used is specified when the SynchronousQueue is constructed:
public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    // Fair: use TransferQueue (queue structure, FIFO, first in first out)
    // Unfair: use TransferStack (stack structure, LIFO, last in first out)
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
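The effect of the fair flag can be seen by parking several producers in a known order and then draining them. The sketch below is illustrative only: `FairnessDemo` and `handoffOrder` are made-up names, and polling `Thread.getState()` for `WAITING` is just a simple way to make sure each producer is parked inside put() before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {
    // Parks producers A, B, C one after another, then takes three times and reports the order.
    static List<String> handoffOrder(boolean fair) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);
        try {
            for (String msg : new String[] {"A", "B", "C"}) {
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(msg);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.start();
                // Do not start the next producer until this one is blocked in put().
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            List<String> received = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                received.add(queue.take());
            }
            return received;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fair:   " + handoffOrder(true));
        System.out.println("unfair: " + handoffOrder(false));
    }
}
```

In fair mode the consumer receives the messages in arrival order (A, B, C). In the default unfair mode the stack structure typically hands over the most recently parked producer first, but the class documentation does not guarantee any particular order, so code should not rely on it.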
2 TransferQueue source code analysis
Construct TransferQueue object
TransferQueue() {
    // Create a dummy node (item null, isData false); head and tail both start out pointing at it
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

QNode(Object item, boolean isData) {
    this.item = item;
    // Distinguishes producers (true) from consumers (false)
    this.isData = isData;
}
put method source code analysis
public void put(E e) throws InterruptedException {
    // Null elements are rejected with an exception
    if (e == null) throw new NullPointerException();
    // An untimed transfer returns null only if the thread was interrupted,
    // so clear the interrupt flag and throw
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
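The interrupt branch of put can be exercised directly. In this sketch (the class name `PutInterruptDemo` and the result array layout are illustrative assumptions), a producer blocks in put() with no consumer present and is then interrupted; it records whether the exception was thrown and whether the interrupt flag was still set afterwards.

```java
import java.util.concurrent.SynchronousQueue;

class PutInterruptDemo {
    // Returns {exceptionThrown, interruptFlagStillSet} as seen by the interrupted producer.
    static boolean[] interruptBlockedPut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean[] result = new boolean[2];
        Thread producer = new Thread(() -> {
            try {
                queue.put("Hello!"); // no consumer exists, so this blocks
            } catch (InterruptedException e) {
                result[0] = true;
                result[1] = Thread.currentThread().isInterrupted();
            }
        });
        producer.start();
        try {
            // Wait until the producer is parked inside put(), then interrupt it.
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            producer.interrupt();
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = interruptBlockedPut();
        System.out.println("InterruptedException thrown: " + r[0]);
        System.out.println("interrupt flag still set:    " + r[1]);
    }
}
```

The flag is reported as cleared because put calls Thread.interrupted() before throwing, as the source above shows.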
offer method source code analysis
public boolean offer(E e) {
    // Null elements are rejected with an exception
    if (e == null) throw new NullPointerException();
    // timed = true with 0 nanos means "do not wait": transfer returns null,
    // and offer returns false, when no consumer is already waiting
    return transferer.transfer(e, true, 0) != null;
}
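Because offer(e) never waits, its result depends entirely on whether a consumer is already parked in the queue. A minimal sketch of both cases follows; `OfferDemo` and its method names are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;

class OfferDemo {
    // No consumer is waiting, so the non-blocking offer fails immediately.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("Hello!");
    }

    // A consumer is already blocked in take(), so the offer is matched and succeeds.
    static boolean offerWithWaitingConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Make sure the consumer is parked before offering.
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            boolean accepted = queue.offer("Hello!");
            consumer.join();
            return accepted;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("no consumer:      " + offerWithoutConsumer());
        System.out.println("waiting consumer: " + offerWithWaitingConsumer());
    }
}
```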
offer(E e, long timeout, TimeUnit unit) source code analysis
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // Null elements are rejected with an exception
    if (e == null) throw new NullPointerException();
    // A non-null result means a consumer took the element; null means timeout or interruption
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    // Check (and clear) the interrupt flag: if the thread was not interrupted,
    // the wait timed out, so return false; otherwise throw the interrupt exception
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}
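The timed offer therefore has two distinct failure paths: a plain timeout returns false, while an interruption surfaces as an exception. The sketch below triggers each one; `TimedOfferDemo` is a hypothetical name and the timeout values are arbitrary.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    // Nobody consumes, so the offer gives up after the timeout and returns false.
    static boolean offerThatTimesOut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            return queue.offer("Hello!", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // The calling thread is already interrupted, so the offer throws instead of returning false.
    static boolean offerWhileInterrupted() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread.currentThread().interrupt();
        try {
            queue.offer("Hello!", 1, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            // Leave the calling thread with a clean interrupt status in every case.
            Thread.interrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out, returned:         " + offerThatTimesOut());
        System.out.println("InterruptedException thrown: " + offerWhileInterrupted());
    }
}
```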
Transfer core method source code analysis
// Core method implementation
E transfer(E e, boolean timed, long nanos) {
    // Node for the current producer or consumer
    QNode s = null; // constructed/reused as needed
    // true means producer, false means consumer
    boolean isData = (e != null);

    // There is no locking, so loop and re-check state to cope with concurrent changes
    for (;;) {
        // Copy the tail and head fields into local variables
        QNode t = tail;
        QNode h = head;
        // Robustness check: null means the TransferQueue is not fully initialized yet
        // (instruction reordering may have occurred)
        if (t == null || h == null) // saw uninitialized value
            continue;

        // h == t: the queue holds no producers and no consumers
        // t.isData == isData: the queued nodes play the same role as the current thread
        // In both cases the current thread must join the queue
        if (h == t || t.isData == isData) {
            // Read the node after the tail
            QNode tn = t.next;
            // tail changed since it was read: another thread got in first, so retry
            if (t != tail) // inconsistent read
                continue;
            // tail.next is not null: another thread linked a node but has not moved tail yet
            if (tn != null) { // lagging tail
                // Help that thread advance the tail pointer, then retry
                advanceTail(t, tn);
                continue;
            }
            // timed: false blocks indefinitely, true blocks for a limited time
            // If timed is true and no waiting time is left, give up and return null
            if (timed && nanos <= 0L) // can't wait
                return null;
            // Blocking is allowed: wrap e in a QNode marked as producer or consumer
            if (s == null)
                s = new QNode(e, isData);
            // CAS the tail's next from null to the new node; on failure, retry the loop
            if (!t.casNext(null, s)) // failed to link in
                continue;

            // CAS succeeded: move tail to the new node
            advanceTail(t, s); // swing tail and wait
            // Now in the queue: park the thread until a matching producer or consumer arrives
            // x is the item that replaced this node's original item
            Object x = awaitFulfill(s, e, timed, nanos);
            // x == s means the node was cancelled (timeout or interrupt)
            if (x == s) { // wait was canceled
                // Unlink the cancelled node from the queue
                clean(t, s);
                return null;
            }

            // Is the current node still linked in the queue?
            if (!s.isOffList()) {
                // Make the current node the new head
                advanceHead(t, s); // unlink if head
                // x != null means this thread is a consumer that received data
                if (x != null) // and forget fields
                    // Point the node's item at the node itself
                    s.item = s;
                // Drop the reference to the thread
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else { // complementary-mode
            // The queue holds the opposite role; head.next is the node to match
            QNode m = h.next; // node to fulfill
            // If tail, head.next or head changed concurrently, retry the loop
            if (t != tail || m == null || h != head)
                continue; // inconsistent read

            // State is consistent: read the item of the node to match
            Object x = m.item;
            // isData == (x != null): m was already fulfilled by another thread
            // x == m: m was cancelled and its item points to itself
            // !m.casItem(x, e): the CAS that exchanges the data lost a race
            // In all three cases, move head past m and retry the loop
            if (isData == (x != null) ||
                x == m ||
                !m.casItem(x, e)) { // lost CAS
                advanceHead(h, m); // dequeue and retry
                continue;
            }

            // Matched: move head to m
            advanceHead(h, m);
            // Wake up the thread that was waiting in m
            LockSupport.unpark(m.waiter);
            // x != null: the queued node was a producer and the caller is a consumer, so return x
            // Otherwise the queued node was a consumer and the caller is a producer, so return e
            return (x != null) ? (E)x : e;
        }
    }
}
transfer method flow chart
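The enqueue-or-match decision at the heart of transfer can be modelled with a deliberately simplified, single-threaded toy. Everything here is an assumption made for illustration: `ToyDualQueue` is not part of the JDK, it uses an ArrayDeque instead of a lock-free linked list, and it returns null where the real code would park the thread in awaitFulfill.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ToyDualQueue<E> {
    // A waiting party: producers carry an item, consumers carry null.
    private static final class Node<T> {
        final boolean isData;
        T item;
        Node(T item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }
    }

    private final Deque<Node<E>> waiting = new ArrayDeque<>();

    // Same calling convention as transfer: non-null e is a producer, null is a consumer.
    // Returns the exchanged item on an immediate match, or null if the caller had to queue up.
    E transfer(E e) {
        boolean isData = (e != null);
        Node<E> tail = waiting.peekLast();
        if (tail == null || tail.isData == isData) {
            // Empty queue or same role: join the queue (the real code would block here).
            waiting.addLast(new Node<>(e, isData));
            return null;
        }
        // Complementary role: match the oldest waiter, like head.next in the real queue.
        Node<E> m = waiting.pollFirst();
        E x = m.item;
        m.item = e; // stands in for m.casItem(x, e)
        return (x != null) ? x : e;
    }

    int waitingCount() {
        return waiting.size();
    }

    public static void main(String[] args) {
        ToyDualQueue<String> q = new ToyDualQueue<>();
        q.transfer("A");                       // producer A queues up
        q.transfer("B");                       // producer B queues up behind A
        System.out.println(q.transfer(null));  // consumer matches the oldest producer
        System.out.println(q.waitingCount());  // one producer is still waiting
    }
}
```

The toy shows why the queue only ever holds one role at a time: a caller of the opposite role never enqueues, it always consumes the head instead.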