SynchronousQueue of JUC source code series

What is SynchronousQueue

Some people say that it is a blocking queue with a capacity of 1. By reading the source code, we found that this explanation is not correct. The following is the explanation given by the author:

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one.

It is a blocking queue, each insertion operation must wait for another thread’s removal operation, and vice versa. A blocking queue with no capacity, doesn’t even have a capacity. It means that when a production thread puts a piece of data into the queue, it must wait for a consumer thread to take the data away, otherwise the production thread will be blocked there. Queues do not hold any data.

It has two internal implementations, one is TransferStack (implemented by stack, stack implemented by one-way linked list, first in last out (LIFO)), and the other is TransferQueue (implemented by linked list, first in first out (FIFO)). Below we focus on one of them, and the other reader can check the source code by himself.

Let’s take a look at the internal default implementation, that is, the implementation of TransferStack: the internal class of SynchronousQueue

Introduction to TransferStack core variables

/** indicates that the current node is a consumer thread */
static final int REQUEST = 0;
/** Indicates that the current node is a producer thread */
static final int DATA = 1;
/** Indicates that the current node is a matching node thread */
static final int FULFILLING = 2;
/** The head node of the stack */
volatile SNode head;

Introduction to SNode node variables

volatile SNode next; // Pointer to the next node
volatile SNode match; // Indicates the current node matching status, if the match points to itself, it means the current node is canceled
volatile Thread waiter; // Thread information of the current node
Object item; // data to be exchanged
int mode; // Indicates the type of node, whether it is a producer, a consumer or a match

Core method transfer method

The put and take methods will eventually enter the transfer method. The source code of the put and take methods is as follows

