ConcurrentHashMap source code analysis

1. Preface

HashMap is efficient and fast, but it is not thread-safe, and thread safety matters in any concurrent program. There are currently three common ways to get a thread-safe map in Java:

  1. Hashtable: an older thread-safe map. Every public method is synchronized on the table itself, so only one thread can read or write at any time. It is now considered a legacy class and has largely been replaced by the more efficient ConcurrentHashMap.
  2. Collections.synchronizedMap: a static factory method in Collections that wraps an ordinary map in a thread-safe wrapper (a private nested class). Every method of the wrapper runs inside a synchronized block on one shared mutex object.
  3. ConcurrentHashMap: a thread-safe HashMap that locks individual buckets rather than the whole map.
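
As a small illustration of the three options, the sketch below runs the same concurrent workload against each map. The class name ThreadSafeMapsDemo and the helper hammer are hypothetical names introduced for this example; only the JDK classes are real. It relies on merge() being an atomic read-modify-write on all three implementations.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadSafeMapsDemo {

    // Hypothetical helper: starts `threads` workers that each increment the
    // same key `perThread` times, waits for them, and returns the final count.
    static int hammer(Map<String, Integer> map, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // merge() is one atomic read-modify-write on all three maps
                    map.merge("hits", 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.get("hits");
    }

    public static void main(String[] args) {
        // Each call is expected to print 40000, because no update is lost.
        System.out.println(hammer(new Hashtable<>(), 4, 10_000));
        System.out.println(hammer(Collections.synchronizedMap(new HashMap<>()), 4, 10_000));
        System.out.println(hammer(new ConcurrentHashMap<>(), 4, 10_000));
    }
}
```

Replacing any of the three with a plain HashMap would typically lose updates or corrupt the map, which is the problem all three approaches solve.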

How to achieve thread safety

There are two broad approaches to the problems caused by multi-thread concurrency.

  1. Pessimistic mechanism: assume that the worst case (a conflict) will happen, and guard against it from the very beginning. Pessimistic implementations generally use locks, and Java's synchronized keyword is the most widely used example. The cost is that locks can reduce performance, because threads block while they wait.
  2. Optimistic mechanism: assume that conflicts are rare, do the work first, and deal with a conflict only when one is detected, for example by retrying, compensating, or checking a version number. CAS is an optimistic mechanism.

ConcurrentHashMap mainly uses CAS + spin: attempt the update with CAS, and if it fails, keep trying (spin) until it succeeds. It combines this with synchronized where a whole bucket has to be modified.
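
The two mechanisms can be compared side by side on a shared counter. This is a minimal sketch, not code from ConcurrentHashMap: the class names CounterDemo, LockedCounter, and CasCounter and the helper runConcurrently are made up for illustration, and AtomicInteger stands in for the Unsafe-based CAS the map uses internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CounterDemo {

    // Pessimistic: take the lock before touching the shared value.
    static final class LockedCounter {
        private int value;
        synchronized void increment() { value++; }
        synchronized int get() { return value; }
    }

    // Optimistic: read, compute, then CAS. If another thread changed the
    // value in between, the CAS fails and we retry (spin).
    static final class CasCounter {
        private final AtomicInteger value = new AtomicInteger();
        void increment() {
            for (;;) {
                int current = value.get();
                if (value.compareAndSet(current, current + 1)) {
                    return;
                }
            }
        }
        int get() { return value.get(); }
    }

    // Hypothetical helper: runs `task` on `threads` threads, `perThread` times each.
    static void runConcurrently(Runnable task, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    task.run();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LockedCounter locked = new LockedCounter();
        CasCounter cas = new CasCounter();
        runConcurrently(locked::increment, 4, 100_000);
        runConcurrently(cas::increment, 4, 100_000);
        System.out.println(locked.get() + " " + cas.get()); // prints "400000 400000"
    }
}
```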

CAS & Unsafe

CAS stands for Compare And Swap, and it is the foundation of java.util.concurrent (JUC). We only briefly introduce the principle here; the details are worth studying separately.

The CAS process is easy to understand: update a memory location to a new value only if it still holds the expected old value. But is it safe with multiple CPUs and multiple threads? Rest assured, it is. Java's CAS calls an atomic compare-and-swap instruction of the CPU through methods of the Unsafe class, so the comparison and the write happen as one indivisible step. Visibility between threads is governed by the Java Memory Model (JMM), which is not the focus of this article and is worth studying on its own. The one thing you need to know here is that CAS guarantees atomicity, but only for a single variable; an atomic update of several variables requires a lock such as synchronized.

Unsafe is an oddity in Java. One of Java's characteristics is that it is safe and does not let programmers operate on memory directly, yet the Unsafe class exposes low-level operations, including the CAS methods. Using Unsafe in application code is not recommended: as the name says, it is unsafe, it is an internal API, and the JDK maintainers plan to remove it.
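
For application code, a supported way to get the same single-variable CAS without Unsafe is java.lang.invoke.VarHandle (available since Java 9). The sketch below is an assumption about how one might write it, not what ConcurrentHashMap itself does; the class VarHandleCasDemo, its state field, and the method tryClaim are hypothetical.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VarHandleCasDemo {

    // Hypothetical control field, loosely modeled on a "0 = free, -1 = claimed" flag.
    private volatile int state;

    private static final VarHandle STATE;
    static {
        try {
            STATE = MethodHandles.lookup()
                    .findVarHandle(VarHandleCasDemo.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Atomically changes state from 0 to -1; only one caller can succeed.
    boolean tryClaim() {
        return STATE.compareAndSet(this, 0, -1);
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        VarHandleCasDemo demo = new VarHandleCasDemo();
        System.out.println(demo.tryClaim()); // true: state was 0, now -1
        System.out.println(demo.tryClaim()); // false: expected 0 but found -1
        System.out.println(demo.state());    // -1
    }
}
```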

2. Source code analysis

sizeCtl & constructor

Neither ConcurrentHashMap nor HashMap allocates its array in the constructor; the array is created lazily when the first element is added. ConcurrentHashMap has one particularly important field, sizeCtl. If you understand how it changes, you understand the whole flow of the source code. Here is its description:

/**
     * Table initialization and resizing control. When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads). Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     * <p>
     * Control field used to coordinate table initialization and expansion. Different values have different meanings.
     * <p>
     * 1. Negative: -1 means the table is being initialized; -N means N-1 threads are currently expanding it
     * <p>
     * 2. Zero: the table has not been initialized yet and the default capacity will be used.
     * <p>
     * 3. Positive: before initialization, it holds the initial capacity of the array to create. After initialization,
     * it holds the expansion threshold (the table is expanded when the element count reaches it)
     */
    private transient volatile int sizeCtl;

Next, look at the ConcurrentHashMap constructor that takes an initial capacity:

 /**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements are negative
     *
     * At this time, sizeCtl records the initial capacity of the array.
     *
     * For example initialCapacity=5
     * Call tableSizeFor(5 + 5/2 + 1)==tableSizeFor(8)
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    /**
     * Returns a power of two table size for the given desired capacity.
     * See Hackers Delight, sec 3.2
     * Returns the smallest power of 2 that is greater than or equal to c
     *
     * When c=8
     * n = c-1=7
     * Trace each step to the final result
     * 0000 0000 0000 0000 0000 0000 0000 0111
     * >>> 1
     * = 0000 0000 0000 0000 0000 0000 0000 0011
     * | 0000 0000 0000 0000 0000 0000 0000 0111
     * = 0000 0000 0000 0000 0000 0000 0000 0111
     * >>> 2
     * = 0000 0000 0000 0000 0000 0000 0000 0001
     * | 0000 0000 0000 0000 0000 0000 0000 0111
     * = 0000 0000 0000 0000 0000 0000 0000 0111
     * >>> 4
     * = 0000 0000 0000 0000 0000 0000 0000 0000
     * | 0000 0000 0000 0000 0000 0000 0000 0111
     * = 0000 0000 0000 0000 0000 0000 0000 0111
     * After >>> 8 and >>> 16 the shifted value is also all 0, so n does not change.
     * So the final result is 111, which is 7. The returned value is 7 + 1 = 8
     *
     * Summary: the right shifts cover 1 + 2 + 4 + 8 + 16 = 31 bits in total. Together with the | operations,
     * they copy the highest 1 bit of n into every lower bit, so the high bits of the result are all 0 and the low bits are all 1. As with HashMap, a number in this form is a power of 2 minus 1.
     * The final result is this number + 1, which is a power of 2.
     */
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

When we call new ConcurrentHashMap(c), the initial capacity is not c but the smallest power of 2 greater than or equal to c + c/2 + 1. Let's use reflection to verify:

// Needs java.lang.reflect.Field and java.util.concurrent.ConcurrentHashMap.
// On recent JDKs, run with --add-opens java.base/java.util.concurrent=ALL-UNNAMED.
public static void main(String[] args) {
        ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>(5);
        Class<?> clazz = concurrentHashMap.getClass();
        try {
            Field field = clazz.getDeclaredField("sizeCtl");
            //Open private access
            field.setAccessible(true);
            //Read the value of the field
            Object value = field.get(concurrentHashMap);
            System.out.println("The initial capacity of ConcurrentHashMap is: = " + value);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
-- Printed result: The initial capacity of ConcurrentHashMap is: = 8
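
The same arithmetic can also be checked without reflection. The sketch below copies tableSizeFor and the constructor's sizing expression from the listings above into a standalone class; the class name InitialCapacityDemo and the method initialTableSize are hypothetical, and MAXIMUM_CAPACITY is taken to be 1 << 30 as in the JDK source.

```java
class InitialCapacityDemo {

    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Copy of the tableSizeFor method shown above.
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    // Hypothetical helper that mirrors the sizing expression of the
    // one-argument constructor shown above.
    static int initialTableSize(int initialCapacity) {
        if (initialCapacity < 0) {
            throw new IllegalArgumentException();
        }
        return (initialCapacity >= (MAXIMUM_CAPACITY >>> 1))
                ? MAXIMUM_CAPACITY
                : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
    }

    public static void main(String[] args) {
        int[] requested = {0, 1, 5, 10, 16, 32};
        for (int c : requested) {
            // Prints 0 -> 1, 1 -> 2, 5 -> 8, 10 -> 16, 16 -> 32, 32 -> 64
            System.out.println(c + " -> " + initialTableSize(c));
        }
    }
}
```

Note that a requested capacity of 16 yields a table of 32, not 16, because the constructor first inflates the request to 16 + 8 + 1 = 25.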

put & putVal

putVal() as a whole is a spin loop combined with CAS; the bucket-level steps follow the same process as HashMap.

  • Spin:
    1. If table == null, call initTable() to initialize it
    2. If the target bucket is empty (no hash collision), insert the new node with CAS
    3. If the table is being expanded, help with the expansion
    4. If there is a hash collision, lock the bucket: if it is a singly linked list, append to the end of the list; if it is a red-black tree, insert into the tree structure
    5. If the linked list in the bucket reaches the treeify threshold (TREEIFY_THRESHOLD, 8), convert it to a red-black tree
    6. If the addition succeeds, call addCount() to update the size and check whether expansion is needed

From the source code, we can see that when a new element is put and a hash collision occurs, only the colliding bucket is locked, so operations on other buckets are not affected. This is how the map achieves both safety and efficiency under concurrency.
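
The core idea of steps 2 and 4 (CAS into an empty bucket, lock only the bucket head otherwise) can be sketched in a few dozen lines. This is a deliberately reduced illustration under stated assumptions, not the JDK implementation: TinyBinMap is a hypothetical fixed-size map with String keys and int values, no resizing, no trees, and no removal, and AtomicReferenceArray stands in for the Unsafe-based tabAt/casTabAt.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

class TinyBinMap {

    static final class Node {
        final String key;
        volatile int val;
        volatile Node next;
        Node(String key, int val) { this.key = key; this.val = val; }
    }

    // Fixed table of 16 bins; the real map grows, this sketch does not.
    private final AtomicReferenceArray<Node> table = new AtomicReferenceArray<>(16);

    private int indexFor(String key) {
        int h = key.hashCode();
        return (table.length() - 1) & (h ^ (h >>> 16));
    }

    // Returns the previous value, or null if the key was absent.
    Integer put(String key, int value) {
        int i = indexFor(key);
        for (;;) {                                        // spin
            Node f = table.get(i);
            if (f == null) {
                // Empty bin: publish the node with CAS, no lock needed.
                if (table.compareAndSet(i, null, new Node(key, value))) {
                    return null;
                }
                // CAS lost: another thread filled the bin, so loop and retry.
            } else {
                synchronized (f) {                        // lock only this bin's head
                    if (table.get(i) != f) {
                        continue;                         // head changed, retry
                    }
                    for (Node e = f; ; e = e.next) {
                        if (e.key.equals(key)) {          // same key: overwrite
                            int old = e.val;
                            e.val = value;
                            return old;
                        }
                        if (e.next == null) {             // end of list: append
                            e.next = new Node(key, value);
                            return null;
                        }
                    }
                }
            }
        }
    }

    Integer get(String key) {
        for (Node e = table.get(indexFor(key)); e != null; e = e.next) {
            if (e.key.equals(key)) {
                return e.val;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        TinyBinMap map = new TinyBinMap();
        System.out.println(map.put("a", 1)); // null: key was absent
        System.out.println(map.put("a", 2)); // 1: previous value
        System.out.println(map.get("a"));    // 2
    }
}
```

Two threads writing to different bins never contend, and two threads writing to the same non-empty bin serialize on that bin's head node only.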

 /**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p>The value can be retrieved by calling the {@code get} method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with {@code key}, or
     * {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null
     */
    @Override
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /**
     * Implementation for put and putIfAbsent
     *
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //If the key or the value is null, throw an exception directly
        if (key == null || value == null) {
            throw new NullPointerException();
        }
        //Rehash (spread) the hashCode to reduce hash collisions and distribute keys more evenly
        int hash = spread(key.hashCode());
        int binCount = 0;
        //Spin: loop on the current table until the insert succeeds
        for (Node<K, V>[] tab = table; ; ) {
            Node<K, V> f;
            int n, i, fh;
            //1. If the table is not initialized, initialize it first
            if (tab == null || (n = tab.length) == 0) {
                tab = initTable();
            }
            //If there is no data at position i, insert the new node with CAS
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //The null check above and the CAS below act as a double check: the CAS only succeeds if the bucket is still empty
                if (casTabAt(tab, i, null,
                        new Node<K, V>(hash, key, value, null))) {
                    break; // no lock when adding to empty bin
                }
            }
            //2. If the hash value is MOVED, it means that the array is being expanded, and it will assist in the expansion. It will expand first and then add new elements.
            else if ((fh = f.hash) == MOVED) {
                tab = helpTransfer(tab, f);
            } else {
                //The bucket calculated by hash is not empty, and is not currently in expansion operation, add elements.
                V oldVal = null;
                //Lock the current bucket to ensure thread safety and perform element addition operations.
                synchronized (f) {
                    //Re-check that f is still the head of the bucket; it may have been replaced (for example, turned into a tree or migrated) before the lock was acquired
                    if (tabAt(tab, i) == f) {
                        //Hash value >=0 indicates that the node is a linked list structure
                        if (fh >= 0) {
                            binCount = 1;
                            //e starts at the head node
                            for (Node<K, V> e = f; ; ++binCount) {
                                K ek;
                                //putting the same key will overwrite the original value
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K, V> pred = e;
                                if ((e = e.next) == null) {
                                    //Insert at the end of the linked list
                                    pred.next = new Node<K, V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            binCount = 2;
                            //Insert into the red-black tree, rebalancing with rotations as needed
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //Convert the bucket to a red-black tree when binCount reaches TREEIFY_THRESHOLD (8)
                    if (binCount >= TREEIFY_THRESHOLD) {
                        treeifyBin(tab, i);
                    }
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        //Count size and check whether expansion is needed
        addCount(1L, binCount);
        return null;
    }
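
The hashing at the top of putVal can be tried in isolation. The sketch below reproduces spread() as it appears in the JDK 8 source, (h ^ (h >>> 16)) & HASH_BITS with HASH_BITS = 0x7fffffff, together with the (n - 1) & hash index computation from the listing above. The class name SpreadDemo and the helper indexFor are hypothetical.

```java
class SpreadDemo {

    // Usable bits of a normal node hash; negative hashes are reserved
    // for special nodes such as ForwardingNode (MOVED) and TreeBin.
    static final int HASH_BITS = 0x7fffffff;

    // XORs the high 16 bits into the low 16 bits and clears the sign bit.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Bucket index for a table of length n (n must be a power of 2).
    static int indexFor(int hash, int n) {
        return (n - 1) & hash;
    }

    public static void main(String[] args) {
        int n = 16;
        int a = 0x10000;  // two hash codes that differ only in the high bits
        int b = 0x20000;

        // Without spreading, both fall into bucket 0.
        System.out.println(indexFor(a, n) + " " + indexFor(b, n));                 // 0 0
        // After spreading, the high bits influence the index.
        System.out.println(indexFor(spread(a), n) + " " + indexFor(spread(b), n)); // 1 2
        // A negative hashCode becomes non-negative.
        System.out.println(spread(-1) >= 0);                                       // true
    }
}
```

Keeping ordinary hashes non-negative is what lets putVal use the test fh >= 0 to recognize a plain linked-list bucket.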

initTable, initialize table

 /**
     * Initializes table, using the size recorded in sizeCtl.
     * Initialize the table and re-record the sizeCtl value. This value is the threshold for the next expansion of the array.
     */
    private final Node<K, V>[] initTable() {
        Node<K, V>[] tab;
        int sc;
        //Check the empty table again to enter the initialization operation
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl < 0 means another thread has already set it to -1 (in the else-if below) and is initializing the table, so yield the CPU and spin until it finishes.
            if ((sc = sizeCtl) < 0) {
                Thread.yield();
            //CAS sets SIZECTL to -1. If it succeeds, this thread performs the initialization. If it fails, another thread got there first, so keep spinning.
            } else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //Double check: another thread may have completed the initialization in the meantime
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        //Record the size of the next expansion, which is equivalent to n-n/4=0.75n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //At this time, the value of sizeCtl is the threshold for the next expansion.
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
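
The initTable pattern (CAS a control field to -1 to win the right to initialize, double-check, then publish the threshold) works outside the map as well. The sketch below is a simplified imitation under stated assumptions: LazyTable is a hypothetical class, AtomicInteger replaces the Unsafe-based CAS on sizeCtl, and the allocations counter exists only so the demo can show that the array is created exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyTable {

    private final AtomicInteger sizeCtl;            // stands in for the sizeCtl field
    private volatile Object[] table;
    final AtomicInteger allocations = new AtomicInteger(); // demo-only counter

    LazyTable(int initialSize) {
        sizeCtl = new AtomicInteger(initialSize);
    }

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();                     // someone else is initializing
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) {    // double check after winning the CAS
                        int n = (sc > 0) ? sc : 16;
                        table = tab = new Object[n];
                        allocations.incrementAndGet();
                        sc = n - (n >>> 2);         // next threshold: 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);                // publish threshold (or restore value)
                }
                break;
            }
        }
        return tab;
    }

    int threshold() {
        return sizeCtl.get();
    }

    // Hypothetical helper: lets `threads` threads race to initialize one table.
    static LazyTable raceToInit(int threads) {
        LazyTable lazy = new LazyTable(16);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(lazy::initTable);
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lazy;
    }

    public static void main(String[] args) {
        LazyTable lazy = raceToInit(8);
        System.out.println(lazy.allocations.get()); // 1: only one thread allocated
        System.out.println(lazy.initTable().length); // 16
        System.out.println(lazy.threshold());        // 12
    }
}
```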

helpTransfer assists in capacity expansion

 /**
     * Helps transfer if a resize is in progress.
     * <p>
     * If the array is being expanded, assist it and expand it together with multiple worker threads
     * Copy elements from the old table to the new table
     *
     */
    final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
        Node<K, V>[] nextTab;
        int sc;
        //If f is a ForwardingNode, its bucket has already been migrated (its hash is MOVED) and an expansion is in progress.
        //ForwardingNode.nextTable must also be non-null, i.e. the new table exists
        if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
            //Compute the resize stamp from the table length; during expansion it is stored in the high 16 bits of sizeCtl
            int rs = resizeStamp(tab.length);
            //Loop while the expansion is still in progress: the old and new tables have not been replaced, and sizeCtl < 0
            while (nextTab == nextTable && table == tab &&
                    (sc = sizeCtl) < 0) {
                // 1. Shift sizeCtl right (unsigned) by 16 bits to get its high 16 bits. If they differ from rs, the resize stamp has changed.
                // 2. (sc == rs + 1) indicates that the expansion has ended
                // 3. (sc == rs + MAX_RESIZERS) indicates that the number of helper threads has reached the maximum, 65535
                // 4. transferIndex <= 0 also indicates that the expansion has ended
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //Add a thread to help expand capacity
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
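
To make the sizeCtl bit layout during expansion concrete, the sketch below reproduces resizeStamp as it is defined in the JDK 8 source (RESIZE_STAMP_BITS = 16) and builds the value that the first resizing thread stores, (rs << RESIZE_STAMP_SHIFT) + 2, which is the same value the completion check in transfer compares against. The class ResizeStampDemo and the helper activeResizers are hypothetical, and the "low 16 bits minus 1" reading of the thread count is this example's interpretation.

```java
class ResizeStampDemo {

    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Stamp for a table of length n: leading-zero count with bit 15 set,
    // so that shifting it left by 16 always produces a negative int.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // sizeCtl value stored by the thread that starts the expansion.
    static int initialResizeSizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // Hypothetical helper: low 16 bits hold 1 + the number of resizing threads.
    static int activeResizers(int sizeCtl) {
        return (sizeCtl & 0xffff) - 1;
    }

    public static void main(String[] args) {
        int n = 16;
        int rs = resizeStamp(n);
        int sc = initialResizeSizeCtl(n);

        System.out.println(rs);                                // 32795 (27 | 0x8000)
        System.out.println(sc < 0);                            // true: expansion in progress
        System.out.println((sc >>> RESIZE_STAMP_SHIFT) == rs); // true: stamp in high 16 bits
        System.out.println(activeResizers(sc));                // 1
        System.out.println(activeResizers(sc + 1));            // 2, after one helper joins
    }
}
```

Because the stamp depends on the table length, a helper that arrives after the table has already been replaced sees a different stamp in the high bits and does not join.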

TreeNode structure

 /**
     * Nodes for use in TreeBins
     */
    static final class TreeNode<K, V> extends Node<K, V> {
        TreeNode<K, V> parent; // red-black tree links
        TreeNode<K, V> left;
        TreeNode<K, V> right;
        TreeNode<K, V> prev; // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K, V> next,
                 TreeNode<K, V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
    }

TreeNode extends Node and adds parent, left, right, prev, and red. A detail that few people notice: alongside the red-black tree, it also maintains a doubly linked list through next and prev. A red-black tree is convenient to query but awkward to migrate, and walking the linked list makes migration during expansion much easier.

transfer: expansion and bucket migration

 /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     * Multi-thread expansion operation
     */
    private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
        int n = tab.length, stride;
        //Array migration is executed in blocks. If the number of buckets processed by each core is less than 16, then 16 is forced to be assigned.
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        }
        //If this thread is the one initiating the expansion, the new array is still null at this point
        if (nextTab == null) { // initiating
            try {
                //Construct a new array whose capacity is twice the original capacity
                @SuppressWarnings("unchecked")
                Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) { // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //Record the bucket where the thread starts to migrate, migrating from back to front
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //The migrated bucket will be occupied by fwd (the hash value of this node is MOVED). This has been seen in the put method.
        ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
        // When advance == true, it indicates that the node has been processed
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0; ; ) {
            Node<K, V> f;
            int fh;
            //Calculate which part each thread is responsible for, and assign fwd nodes after migration
            //i records the index value of the bucket currently being migrated
            //bound records the starting bucket position of the next task migration
            //--i>=bound indicates that the migration task assigned by the current thread has not been completed yet
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing) {
                    advance = false;
                //No elements need to be migrated
                } else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                } else if (U.compareAndSwapInt // Use CAS to calculate the starting bucket position of the next task migration, and give the value to transferIndex
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //No more buckets that need to be migrated
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // After the expansion is completed, table points to the new array, recalculates the expansion threshold, and assigns it to sizeCtl
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // Decrease the number of expansion task threads by 1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //Determine whether all current expansion tasks have been completed. Equality indicates completion.
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                        return;
                    }
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            } else if ((f = tabAt(tab, i)) == null) { //The current node is null, add a ForwardingNode at this position
                advance = casTabAt(tab, i, null, fwd);
            } else if ((fh = f.hash) == MOVED) {//If it is ForwardingNode, it means that it has been expanded
                advance = true; // already processed
            } else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K, V> ln, hn;
                        // fh >= 0, expressed as a linked list node
                        if (fh >= 0) {
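                            //Split the bucket into two lists: a low list (hash & n == 0) that stays at index i and a high list that moves to index i + n.
                            //The trailing run starting at lastRun is reused as is; the nodes before it are copied with head insertion, so their order is reversed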
                            int runBit = fh & n;
                            Node<K, V> lastRun = f;
                            for (Node<K, V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            } else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K, V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash;
                                K pk = p.key;
                                V pv = p.val;
                                if ((ph & n) == 0) {
                                    ln = new Node<K, V>(ph, pk, pv, ln);
                                } else {
                                    hn = new Node<K, V>(ph, pk, pv, hn);
                                }
                            }
                            //The new table has already been allocated; now place the two split lists into it
                            //Put the low list at position i of the new table
                            setTabAt(nextTab, i, ln);
                            //Put the high list at position i + n of the new table
                            setTabAt(nextTab, i + n, hn);
                            //Insert a ForwardingNode at position i of the old table to mark the bucket as processed
                            setTabAt(tab, i, fwd);
                            advance = true;
                            //Red-black tree bucket: split it by walking the doubly linked list kept alongside the tree
                        } else if (f instanceof TreeBin) {
                            TreeBin<K, V> t = (TreeBin<K, V>) f;
                            TreeNode<K, V> lo = null, loTail = null;
                            TreeNode<K, V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K, V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K, V> p = new TreeNode<K, V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null) {
                                        lo = p;
                                    } else {
                                        loTail.next = p;
                                    }
                                    loTail = p;
                                    ++lc;
                                } else {
                                    if ((p.prev = hiTail) == null) {
                                        hi = p;
                                    } else {
                                        hiTail.next = p;
                                    }
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // After expansion, if the number of red-black tree nodes is <= 6, convert the tree to a one-way linked list
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K, V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K, V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
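
The reason transfer only ever writes to positions i and i + n of the new table is that doubling the table exposes exactly one more hash bit, the bit with value n. The sketch below shows that split on plain integers; the class BinSplitDemo and its methods are hypothetical and operate on hash values rather than on real Node objects.

```java
import java.util.ArrayList;
import java.util.List;

class BinSplitDemo {

    // New index of a node when a table of length n (a power of 2) doubles:
    // it either stays at i or moves to i + n, depending on the bit (hash & n).
    static int newIndex(int hash, int n) {
        int i = hash & (n - 1);
        return (hash & n) == 0 ? i : i + n;
    }

    // Splits the hashes of one old bucket into a low list and a high list,
    // the same partition transfer builds as ln and hn.
    static List<List<Integer>> split(List<Integer> bucketHashes, int n) {
        List<Integer> lo = new ArrayList<>();
        List<Integer> hi = new ArrayList<>();
        for (int h : bucketHashes) {
            if ((h & n) == 0) {
                lo.add(h);
            } else {
                hi.add(h);
            }
        }
        return List.of(lo, hi);
    }

    public static void main(String[] args) {
        int n = 16;
        // 5, 21, 37 and 53 all land in bucket 5 of a 16-slot table.
        List<Integer> bucket = List.of(5, 21, 37, 53);
        System.out.println(split(bucket, n)); // [[5, 37], [21, 53]]
        for (int h : bucket) {
            // The shortcut agrees with recomputing the index in the 32-slot table.
            System.out.println(h + " -> " + newIndex(h, n) + " == " + (h & (2 * n - 1)));
        }
    }
}
```

Since every node of old bucket i ends up in new bucket i or i + n, threads migrating different old buckets never write to the same new bucket, which is why locking the old bucket head is enough.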

3. Summary

The design of ConcurrentHashMap is remarkably refined and deserves to be called a masterpiece. Here are 3 points that you may not have noticed before:

  1. With new ConcurrentHashMap(c), the initial capacity is not the value passed in but the smallest power of 2 greater than or equal to c + c/2 + 1.
  2. Everyone knows that a linked list longer than 8 nodes is converted into a red-black tree, but few people mention that when a tree bucket is split during expansion and a resulting half has 6 or fewer nodes, it is converted back into a linked list.
  3. Strictly speaking, the data structure of both ConcurrentHashMap and HashMap is array + singly linked list + (red-black tree + doubly linked list).