7. The underlying principle of LongAdder in the Atomic series (segmented locks improve concurrency performance)

The underlying principle of LongAdder of the Atomic series (segment lock improves concurrency performance)

In this chapter, we will explain another problem brought by CAS. When the concurrency is intense, a large number of spins and CPU consumption will be generated, and how to use the segmented lock mechanism To solve this problem, we will use the atomic class LongAdder as an example to explain.

CAS empty CPU problem

The defect of AtomicInteger: when the concurrent competition is fierce, a large number of thread spins are caused

Why was LongAddr designed?

When concurrency is very high, the underlying CAS operations using AtomicInteger and AtomicLong are very competitive, which will cause a large number of threads to fail in CAS and continue to spin, consuming CPU performance.

The source code of the getAndAddInt method of the unsafe class is as follows:

public final int getAndAddInt(Object o, long valueOffset, int x) {<!-- -->
    int expected;
    // The failed thread of CAS will continue to retry in the do...while loop until it succeeds
    do {<!-- -->
        expected = this.getIntVolatile(o, valueOffset);
        // The while condition judges that only one thread of compareAndSwapInt succeeds at a time
    } while(!this.compareAndSwapInt(o, valueOffset, expected, expected + x));
    
    return expected;
}

(1) Let’s take a look at the above do…while loop, assuming that there are 10000 threads operating concurrently, since CAS operations can guarantee atomicity (As mentioned in the previous article, the underlying principle of CAS), Only one thread will execute the compareAndSwapInt method successfully at a time

(2) Another 9999 threads that failed enter the next cycle, and then one succeeds, and the remaining 9998 threads enter the next cycle Layer cycle…, in this way, competition is too fierce causing **a large number of threads**** to spin and retry in the loop, and CPU resources will not be released at this time , greatly reducing CPU utilization and reducing concurrent performance.

Segmented lock idea reduces concurrent competition

LongAdder adopts the idea of segmented lock** to reduce concurrent competition; for example, the above 10000 threads concurrently operate, but LongAdder may have 10 locks, different threads may compete for different locks, on average may be 1000 threads competing for 1 lock like this; the concurrency performance is compared in this way Starting from AtomicInteger may be improved by 10 times.

Next, I will use the multi-window mechanism in the bank service hall to explain to you what is a segmented lock

As shown below:

(1) There is a regular window and a bunch of spare windows in the bank lobby; each window has a counter value that records the current window Visitors of

(2) When the number of people is small, there is no competition, usually it is enough to open the regular window in the office hall

(3) When a bank manager want to know the total number of visitors, he needs to accumulate the value of all windows the number of visitors

When the number of visitors is small, such as one or two, it is enough to only open the regular window.

At the same time, there is a visitor counter in each window, and the window every time a visitor is received, the value will be increased by 1, when the bank manager wants to check When the total number of visitors in the entire office hall is calculated, just add up the visitor counter values of all windows.

Multi-window processing reduces contention

(1) In the case of too many visitors, just open the regular window is not acceptable**, because one window at the same time Only one visitor’s request can be processed, and the concurrency competition is very fierce (**This is the defect of AtomicInteger mentioned above, everyone competes through a window)

(2) So at this time enable the backup window. Suppose there are 4 backup windows, namely windows A, B, C, and D**, and each user has its own id strong>For example, after user 2 (id=4) comes, sees that there are people in the regular window, it will use id % standby window size=》4 % 4 = 0 , it will be dispatched to the first spare window, which is **window A

(3) At the same time When the concurrency is very intense, There is also competition for the same spare window, such as **User 3 (id=5), User 6 (id=9 )According to id % backup window size = 2, so they all compete for backup window B; **user 4 (id=6) and user 7 (id=10)** will also compete for backup window C

(4) Using the standby window list is equivalent to dispatching different users to different windows, reducing competition, so The concurrency performance will naturally improve.

The above mechanism is actually similar to breaking the lock into multiple lock segments; where each window is a lock, only is assigned to the same Only the users of the window will have competition, and the use of segment locks greatly reduces competition and improves concurrency performance.

At this time, if the bank manager wants to know the total number of visitors to the service hall, he only needs to add up the number of value people in multiple windows (multiple segments).

LongAdder segment cable implementation principle

First of all, before I tell you about LongAdder, I have to introduce the Striped64 class to you. The Striped64 class is the basis of segmented locks and implements the function of segmented locks. , while LongAdder inherits Striped64, and only makes a simple encapsulation based on Striped64.

Cell unit (window)

Cell {<!-- -->
    // Counter, how many people are currently visiting
    volatile long value;
    // Constructor
    Cell(long x) {<!-- --> value = x; }
    // CAS competes for the same window, and multiple users CAS may compete for this window
    final boolean cas(long cmp, long val) {<!-- -->
        return UNSAFE. compareAndSwapLong(this, valueOffset, cmp, val);
    }
}

Basic window

Directly use a base variable to represent the base window

transient volatile long base;

The method of scramble for the base window, casBase underlying source code, directly cas modifies the value of the base variable in memory

final boolean casBase(long cmp, long val) {<!-- -->
    return UNSAFE. compareAndSwapLong(this, BASE, cmp, val);
}

User id generation method

Here is to get the user id through a series of operations through the thread id

static final int getProbe() {<!-- -->
    return UNSAFE. getInt(Thread. currentThread(), PROBE);
}

LongAdder internal source code analysis

First look at the addition and subtraction methods provided by LongAdder:

public void increment() {<!-- -->
    add(1L);
}

public void decrement() {<!-- -->
    add(-1L);
}

We see that the increment and decrement methods both call the add method at the bottom layer, but the input is the difference between positive and negative numbers .

Look at the extremely simple and exquisite add method:

public void add(long x) {<!-- -->
    // as is similar to the spare window list we mentioned above
    Cell[] as;
    // Here b is the value of the regular window, and v is the value of the window you are assigned to
    long b, v;
    // m is the length of the spare window - 1,
    // We said above that the getProbe() method is the method to obtain the user id
    // getProbe() & amp; is actually the user id % total number of windows, the algorithm of window assignment
    int m;
    // a is the window you were dispatched to
    Cell a;
    // 1. First, if cells==null, it means that the standby window is not open.
    // All execute casBase to compete for the regular window, if the cas is successful, the competition is successful, and then exit after finishing the work
    // If the scramble fails casBase == false, it will enter the if code to try again
    // 2. If cells != null, it means that the spare window has been developed, and there is no need to compete for the regular window.
    // Directly enter the scramble spare window
    if ((as = cells) != null || !casBase(b = base, b + x)) {<!-- -->
        boolean uncontended = true;
        // 3. as == null || as.length - 1 < 0 means that the list of spare windows has not been opened yet
        if (as == null || (m = as. length - 1) < 0 ||
            // 4. as[getProbe() & amp; m] is the alternate window you were dispatched to
            // (a = as[getProbe() & amp; m]) == null The backup window you were dispatched to has no one yet
            (a = as[getProbe() & m]) == null ||
            // 5. a.cas() is to try to grab the permission of window a after you are assigned to window a
            // If uncontented is the result of your scramble, if !uncnottented == true, it means your scramble failed
            !(uncontended = a.cas(v = a.value, v + x)))
            
            // Equivalent to a bottom-up solution after the above operations fail
            longAccumulate(x, null, uncontended);
    }
}


Strimped64 segmented cable implementation source code

Continue to view the source code of LongAccumulate method:

There are several things done in longAccumulate, let’s analyze it from a big perspective first:

(1) Enter the method, the yellow part, the first is to get the user id, if it is 0, first force the initialization of the user id

(2) Then it enters the for infinite loop, and only when the user’s request is successfully processed will it exit the loop

(3) Then there are the following important three big branch conditions

Enter alternate window processing

// cells is a list of spare windows, if the spare window is not equal to null,
// and length>0 means the standby window is open
// Then the user enters this condition and goes to the standby window list for processing
if ((as = cells) != null & amp; & amp; (n = as. length) > 0)

Initialize the alternate window

If the branch above is not satisfied, it means that cells == null or cells.length <= 0 means that the backup window is not open. , at this time you need to open the spare window, that is, initialize the spare window list array:

// cellsBusy is whether the standby window list is being initialized or expanded
// cellsBusy == 0 means no one has initialized or expanded the capacity yet
// At this time, the thread executed here will initialize the spare window
// For example, when thread 1 comes here and finds that it is not initialized, it needs to execute casCellBusy to tell others that it is being initialized
else if (cellsBusy == 0 & amp; & amp; cells == as & amp; & amp; casCellsBusy())