public void put(E e) throws InterruptedException {<!-- -->
    if (e == null) throw new NullPointerException();
    // If null is returned, it means that it is interrupted, and an interrupt exception is thrown directly
    // If everything is normal, the producer will always block here until the data is taken by the consumer
    E transfer = transferer. transfer(e, false, 0);
    if (transfer == null) {<!-- -->
        Thread. interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {<!-- -->
    // If there is no data to be exchanged in the queue, the consumer blocks and waits until a producer submits data
    E e = transferer. transfer(null, false, 0);
    if (e != null)
        return e;
    Thread. interrupted();
    throw new InterruptedException();
}

transfer source code

The for(;;) loop ensures that the node is inserted into the queue or the data exchange operation is completed normally.

1. When the head node is empty, that is, you are the first node to enter the queue, or the mode of the current node and the top of the stack (head node) belong to the same mode, then insert the current node into the queue, and then block waiting for the matching thread match with

2. The current thread and the top of the stack belong to different modes, and the top of the stack has not been matched and exchanged by other threads, then try to perform a matching operation. Through the source code, we find that the matching node (current node) will be pushed into the stack in FULFILLING mode. At this time, the stack The data is shown in Figure 1 below

3. When another matching thread comes in, if the head node is in the matching state, then the current thread tries to help it complete the matching, update the head node, and then reacquire the latest head node by itself to try the next round of matching operations

E transfer(E e, boolean timed, long nanos) {<!-- -->
    SNodes = null;
    int mode = (e == null) ? REQUEST : DATA; // If e is NULL, it means a consumer thread, otherwise it is a production thread
    for (;;) {<!-- -->
        SNode h = head;
        // The head node is empty or the new node is the same pattern as the head node, insert it into the stack
        if (h == null || h.mode == mode) {<!-- -->
            if (timed & amp; & amp; nanos <= 0) {<!-- --> // if timed out, pop it off the stack
                if (h != null & amp; & amp; h.isCancelled())
                    casHead(h, h. next);
                else
                    return null;
            }
            // When the head node is null, create the head node and assign it to s, then CAS updates the head node
            else if (casHead(h, s = snode(s, e, h, mode))) {<!-- -->
                SNode m = awaitFulfill(s, timed, nanos);// Wait for the matching thread to match and take the data, and the node to be matched is blocked in this step
                if (m == s) {<!-- --> // equality means that the node has been cancelled, just remove it from the stack
                    clean(s);
                    return null;
                }
                if ((h = head) != null & amp; & amp; h. next == s)
                    casHead(h, s.next); // help s's fulfiller
                return (E) ((mode == REQUEST) ? m. item : s. item);
            }
        }
        // Execution to the if judgment shows that the current node and the head node are in different modes, and it is judged whether the head node is the node that is performing the matching operation,
        // Judging by the FULFILLING flag
        // If the head node is not a matching node, try to match it
        else if (!isFulfilling(h.mode)) {<!-- -->
            if (h.isCancelled()) // the node has been canceled pop it off the stack
                casHead(h, h.next); // Update the stack top pointer
            // Push the current node onto the stack in FULFILLING mode, indicating that the current node needs to be matched with the original head node
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {<!-- -->
                for (;;) {<!-- --> // loop until matched or waiters disappear
                    SNode m = s.next; // m is s's match
                    // Indicates that the original head node has been matched by other matching threads, you need to re-acquire the latest head node to match again
                    if (m == null) {<!-- -->
                        casHead(s, null); // pop s out
                        s = null; // use new node next time
                        break; // restart main loop
                    }
                    SNode mn = m.next; // The m node at this time is the original head node
                    if (m. tryMatch(s)) {<!-- -->
                        casHead(s, mn); // The match is successful Update the head node, skip the two nodes being matched
                        return (E) ((mode == REQUEST) ? m. item : s. item);
                    } else // lost match
                        s.casNext(m, mn); // Matching failed, update the next node of s
                }
            }
        }
        // The head node is FULFILLING, helping it to complete the matching operation
        else {<!-- -->
            SNode m = h.next; // m is the matched node
            // m is null means that there is no matching node in the stack, pop the FULFILLING node from the stack
            // The next time you enter the loop, if the head node is empty, enter the queue and block waiting to be matched
            if (m == null)
                casHead(h, null);
            else {<!-- -->
                SNode mn = m.next; // Take out the next node of m to update the head
                if (m.tryMatch(h)) // try to help them match
                    casHead(h, mn); // The match is successful, update the head node to mn, skip the first two matching nodes
                else // Matching failed, m has been matched by other threads, update the next of the head node to point to mn, and then re-execute the loop
                    h.casNext(m, mn); // disconnect the next link of m
            }
        }
    }
}

Picture 1

awaitFulfill method

Block and wait for the matching thread to match with it. Before entering the OS blocking waiting, try to perform a spin operation. If the node times out or is interrupted, directly cancel the node and exit the loop waiting. If it has not been matched during the spinning period, enter the OS blocking

SNode awaitFulfill(SNode s, boolean timed, long nanos) {<!-- -->
    // wait for timeout, calculate the absolute time
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();// current thread
    int spins = (shouldSpin(s) ?//Is spin waiting required? 1. If s is the head node; 2. When the head node is empty; 3. The head node is being matched;
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {<!-- -->
        if (w. isInterrupted())
            s.tryCancel(); // If the node is interrupted, directly cancel the node, point the match to itself, and mark itself as canceled
        SNode m = s.match;
        if (m != null) // m may be itself (the node is canceled) or a matched node (the match is successful)
            return m;
        if (timed) {<!-- -->
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {<!-- -->
                s.tryCancel(); // timeout cancel node
                continue;
            }
        }
        if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null) s.waiter = w;
        else if (!timed) LockSupport. park(this);
        else if (nanos > spinForTimeoutThreshold) LockSupport. parkNanos(this, nanos);
    }
}

tryMatch method

After the match is successful, point the match of the matched node to the s node, and wake up the matched node, as shown in the figure below:

boolean tryMatch(SNode s) {<!-- -->
    // match is not NULL means it has been matched
    // Point the match of the matched node (m) to the matching node (s)
    if (match == null & amp; & amp;
        UNSAFE. compareAndSwapObject(this, matchOffset, null, s)) {<!-- -->
        Thread w = waiter;
        if (w != null) {<!-- -->
            waiter = null;
            LockSupport. unpark(w);
        }
        return true;
    }
    return match == s;
}

clean method

Remove the canceled node from the stack

void clean(SNode s) {<!-- -->
    s.item = null;
    s.waiter = null;
    SNode past = s. next;
    // Determine whether the next node is also canceled
    if (past != null & amp; & amp; past.isCancelled())
        past = past. next;
    SNode p;
    // Traverse to find the uncancelled node, update the head to the uncancelled node
    while ((p = head) != null & amp; & amp; p != past & amp; & amp; p.isCancelled())
        casHead(p, p. next);
    while (p != null & amp; & amp; p != past) {<!-- -->
        SNode n = p.next;
        if (n != null & & n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}