Concurrency-safe collection ConcurrentHashMap

Objectives of this class

  • ConcurrentHashMap storage structure
  • Some important design ideas of ConcurrentHashMap
    • Concurrent expansion
    • High and low bit migration
    • Segment lock(*)
    • red black tree
    • linked list

Why use ConcurrentHashMap

HashMap -> not thread-safe
Hashtable -> thread-safe, but through synchronized methods (biased lock, lightweight lock (CAS))
ConcurrentHashMap -> a balance between performance and thread safety

Usage of ConcurrentHashMap

java-8 lambda (a simplification, syntax sugar)

  • computeIfAbsent
  • computeIfPresent
  • compute (a combination of computeIfAbsent and computeIfPresent)
  • merge (merge data)

ConcurrentHashMap and its storage structure and implementation.

JDK 1.8 (compared with JDK 1.7):
  • JDK 1.7 used segment locks (Segment); the lock granularity is larger.
  • JDK 1.8 introduces red-black trees, improving the time complexity of a lookup in a long bin from O(n) to O(log n).

CHM source code

// Bucket array: read at the top of putVal's retry loop, assigned by initTable() and by transfer() when a resize commits.
transient volatile Node<K,V>[] table;
 final V putVal(K key, V value, boolean onlyIfAbsent) {<!-- -->
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
//spin(;;) {cas}
        for (Node<K,V>[] tab = table;;) {<!-- -->
            Node<K,V> f; int n, i, fh;
//If the tab is empty, it means it has not been initialized yet.
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); //After initialization is completed, enter the next loop
//(n - 1) & amp; hash) -> 0-15 -> Calculate the array subscript position.
            else if ((f = tabAt(tab, i = (n - 1) & amp; hash)) == null) {<!-- -->
//If the current node location is empty, store it directly in that location.
//Ensure atomicity through cas.
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break; // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {<!-- -->
                V oldVal = null;
                synchronized (f) {<!-- --> //Lock the current node to avoid thread safety issues
                    if (tabAt(tab, i) == f) {<!-- --> //Re-judge()
//Process for linked list
                        if (fh >= 0) {<!-- -->
                            binCount = 1; //Count the length of the linked list
                            for (Node<K,V> e = f;; + + binCount) {<!-- -->
                                K ek;
//Whether the same key exists, if so, overwrite it
                                if (e.hash == hash & amp; & amp;
                                ((ek = e.key) == key ||
        (ek != null & amp; & amp; key.equals(ek)))) {<!-- -->
        oldVal = e.val;
        if (!onlyIfAbsent)
        e.val = value;
        break;
        }
//If it does not exist, add the current key/value to the linked list.
        Node<K,V> pred = e;
//The description reaches the last node and is added directly to the end.
        if ((e = e.next) == null) {<!-- -->
        pred.next = new Node<K,V>(hash, key,
        value, null);
        break;
        }
        }
        }
//Processing of red-black trees
        else if (f instanceof TreeBin) {<!-- -->
        Node<K,V> p;
        binCount = 2;
        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
        value)) != null) {<!-- -->
        oldVal = p.val;
        if (!onlyIfAbsent)
        p.val = value;
        }
        }
        }
        }
//
        if (binCount != 0) {<!-- -->
//If the length of the linked list is greater than or equal to 8, the treeifyBin method will be called
        if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
        if (oldVal != null)
        return oldVal;
        break;
        }
        }
        }
        addCount(1L, binCount);
        return null;
        }

initTable();

 private final Node<K,V>[] initTable() {<!-- -->
        Node<K,V>[] tab; int sc;
//As long as the tab is not initialized, it will continue to loop until the initialization is completed.
        while ((tab = table) == null || tab.length == 0) {<!-- -->
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
//Spin through cas (occupy a lock mark through CAS)
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {<!-- -->
//Indicates that the current thread has grabbed the lock
   try {<!-- -->
        if ((tab = table) == null || tab.length == 0) {<!-- -->
            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
            table = tab = nt;
            sc = n - (n >>> 2); //Threshold for expansion.
        }
    } finally {<!-- -->
        sizeCtl = sc;
    }
break;
}
}
        return tab;
        }

treeifyBin