Compete for regular windows

If cellsBusy == 1 it means that someone is initializing the spare window or expanding the spare window list, at this time the spare window list is unavailable , can only go to the regular window to fight

// Directly compete for the regular window, if successful, exit the infinite loop
else if (casBase(v = base, ((fn == null) ? v + x :
                            fn. applyAsLong(v, x))))
    break;

Draw a picture here

(1) For example, after User 1 enters here, first judge branch 1, and find that backup window is not enabled; then directly **** enter branch 2 strong>, go to Initialize Backup Window

(2) Then User 2 also came in, and found that the backup window was not enabled, and found that someone was initializing the backup window, directly entered Branch 3, to compete for the regular window

(3) After the backup window is initialized and enabled; user requests come in and go to branch 1, and go to the backup window list to compete.

Source code of branch 2: Initialize the standby window

else if (cellsBusy == 0 & amp; & amp; cells == as & amp; & amp; casCellsBusy()) {<!-- -->
    // Whether init is initialized, initially set to false
    boolean init = false;
    try {<!-- --> // Initialize table
        // Then there is nothing complicated here, just create an array
        // But at this time there are no elements in the array, each element is empty
        if (cells == as) {<!-- -->
            Cell[] rs = new Cell[2];
            // Create a new window and give the operand x to the newly created window
            rs[h & 1] = new Cell(x);
            cells = rs;
            // Then set the initialization completion to true
            init = true;
        }
    } finally {<!-- -->
        // Reset the cellsBusy flag, the operation is complete, no one is initializing or expanding
        cellsBusy = 0;
    }
    // If initialization is complete, exit the loop
    if (init)
        break;
}



If the cells standby window is null, initialize it; at the same time, if the window grid to which you are dispatched is empty, create a window grid (equivalent to calling a worker over)

The source code of branch 1: the backup window has been opened, ready to go to the backup window:

if ((as = cells) != null & amp; & amp; (n = as. length) > 0) {<!-- -->
    // If the spare window that you are assigned to is null, it means that there is no one working in the window, then call a staff member to take charge
    if ((a = as[(n - 1) & h]) == null) {<!-- -->
        // If no one is initializing, create a window below (call a worker to handle it)
        if (cellsBusy == 0) {<!-- -->
            // Create a new window and submit your own request x at the same time
            Cell r = new Cell(x);
            // if no one is initializing
            if (cellsBusy == 0 & amp; & amp; casCellsBusy()) {<!-- -->
                boolean created = false;
                try {<!-- -->
                    Cell[] rs; int m, j;
                    // Check again that the standby window list is initialized
                    if ((rs = cells) != null & amp; & amp;
                        (m = rs.length) > 0 & &
                        rs[j = (m - 1) & h] == null) {<!-- -->
                        // Set up the new window created by yourself
                        rs[j] = r;
                        created = true;
                    }
                } finally {<!-- -->
                    cellsBusy = 0;
                }
                if (created)
                    break;
                continue; // Slot is now non-empty
            }
        }
        collide = false;
    }
    // If you already know that the cas operation has failed, reset the failure flag and enter the next cycle to try again
    else if (!wasUncontended) // CAS already known to fail
        wasUncontended = true; // Continue after rehash
    // According to the algorithm of user id % total number of spare windows, which spare window should you fight for
    // If the competition is successful, the operation is over, if it fails, enter the next cycle and try again
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn. applyAsLong(v, x))))
        break;
    else if (n >= NCPU || cells != as)
        collide = false;
    else if (!collide)
        collide = true;
    // Come here, it means that the above operations have failed, and the competition for the spare window is still very fierce
    // So we need to expand the capacity, add some spare windows to ease the fierce competition
    else if (cellsBusy == 0 & amp; & amp; casCellsBusy()) {<!-- -->
        try {<!-- -->
            // Here is the specific expansion operation
            if (cells == as) {<!-- --> // Expand table unless stale
                // The new spare window is twice the original size
                Cell[] rs = new Cell[n << 1];
                // Reassign the existing spare window
                for (int i = 0; i < n; + + i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {<!-- -->
            cellsBusy = 0;
        }
        collide = false;
        continue; // Retry with expanded table
    }
    // Recalculate your own user id here,
    // Make the user id spread to different windows as much as possible to reduce competition
    h = advanceProbe(h);
}