Blocking queue SynchronousQueue

1 Brief description of SynchronousQueue

SynchronousQueue does not store data; what it queues are the waiting producers and consumers themselves. When a producer enters the queue it may block, depending on which method was called, and it ends in one of the following ways:

  • A consumer arrives during the wait and is matched: the producer hands its message over to that consumer

  • The wait times out, or the method called does not allow blocking and no consumer is waiting: the producer fails immediately

  • The producer's thread is interrupted while blocked: the producer leaves the queue immediately

Consumers behave the same way as producers
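The same outcomes can be observed from the consumer side. The following is a minimal sketch, not part of the JDK source; the class name ConsumerOutcomes and its helper methods are made up for illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class ConsumerOutcomes {
    /** Matched: take() blocks until a producer arrives, then returns that producer's message. */
    static String takeFromProducer(String msg) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Blocking not allowed: poll() returns null at once when no producer is waiting. */
    static String pollWithoutProducer() {
        return new SynchronousQueue<String>().poll();
    }

    /** Timed out: poll(timeout) waits at most the given time, then returns null. */
    static String pollWithTimeout(long millis) {
        try {
            return new SynchronousQueue<String>().poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Interrupted: returns true if take() threw InterruptedException in the interrupted thread. */
    static boolean takeIsInterrupted() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean[] threw = {false};
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                threw[0] = true;
            }
        });
        consumer.start();
        consumer.interrupt();
        try {
            consumer.join(); // join makes the write to threw[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threw[0];
    }

    public static void main(String[] args) {
        System.out.println("matched:     " + takeFromProducer("Hello!"));
        System.out.println("no blocking: " + pollWithoutProducer());
        System.out.println("timed out:   " + pollWithTimeout(100));
        System.out.println("interrupted: " + takeIsInterrupted());
    }
}
```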

Producer:

Consumer:

Same method as above

Sample code:

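import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        String msg = "Hello!";
        // Producer: waits up to 1 second for a consumer to take the message
        new Thread(() -> {
            boolean b = false;
            try {
                b = synchronousQueue.offer(msg, 1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // true if a consumer took the message within the timeout
            System.out.println(b);
        }).start();

        // Give the producer time to enter the queue
        Thread.sleep(100);
        // Consumer: poll() does not block, it only succeeds if a producer is already waiting
        new Thread(() -> {
            System.out.println(synchronousQueue.poll());
        }).start();
    }
}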

Core inner classes:

abstract static class Transferer<E> {
    // Core method behind every producer and consumer operation (put, offer, take, poll)
    abstract E transfer(E e, boolean timed, long nanos);
}

When a producer calls this method it passes its element e as the first argument; when a consumer calls it, the first argument is null

There are two implementations of Transferer:

  • TransferStack

  • TransferQueue

Which implementation is used is decided when the SynchronousQueue is constructed

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    // Fair: use TransferQueue (queue structure, FIFO, first in first out)
    // Non-fair (the default): use TransferStack (stack structure, LIFO, last in first out)
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
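The effect of the fair flag can be observed by parking several producers in a known order and then draining the queue. This is a sketch under assumptions: FairnessDemo and drainOrder are illustrative names, and the sketch relies on a thread blocked in put() eventually reporting Thread.State.WAITING. The FIFO order of the fair queue is documented; the reverse order of the non-fair queue follows from the stack implementation described here, but the Javadoc does not guarantee it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {
    /** Parks one producer per message, in order, then drains the queue and returns the order received. */
    static List<String> drainOrder(boolean fair, String... messages) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);
        try {
            for (String msg : messages) {
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(msg);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.setDaemon(true);
                producer.start();
                // Wait until this producer is parked inside put() before starting the next one,
                // so that the arrival order is well defined
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            List<String> received = new ArrayList<>();
            for (int i = 0; i < messages.length; i++) {
                received.add(queue.take());
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fair:     " + drainOrder(true, "A", "B", "C"));
        System.out.println("non-fair: " + drainOrder(false, "A", "B", "C"));
    }
}
```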

2 TransferQueue source code analysis

Construct TransferQueue object

TransferQueue() {
    // Create a dummy node (item null, isData false) and point both head and tail at it
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

QNode(Object item, boolean isData) {
    this.item = item;
    // Distinguish between consumer and producer
    this.isData = isData;
}

put method source code analysis

public void put(E e) throws InterruptedException {
    // Null elements are not allowed
    if (e == null) throw new NullPointerException();
    // put blocks without a timeout, so a null result can only mean the thread was interrupted
    if (transferer.transfer(e, false, 0) == null) {
        // Clear the interrupt flag before throwing
        Thread.interrupted();
        throw new InterruptedException();
    }
}
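Two properties of put can be checked from the outside: it stays blocked until a consumer arrives, and the interrupt flag is already cleared by the time InterruptedException reaches the caller. A minimal sketch follows; PutDemo and its methods are illustrative names, not JDK API.

```java
import java.util.concurrent.SynchronousQueue;

class PutDemo {
    /** Returns true if put() was still blocked after waitMillis (> 0) and completed once take() ran. */
    static boolean putWaitsForTake(long waitMillis) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("Hello!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            producer.join(waitMillis);          // put() cannot finish without a consumer
            boolean stillBlocked = producer.isAlive();
            String received = queue.take();     // the hand-off releases the producer
            producer.join();
            return stillBlocked && "Hello!".equals(received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns the interrupt flag as seen inside the catch block after put() was interrupted. */
    static boolean flagAfterInterruptedPut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean[] flag = {true};
        Thread producer = new Thread(() -> {
            try {
                queue.put("Hello!");
            } catch (InterruptedException e) {
                flag[0] = Thread.currentThread().isInterrupted();
            }
        });
        producer.start();
        producer.interrupt();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flag[0];
    }

    public static void main(String[] args) {
        System.out.println("put waited for take: " + putWaitsForTake(200));
        System.out.println("flag after interrupted put: " + flagAfterInterruptedPut());
    }
}
```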

offer method source code analysis

public boolean offer(E e) {
    // Null elements are not allowed
    if (e == null) throw new NullPointerException();
    // timed is true with 0 nanos, so the call never blocks:
    // transfer returns null (and offer returns false) when no consumer is already waiting
    return transferer.transfer(e, true, 0) != null;
}
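Because offer(e) never waits, its result depends only on whether a consumer is already parked in the queue. The sketch below shows both cases; OfferDemo is an illustrative name, and the sketch assumes a thread blocked in take() eventually reports Thread.State.WAITING.

```java
import java.util.concurrent.SynchronousQueue;

class OfferDemo {
    /** No consumer is waiting, so offer() fails immediately. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("Hello!");
    }

    /** A consumer is already parked in take(), so offer() hands the message over. */
    static boolean offerWithWaitingConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = {null};
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Wait until the consumer is parked inside take()
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            boolean accepted = queue.offer("Hello!");
            if (!accepted) {
                consumer.interrupt(); // do not leave the consumer blocked forever
            }
            consumer.join();
            return accepted && "Hello!".equals(received[0]);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("without consumer: " + offerWithoutConsumer());
        System.out.println("with waiting consumer: " + offerWithWaitingConsumer());
    }
}
```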

offer(E e, long timeout, TimeUnit unit) source code analysis

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // Null elements are not allowed
    if (e == null) throw new NullPointerException();
    // A non-null result means a consumer took the element: return true
    // A null result means the wait timed out or the thread was interrupted
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    // Check (and clear) the thread's interrupt flag: if it is not set, the wait timed out, so return false;
    // otherwise throw InterruptedException
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}
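The two failure paths of the timed offer (timeout returns false, interruption throws) can be told apart as follows. This is a sketch only; TimedOfferDemo and its methods are illustrative names.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    /** Timeout path: no consumer arrives. Returns the elapsed milliseconds, or -1 if offer succeeded. */
    static long millisUntilTimeout(long timeoutMillis) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            long start = System.nanoTime();
            boolean accepted = queue.offer("Hello!", timeoutMillis, TimeUnit.MILLISECONDS);
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return accepted ? -1 : elapsed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Interrupt path: returns true if the timed offer threw InterruptedException. */
    static boolean timedOfferThrowsOnInterrupt() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean[] threw = {false};
        Thread producer = new Thread(() -> {
            try {
                queue.offer("Hello!", 10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                threw[0] = true;
            }
        });
        producer.start();
        producer.interrupt();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threw[0];
    }

    public static void main(String[] args) {
        System.out.println("timed out after ms: " + millisUntilTimeout(100));
        System.out.println("threw on interrupt: " + timedOfferThrowsOnInterrupt());
    }
}
```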

Transfer core method source code analysis

// core method implementation
E transfer(E e, boolean timed, long nanos) {
    // Node for the current producer or consumer, created only if it has to wait
    QNode s = null; // constructed/reused as needed
    // true means producer, false means consumer
    boolean isData = (e != null);
    // There is no lock, so the method loops and re-checks its reads to cope with concurrent changes
    for (;;) {
        // Assign the member variables of the head and tail nodes to local variables
        QNode t = tail;
        QNode h = head;
        // Robustness check: a null head or tail means the TransferQueue is not fully initialized yet (possible through instruction reordering), so retry
        if (t == null || h == null) // saw uninitialized value
            continue;
        // Enqueue in either of two cases:
        //   - head equals tail: the queue is empty (no waiting producers or consumers)
        //   - the tail node has the same role as the current thread (all queued nodes share one role)
        if (h == t || t.isData == isData) {
            // Read the node after the tail
            QNode tn = t.next;
            // tail has changed since it was read: another thread got in first, retry the loop
            if (t != tail) // inconsistent read
                continue;
            // tail.next is not null: another thread has linked a node but has not advanced tail yet
            if (tn != null) { // lagging tail
                // Help that thread by advancing the tail pointer, then retry the loop
                advanceTail(t, tn);
                continue;
            }
            // timed: false means block indefinitely, true means block for at most nanos
            // If timed is true and the remaining time is less than or equal to 0, give up and return null
            if (timed && nanos <= 0L) // can't wait
                return null;
            // Blocking is allowed: wrap e in a QNode whose role (producer or consumer) is given by isData
            if (s == null)
                s = new QNode(e, isData);
            // CAS the tail node's next from null to the new QNode
            if (!t.casNext(null, s)) // failed to link in
                // The CAS failed because another thread linked a node first: retry the loop
                continue;
            // CAS succeeded: advance tail to the new node
            advanceTail(t, s); // swing tail and wait
            // The node is now in the queue: wait until a matching producer or consumer arrives,
            // the wait times out, or the thread is interrupted
            // x is the item found in the node when the wait ends
            Object x = awaitFulfill(s, e, timed, nanos);

            // A canceled node has its item set to the node itself (timeout or interrupt)
            if (x == s) { // wait was canceled
                // Unlink the canceled node from the queue
                clean(t, s);
                return null;
            }
            // If the node is still linked in the queue
            if (!s.isOffList()) {
                // Make s the new head, provided its predecessor t is still the head
                advanceHead(t, s); // unlink if head
                // x != null means this thread is a consumer that received data
                if (x != null) // and forget fields
                    // Point the node's item at the node itself so the data is no longer referenced
                    s.item = s;
                // Drop the reference to the waiting thread
                s.waiter = null;
            }
            // Consumer: return the received data; producer: return its own element
            return (x != null) ? (E)x : e;

        } else {
            // The queue holds nodes of the opposite role: try to match the first one
            // head is a dummy node, so head.next is the node to match // complementary-mode
            QNode m = h.next; // node to fulfill
            // If tail or head has changed, or head.next is null, another thread interfered: retry the loop
            if (t != tail || m == null || h != head)
                continue; // inconsistent read
            // The snapshot is consistent: read the node's item
            Object x = m.item;
            // isData == (x != null): m has already been fulfilled by another thread
            // (a waiting consumer already holds data, or a waiting producer's data is already taken)
            if (isData == (x != null) ||
                // x == m: the node was canceled (its item points to itself)
                x == m ||
                // Otherwise try to exchange the data with a CAS on m's item
                // A failed CAS means another thread matched m first
                !m.casItem(x, e)) { // lost CAS
                // In all three cases m cannot be used: move head past it and retry the loop
                advanceHead(h, m); // dequeue and retry
                continue;
            }
            // Match succeeded: move head to m
            advanceHead(h, m);
            // Wake up the thread waiting in m
            LockSupport.unpark(m.waiter);
            // The exchange is complete
            // x != null: the queued node was a producer and this thread is a consumer, so return the data x
            // Otherwise the queued node was a consumer and this thread is a producer, so return its own element e
            return (x != null) ? (E)x : e;
        }
    }
}
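The two branches of transfer (enqueue when the queue is empty or holds the same role, otherwise match the first waiting node) can be modelled without any concurrency. The following single-threaded sketch is a deliberately simplified model, not the JDK implementation: DualQueueModel is a made-up class, there is no CAS, blocking, timeout, or cancellation, and a null return here means "the caller was queued" rather than "the transfer failed".

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DualQueueModel {
    /** A waiting party: item is the message for a producer and null for a consumer. */
    private static final class Node {
        final String item;
        final boolean isData;

        Node(String item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }
    }

    private final Deque<Node> waiting = new ArrayDeque<>();

    /**
     * Producers pass their message, consumers pass null (the same convention as transfer).
     * Returns the message that changed hands, or null if the caller had to be queued.
     */
    String transfer(String e) {
        boolean isData = (e != null);
        Node tail = waiting.peekLast();
        // Empty queue, or the queue already holds parties of the caller's role: enqueue
        if (tail == null || tail.isData == isData) {
            waiting.addLast(new Node(e, isData));
            return null; // the real method would now block in awaitFulfill
        }
        // Otherwise the queue holds the opposite role: match the oldest waiter (FIFO)
        Node m = waiting.pollFirst();
        return isData ? e : m.item;
    }

    int size() {
        return waiting.size();
    }

    public static void main(String[] args) {
        DualQueueModel q = new DualQueueModel();
        System.out.println(q.transfer("A"));  // null: producer A is queued
        System.out.println(q.transfer("B"));  // null: producer B is queued behind A
        System.out.println(q.transfer(null)); // A: the consumer is matched with the oldest producer
        System.out.println(q.transfer(null)); // B
        System.out.println(q.transfer(null)); // null: the queue is empty, so the consumer is queued
        System.out.println(q.transfer("C"));  // C: the producer is matched with the waiting consumer
    }
}
```

A queue in this model never holds both roles at once, which is why checking only the tail's role is enough to pick the branch.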

transfer method flow chart