private final void treeifyBin(Node<K,V>[] tab, int index) {<!-- -->
        Node<K,V> b; int n, sc;
        if (tab != null) {<!-- -->
//If the table length is less than 64.
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1); //Expansion
            else if ((b = tabAt(tab, index)) != null & amp; & amp; b.hash >= 0) {<!-- -->
                synchronized (b) {<!-- -->
//Red-black tree conversion.
                    if (tabAt(tab, index) == b) {<!-- -->
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {<!-- -->
                            TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                            null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

tryPresize

This method is used to achieve expansion,

  • Multi-threaded concurrent expansion (allowing multiple threads to assist in expansion)
  • The nature of expansion
    • Multi-threaded concurrent expansion (allowing multiple threads to assist in expansion)
    • The nature of expansion

There must be a place to record how many threads are taking part in data migration during the current expansion. The expansion can only be
completed once all of those threads have finished their migration work.

resizeStamp

static final int resizeStamp(int n) {<!-- -->
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Example with n = 16 (assuming RESIZE_STAMP_BITS = 16 and RESIZE_STAMP_SHIFT = 16):
16 = 0000 0000 0000 0000 0000 0000 0001 0000 -> 27 leading zero bits (27 = 0001 1011)
Number returned by resizeStamp(16): 0000 0000 0000 0000 1000 0000 0001 1011
rs << RESIZE_STAMP_SHIFT (binary left shift by 16 bits): 1000 0000 0001 1011 0000 0000 0000 0000
(rs << RESIZE_STAMP_SHIFT) + 2: 1000 0000 0001 1011 0000 0000 0000 0010 -> Indicates that one thread is currently performing the expansion.
The high 16 bits hold the stamp of the current expansion, ensuring uniqueness.
The low 16 bits record the number of threads taking part in the expansion (stored as the thread count + 1).

transfer

How to implement multiple threads to migrate data to the same container.
Implement data transfer.

  • Data migration
    • It is necessary to calculate the data migration space of the current thread. (Task splitting)
    • Create a new array (capacity is the expanded size)
    • Implement data transfer.
      • If it is a red-black tree
        • After data migration, if a split bin no longer meets the conditions for a red-black tree, it is converted back into a linked list.
      • If it is a linked list
      • High and low bit migration
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {<!-- -->
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {<!-- --> // initiating
            try {<!-- -->
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {<!-- --> // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {<!-- -->
            Node<K,V> f; int fh;
            while (advance) {<!-- -->
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {<!-- -->
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {<!-- -->
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {<!-- -->
                int sc;
                if (finishing) {<!-- -->
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {<!-- -->
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {<!-- -->
                synchronized (f) {<!-- -->
                    if (tabAt(tab, i) == f) {<!-- -->
                        Node<K,V> ln, hn;
                        if (fh >= 0) {<!-- -->
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {<!-- -->
                                int b = p.hash & n;
                                if (b != runBit) {<!-- -->
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {<!-- -->
                                ln = lastRun;
                                hn = null;
                            }
                            else {<!-- -->
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {<!-- -->
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & amp; n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {<!-- -->
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {<!-- -->
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & amp; n) == 0) {<!-- -->
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                     + + lc;
                                }
                                else {<!-- -->
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                     + + hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

How does size() count?

red black tree

Section 2, Expansion

  • Create a new array
  • Migrate old array data
  • Multi-thread assisted expansion (for old data, multiple threads are used to perform the data migration process in parallel)
    • Record the current number of threads (sizeCtl)
    • When each thread completes data migration and exits, the number of threads assisting in expansion must be reduced.
  • resizeStamp -> expansion stamp
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {<!-- -->
        int n = tab.length, stride;
        //Calculate the interval size of the data processed by each thread, the minimum is 16.
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
            //Indicates that the expanded array will be expanded twice as much as the original one.
        if (nextTab == null) {<!-- --> // initiating
            try {<!-- -->
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {<!-- --> // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        //transferIndex = length of old table[].
        int nextn = nextTab.length;
        //Used to indicate the status of migration, that is to say, if a node of an old array has completed migration, it needs to be changed to fwd
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {<!-- -->
            Node<K,V> f; int fh;
            while (advance) {<!-- -->
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {<!-- -->
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {<!-- -->
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
                //Assume the array length is 32,
//First time [16(nextBound),31(i)]
//Second time [0,15]
            }
            //Whether the expansion is completed
            if (i < 0 || i >= n || i + n >= nextn) {<!-- -->
                int sc;
                if (finishing) {<!-- -->
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {<!-- -->
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //Indicates that the current array position is empty
            else if ((f = tabAt(tab, i)) == null)
            //Change directly to fwd -> to indicate that the migration is completed.
                advance = casTabAt(tab, i, null, fwd);
                //Determine whether it has been processed, if so, go to the next time
Interval traversal.
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {<!-- -->
             // Ensure that during the migration process, other threads must wait when calling the put() method.
//Guess the following content:
//* Do different processing for different types of nodes
//* linked list
//* Red-black tree
                synchronized (f) {<!-- -->
                    if (tabAt(tab, i) == f) {<!-- -->
                        Node<K,V> ln, hn;
                        if (fh >= 0) {<!-- -->
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {<!-- -->
                                int b = p.hash & n;
                                if (b != runBit) {<!-- -->
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {<!-- -->
                                ln = lastRun;
                                hn = null;
                            }
                            else {<!-- -->
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {<!-- -->
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & amp; n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {<!-- -->
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {<!-- -->
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & amp; n) == 0) {<!-- -->
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                     + + lc;
                                }
                                else {<!-- -->
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                     + + hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }