ConcurrentHashMap 1.7

The overall structure is an array of Segments (each Segment extends ReentrantLock); each Segment holds a HashEntry array, and each bucket of that array is a singly linked list.

Constructor initialization process

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // Parameter verification
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // Verify the concurrency level size, if it is greater than 1<<16, reset it to 65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // sshift counts the left shifts applied to ssize, i.e. ssize == 1 << sshift
    int sshift = 0;
    int ssize = 1;
    // Find the smallest power of 2 that is greater than or equal to concurrencyLevel
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    //Record segment offset
    this.segmentShift = 32 - sshift;
    //Record segment mask
    this.segmentMask = ssize - 1;
    //Set capacity
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // c = initialCapacity / ssize, rounded up (16 / 16 = 1 by default); this is the number of entries each Segment must hold
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // The HashEntry table in each Segment has a capacity of at least 2 and is always a power of 2
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    //Create the Segment array and set segments[0]
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

  1. Validate the parameters: loadFactor must be positive, initialCapacity must not be negative, and concurrencyLevel must be positive. A concurrency level greater than 1<<16 is reset to 65536.
  2. Calculate segmentShift (the segment offset) and segmentMask, which are used to compute the Segment subscript for a key.
  3. Initialize the Segment array. The default size is 16. Each Segment is a ReentrantLock, so by default up to 16 threads can write concurrently as long as they hit different Segments. Only the Segment at index 0 is created at this point, together with its HashEntry array, whose default size is 2.
  4. The other Segments are not created in the constructor; each is created lazily on first use, with the capacity and load factor of the Segment at index 0 as the template.
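
The sizing arithmetic in the constructor can be tried in isolation. The following is a minimal sketch, not the JDK code itself: the class name SegmentSizingSketch and the method sizes are made up for illustration, and the constant values are assumed to match the JDK 1.7 ones.

```java
import java.util.Arrays;

// Hypothetical helper that repeats the constructor's sizing arithmetic.
final class SegmentSizingSketch {
    static final int MAX_SEGMENTS = 1 << 16;          // upper bound for concurrencyLevel
    static final int MAXIMUM_CAPACITY = 1 << 30;      // assumed value of the JDK constant
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;  // minimum HashEntry table size

    /** Returns {ssize, segmentShift, segmentMask, cap} for the given arguments. */
    static int[] sizes(int initialCapacity, int concurrencyLevel) {
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        int sshift = 0;
        int ssize = 1;
        // Smallest power of 2 that is >= concurrencyLevel
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // Entries per segment, rounded up
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // Per-segment table capacity: power of 2, at least 2
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        return new int[] { ssize, 32 - sshift, ssize - 1, cap };
    }

    public static void main(String[] args) {
        // Defaults: initialCapacity 16, concurrencyLevel 16
        System.out.println(Arrays.toString(sizes(16, 16)));  // [16, 28, 15, 2]
        // concurrencyLevel 10 is rounded up to 16 segments; 100 / 16 rounds up to 7, so cap is 8
        System.out.println(Arrays.toString(sizes(100, 10))); // [16, 28, 15, 8]
    }
}
```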

put process

Calculate the Segment subscript corresponding to the Key through the segment offset
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    //Based on key, calculate hash value
    int hash = hash(key);
    //One hash is used to index two arrays; to keep the two indexes independent, the high bits are used for the index into Segment[].
    int j = (hash >>> segmentShift) & segmentMask;
    //Determine whether the Segment object at the index is created, if not, create it
    if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    //Call Segment's put method to add the element
    return s.put(key, hash, value, false);
}
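
To make the split between high and low bits concrete, here is a small sketch of the two index calculations. The class and method names are hypothetical, and the sample values (segmentShift 28, segmentMask 15, table length 4) are assumptions chosen to match the default 16 segments.

```java
// Hypothetical illustration of how one hash yields two independent indexes.
final class IndexSplitSketch {
    /** Segment index: taken from the high bits of the hash. */
    static int segmentIndex(int hash, int segmentShift, int segmentMask) {
        return (hash >>> segmentShift) & segmentMask;
    }

    /** Bucket index inside a segment's table: taken from the low bits of the hash. */
    static int tableIndex(int hash, int tableLength) {
        return (tableLength - 1) & hash;
    }

    public static void main(String[] args) {
        int hash = 0xA0000005;                             // top 4 bits 1010, low bits ...0101
        System.out.println(segmentIndex(hash, 28, 15));    // 10
        System.out.println(tableIndex(hash, 4));           // 1
    }
}
```

Because the two indexes come from opposite ends of the hash, keys that land in the same Segment are not forced into the same bucket.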
Use the ensureSegment method to ensure that the subscripted Segment has been created
//Create the Segment object for the given index if needed and return it
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    //Volatile read; if the slot is still null, create the Segment
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        //Use the Segment at index 0 as the template
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        //Recheck: another thread may have created the Segment in the meantime
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            //create
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            //Spin with CAS to publish the new Segment into Segment[]; exactly one thread wins
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    //return
    return seg;
}
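  1. UNSAFE.getObjectVolatile performs a volatile read, so a Segment published by another thread is visible immediately. The slot is checked for null several times, and the new Segment (with its HashEntry array) is installed with an atomic compare-and-swap, so only one thread's Segment is ever stored.
  2. When the method returns, the Segment at that subscript and its HashEntry array are guaranteed to exist.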
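
The same check, recheck, then CAS pattern can be written with the public java.util.concurrent.atomic API instead of Unsafe. This is only a sketch under that substitution: LazySlotSketch and ensureSlot are hypothetical names, and a plain Object[] stands in for a Segment.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical stand-in for Segment[]: slots are created lazily, slot 0 is the prototype.
final class LazySlotSketch {
    private final AtomicReferenceArray<Object[]> slots;

    LazySlotSketch(int slotCount, int protoLength) {
        slots = new AtomicReferenceArray<>(slotCount);
        slots.set(0, new Object[protoLength]);       // slot 0 is created eagerly
    }

    /** Returns the slot at index k, creating it from the prototype if needed. */
    Object[] ensureSlot(int k) {
        Object[] seg = slots.get(k);                 // volatile read
        if (seg == null) {
            // Build a candidate sized like the prototype in slot 0
            Object[] candidate = new Object[slots.get(0).length];
            // Recheck and publish with CAS; only one thread's candidate is stored
            while ((seg = slots.get(k)) == null) {
                if (slots.compareAndSet(k, null, candidate)) {
                    seg = candidate;
                    break;
                }
            }
        }
        return seg;
    }

    public static void main(String[] args) {
        LazySlotSketch s = new LazySlotSketch(16, 2);
        Object[] first = s.ensureSlot(3);
        Object[] second = s.ensureSlot(3);
        System.out.println(first == second);         // true: the slot is created once
    }
}
```

A thread that loses the CAS simply discards its candidate and returns the instance the winner published.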
Execute the put process in Segment
  1. Segment is a ReentrantLock. To keep operations on a Segment thread-safe, the Segment's lock must be held before the Segment is modified.
  2. If tryLock fails, scanAndLockForPut uses the waiting time to traverse the bucket and, if the key is not found, creates the HashEntry object in advance, so the node does not have to be created again during the insertion itself.
  3. The thread first spins on tryLock without blocking. If the number of retries exceeds MAX_SCAN_RETRIES (64 on a multi-core machine, 1 on a single core), lock() is called and the thread blocks. Either way, by the time execution reaches the insertion code the Segment's lock is held, so everything below is thread-safe and Unsafe's cheaper access methods can be used.
  4. The HashEntry array subscript is calculated from the low bits of the hash, and the linked list at that subscript is traversed.
  5. If the same key is found in the linked list, its value is overwritten. If no entry has the same key, the new node is inserted at the head of the list (head insertion).
  6. If the insertion would push the element count above the threshold (load factor * HashEntry array capacity), the table is expanded first and the node is added as part of the rehash.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    //Try to acquire the lock; if that succeeds, node is null and execution continues below
    //If another thread holds the lock, do useful preparatory work instead of waiting idly
    //scanAndLockForPut is shown after this method
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        //Take the low bits of hash to calculate the index into HashEntry[]
        int index = (tab.length - 1) & hash;
        //Get the first element of the bucket at that index
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            //The current element is not null
            if (e != null) {
                K k;
                //If the key already exists, overwrite the original value
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //If it is not a duplicate element, get the next element of the linked list and continue to loop through the linked list
                e = e.next;
            }
            else { //Reached the end of the list without finding the key
                //The HashEntry for this key-value pair was already created in scanAndLockForPut
                if (node != null)
                    node.setNext(first); //Link it in front of the current head (head insertion)
                else
                    //Create the HashEntry for this key-value pair, pointing at the current head
                    node = new HashEntry<K,V>(hash, key, value, first);
                //The element count after this insertion
                int c = count + 1;
                //Determine whether expansion is needed
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    //Expansion needed; rehash also inserts node
                    rehash(node);
                else
                    //No expansion required
                    //Store the new node as the bucket head, completing the head insertion
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        //Release the lock
        unlock();
    }
    return oldValue;
}
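
The overwrite-or-insert-at-head logic of the method above can be reduced to a single-threaded sketch. Everything here is hypothetical and simplified: the class HeadInsertSketch, String keys and values, a fixed table of length 2, and no locking or resizing.

```java
// Hypothetical single-threaded reduction of Segment.put: overwrite if present, else head-insert.
final class HeadInsertSketch {
    static final class Node {
        final int hash;
        final String key;
        String value;
        Node next;
        Node(int hash, String key, String value, Node next) {
            this.hash = hash; this.key = key; this.value = value; this.next = next;
        }
    }

    final Node[] table = new Node[2];   // fixed size; no rehash in this sketch
    int count;

    /** Returns the previous value for key, or null if the key was absent. */
    String put(String key, String value) {
        int hash = key.hashCode();
        int index = (table.length - 1) & hash;       // low bits select the bucket
        Node first = table[index];
        for (Node e = first; e != null; e = e.next) {
            if (e.hash == hash && key.equals(e.key)) {
                String old = e.value;
                e.value = value;                     // same key: overwrite in place
                return old;
            }
        }
        table[index] = new Node(hash, key, value, first);  // new key: becomes the bucket head
        count++;
        return null;
    }

    public static void main(String[] args) {
        HeadInsertSketch m = new HeadInsertSketch();
        m.put("a", "1");                              // "a".hashCode() is 97, bucket 1
        m.put("c", "2");                              // "c".hashCode() is 99, bucket 1
        System.out.println(m.table[1].key);           // c (the most recent insert is the head)
        System.out.println(m.put("a", "3"));          // 1 (old value returned, no new node)
    }
}
```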
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    //Get the header element
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        //Failed to acquire lock
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            //Reached the end of the list without finding the key: create the HashEntry speculatively and stop traversing
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                //Duplicate elements, do not create HashEntry objects, and no longer traverse
                retries = 0;
            else
                //Continue traversing the next node
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            //Too many failed attempts to acquire the lock: block
            //MAX_SCAN_RETRIES depends on the number of available CPU cores (64 if more than one, otherwise 1)
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            //The bucket head changed because another thread modified the list: traverse again
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
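
Stripped of the list scanning, the locking strategy in scanAndLockForPut is "spin on tryLock a bounded number of times, then block". A minimal sketch of just that part, using the public ReentrantLock API; the class name, the acquire method and the MAX_SPINS constant are hypothetical, with MAX_SPINS chosen to mirror the rule described for MAX_SCAN_RETRIES.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reduction of the spin-then-block acquisition used by scanAndLockForPut.
final class SpinThenLockSketch {
    // Assumed to mirror MAX_SCAN_RETRIES: spinning only pays off on multi-core machines
    static final int MAX_SPINS =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /** Acquires the lock and returns how many failed tryLock attempts preceded it. */
    static int acquire(ReentrantLock lock) {
        int spins = 0;
        while (!lock.tryLock()) {          // non-blocking attempt
            if (++spins > MAX_SPINS) {
                lock.lock();               // give up spinning and block
                break;
            }
            // The real method uses this time to scan the bucket and pre-create the node
        }
        return spins;
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        int spins = acquire(lock);
        try {
            System.out.println("acquired after " + spins + " failed attempts");
        } finally {
            lock.unlock();
        }
    }
}
```

Whichever branch ends the loop, the caller holds the lock on return, which is why the code after scanAndLockForPut can assume exclusive access.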

Capacity expansion process

  1. Create a new HashEntry array twice the size of the old one.
  2. Traverse all elements of the old HashEntry array. Each element's new position can only be its old position or its old position + the old capacity.
  3. Add the elements to their new positions using head insertion.
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    // old capacity
    int oldCapacity = oldTable.length;
    // New capacity, doubled
    int newCapacity = oldCapacity << 1;
    //New expansion threshold
    threshold = (int)(newCapacity * loadFactor);
    //Create new array
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // New mask: with the default capacity 2, the new capacity is 4 and the mask is 3 (binary 11)
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        // Traverse the old array
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // Calculate the new index: either unchanged, or the old index + the old capacity
            int idx = e.hash & sizeMask;
            if (next == null) // Single node on list
                // The bucket holds a single element, so move it directly
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // The bucket holds a linked list
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // The new index can only be unchanged or the old index + the old capacity.
                // After this loop, lastRun and every node after it map to the same new index.
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // lastRun and the nodes after it share one new index, so that tail is moved to the new table as-is.
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    // Clone each node before lastRun and head-insert the copy at its new index k.
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    //Insert the new node at the head of its bucket
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
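
The claim that an entry either stays at its index or moves by exactly the old capacity follows from the mask gaining one extra bit when the table doubles. A short sketch that checks this; the class and method names are hypothetical.

```java
// Hypothetical check of the index rule used by rehash when the table doubles.
final class RehashIndexSketch {
    /** Index of hash in a table of the given power-of-two capacity. */
    static int indexFor(int hash, int capacity) {
        return hash & (capacity - 1);
    }

    /** True if the index after doubling is the old index or old index + old capacity. */
    static boolean followsRule(int hash, int oldCapacity) {
        int oldIdx = indexFor(hash, oldCapacity);
        int newIdx = indexFor(hash, oldCapacity << 1);
        return newIdx == oldIdx || newIdx == oldIdx + oldCapacity;
    }

    public static void main(String[] args) {
        // hash 5 = 101b: old index 1 (mask 11b), new index 5 (mask 111b) = 1 + 4
        System.out.println(indexFor(5, 4) + " -> " + indexFor(5, 8));   // 1 -> 5
        // hash 9 = 1001b: the newly exposed bit is 0, so the index stays 1
        System.out.println(indexFor(9, 4) + " -> " + indexFor(9, 8));   // 1 -> 1
    }
}
```

This is also why the lastRun optimization works: a tail of nodes whose newly exposed bit is identical can be moved to the new table as one unit.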

Size calculation process

  1. Without taking any locks, traverse all Segments, summing each Segment's element count (count) and its modification count (modCount).
  2. If the modCount sum equals the sum from the previous pass, the map was not modified between the two passes, so the size is consistent and is returned directly.
  3. Otherwise, the unlocked pass is retried and the new modCount sum is compared with the previous one.
  4. If the retry limit (RETRIES_BEFORE_LOCK) is reached and the sums still differ, all Segments are locked and then summed.
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // The retry limit is exceeded: create every Segment if necessary and lock it.
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
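
The optimistic-then-locked counting strategy can be sketched with a small striped counter. This is an illustration only: StripedSizeSketch, Stripe and add are hypothetical names, the fields are declared volatile here purely to keep the sketch simple, and RETRIES_BEFORE_LOCK is assumed to be 2.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical striped counter that sizes itself the way size() does.
final class StripedSizeSketch {
    static final int RETRIES_BEFORE_LOCK = 2;    // assumed to match the JDK constant

    static final class Stripe extends ReentrantLock {
        volatile int count;                       // elements in this stripe
        volatile int modCount;                    // structural modifications
    }

    private final Stripe[] stripes;

    StripedSizeSketch(int n) {
        stripes = new Stripe[n];
        for (int i = 0; i < n; i++)
            stripes[i] = new Stripe();
    }

    /** Adds one element to the given stripe under that stripe's lock. */
    void add(int stripe) {
        Stripe s = stripes[stripe];
        s.lock();
        try {
            s.count++;
            s.modCount++;
        } finally {
            s.unlock();
        }
    }

    int size() {
        int size;
        long sum;
        long last = 0L;                           // modCount sum of the previous pass
        int retries = -1;                         // the first pass is not a retry
        boolean locked = false;
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (Stripe s : stripes)      // optimistic passes failed: lock everything
                        s.lock();
                    locked = true;
                }
                sum = 0L;
                size = 0;
                for (Stripe s : stripes) {
                    sum += s.modCount;
                    size += s.count;
                }
                if (sum == last)                  // unchanged since the previous pass
                    break;
                last = sum;
            }
        } finally {
            if (locked)
                for (Stripe s : stripes)
                    s.unlock();
        }
        return size;
    }

    public static void main(String[] args) {
        StripedSizeSketch m = new StripedSizeSketch(4);
        m.add(0); m.add(1); m.add(1);
        System.out.println(m.size());             // 3
    }
}
```

With no concurrent writers the second pass sees the same modCount sum as the first, so the result is returned without any stripe being locked.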

get

  • get does not check for an expansion in progress and never takes the Segment's lock. It locates the Segment from the high bits of the hash and the bucket from the low bits.
  • No lock is needed because the Segment and the bucket head are read with volatile semantics (UNSAFE.getObjectVolatile), and the value and next fields of HashEntry are volatile, so changes made by other threads are visible to readers.
  • During an expansion, rehash builds the new table completely before assigning it to table, and old nodes are reused or cloned but never relinked. A reader therefore sees either the old table or the new one, and both are consistent.
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // u is the raw offset of the Segment for this key
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            // Traverse the linked list to find the entry with the same key.
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
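
A lock-free read of this kind can be approximated with AtomicReferenceArray, whose get has volatile-read semantics. The sketch below is an assumption-laden reduction, not the JDK code: LockFreeReadSketch and putHead are hypothetical, there is a single fixed table of length 4, and the writer is assumed to be serialized by a lock in real use.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical reduction of the read path: volatile read of the bucket head, then a plain traversal.
final class LockFreeReadSketch {
    static final class Node {
        final int hash;
        final String key;
        volatile String value;
        volatile Node next;
        Node(int hash, String key, String value, Node next) {
            this.hash = hash; this.key = key; this.value = value; this.next = next;
        }
    }

    private final AtomicReferenceArray<Node> table = new AtomicReferenceArray<>(4);

    /** Writer side: head-inserts without deduplicating, so a newer node shadows an older one. */
    void putHead(String key, String value) {
        int h = key.hashCode();
        int i = (table.length() - 1) & h;
        table.set(i, new Node(h, key, value, table.get(i)));   // volatile publish
    }

    /** Reader side: no lock is taken at any point. */
    String get(String key) {
        int h = key.hashCode();
        for (Node e = table.get((table.length() - 1) & h);     // volatile read of the head
             e != null; e = e.next) {
            if (e.key == key || (e.hash == h && key.equals(e.key)))
                return e.value;
        }
        return null;
    }

    public static void main(String[] args) {
        LockFreeReadSketch t = new LockFreeReadSketch();
        t.putHead("k", "v1");
        System.out.println(t.get("k"));         // v1
        System.out.println(t.get("missing"));   // null
    }
}
```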