JDK1.7 ConcurrentHashMap学习笔记

为何用ConcurrentHashMap

在并发编程里,HashMap是线程不安全的,如果使用HashMap的话,那么可能会造成死循环,原因是HashMap里的Entry链表在多线程操作下,会变成循环链表,那么一旦get的话,那么next节点永远不为空,就会出现死循环了。

而HashTable的操作效率又过低,因为它是使用synchronized对整个HashTable加锁,所有线程都是在竞争同一把锁。当一个线程访问HashTable的同步方法,其他线程如果也访问它的同步方法时会陷入阻塞或轮询状态。如一个线程用put添加元素时,那么另一个线程既不能put,也不能get获取元素。所以在竞争越激烈的情况下,效率越低。使用Collections.synchronizedMap是使用了装饰器模式,在get、put和其他操作外都包装了一层synchronized锁,与HashTable差不多,所以效率也不高。

总的来说,就是在多线程场景下,使用HashMap会造成死循环,而使用HashTable的效率又过于低下。所以就有了ConcurrentHashMap,在JDK1.7中,ConcurrentHashMap使用了锁分段技术,即把数据分散放到多个桶中,每一个桶配一把锁,当多线程访问不同桶的数据时,就不会出现锁竞争的情况,即当一个线程访问一个桶的数据,其他桶的数据也能被其他线程访问。从而大大提高了并发效率。

内部存储结构

ConcurrentHashMap的存储结构如下,它是由一个Segment数组组成,而每个Segment里又有一个HashEntry链表数组,所以说每个Segment也类似于HashMap,也就是说它能进行扩容。当想对HashEntry数组里的数据修改时,必须先获得对应的Segment的锁。
Segment的个数一旦在初始化后,就不能再改变。Segment个数也可以说是并发级别,默认是16个,就是说ConcurrentHashMap默认最多支持16个线程并发。Segment的个数也总是2的n次方,这样设计也是类似于HashMap,为了在hash计算(取模)时做优化。
在这里插入图片描述
看一下HashEntry的结构

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        //其他省略
}

源码分析

初始化

ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel几个参数,来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组的。segmentShift和segmentMask在决定数据要放入哪个segment中会用到。
接下来看构造函数的内部逻辑。

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    // 参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 校验并发级别大小,大于 1<<16,重置为 65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments
    // 用来保存2的n次方的n
    int sshift = 0;
    // 2的n次方
    int ssize = 1;
    // 这个循环可以找到大于等于 concurrencyLevel 的最近的2的n次方值
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 记录段偏移量
    this.segmentShift = 32 - sshift;
    // 记录段掩码
    this.segmentMask = ssize - 1;
    // 设置容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // c = 总容量 / segment个数 ,默认 16 / 16 = 1,这里是计算每个 Segment 中的HashEntry数组的容量
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //Segment 中的HashEntry数组的容量必须是2的n次方,类似于HashMap
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 创建 Segment 数组,设置 segments[0]
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    //将segments[0]放到ss中去
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

总结一下初始化流程,分为以下步骤:

  1. 参数检验及并发级别大小校验
  2. 计算大于等于concurrencyLevel的2的幂次方,作为segment数组容量大小
  3. 记录segmentShift和segmentMask,后面在定位segment索引时会用到
  4. 初始化segment[0],放入segment

put方法

/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p> The value can be retrieved by calling the <tt>get</tt> method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with <tt>key</tt>, or
 *         <tt>null</tt> if there was no mapping for <tt>key</tt>
 * @throws NullPointerException if the specified key or value is null
 */
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    //对元素的key进行一次再散列,以减少散列冲突,使元素能够比较均匀的分布在不同的segment中,提高存取效率
    int hash = hash(key);
    
    // hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算
    // 其实也就是把高4位与segmentMask(1111)做与运算
    // 得到要放入的segment索引
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 如果查找到的 Segment 为空,初始化该位置的segment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

/**
 * Returns the segment for the given index, creating it and
 * recording in segment table (via CAS) if not already present.
 *
 * @param k the index
 * @return the segment
 */
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 判断 u 位置的 Segment 是否为null,因为其他线程可能初始化了
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 获取0号 segment 里的 HashEntry<K,V> 初始化长度
        int cap = proto.table.length;
        // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
        float lf = proto.loadFactor;
        // 计算扩容阀值
        int threshold = (int)(cap * lf);
        // 创建一个 cap 容量的 HashEntry 数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 自旋检查 u 位置的 Segment 是否为null
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // 使用CAS 赋值,只会成功一次
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

总结一下put流程:

  1. 使用segmentShift和segmentMask计算元素要放入的segment的索引,获取该segment

  2. 若该segment为空,则调用ensureSegment对该位置的segment初始化
    初始化segment流程:
    1.检查计算该位置的 segment 是否为null
    2.为null则使用 segmen[0] 的参数(负载因子、容量)创建一个HashEntry数组
    3.再次检查计算该位置的 segment 是否为null
    4.为null则使用 segmen[0] 的参数和HashEntry数组初始化新的segment
    5.自旋判断该位置的segment是否为null,使用CAS将segment放入数组的对应位置

  3. 调用该segment的 put 插入 key,value 值。

那segment的put方法有哪些步骤呢?

  1. 使用 tryLock() 获取 ReentrantLock 独占锁;如果获取不到,则用 scanAndLockForPut 获取

  2. 计算元素要放入的segment的索引index,然后获取该segment的HashEntry

  3. 判断HashEntry是否为空,若为空:

    1. 判断当前容量是否大于扩容阈值及小于最大容量,符合条件则进行扩容
    2. 使用头插法插入新节点
  4. 若HashEntry不为空:

    1. 遍历HashEntry链表,判断是否有节点的key和hash与要插入的key和hash一致,若一致,则替换该节点的值
    2. 若遍历完发现链表里没有相同的,那么判断是否需要扩容,然后使用头插法插入新节点

这里面的第一步中的 scanAndLockForPut 这个方法做的操作就是不断的自旋 tryLock() 获取锁。当自旋次数大于指定次数时,使用 lock() 阻塞获取锁。在自旋时顺便新建要put的 HashEntry 节点。

扩容rehash

在扩容时会创建一个容量是原来2倍的新数组,然后把原数组里的元素rehash后移动到新数组中去,在元素在新数组中的位置只会是 原位置 或者 原位置+oldCap。HashEntry链表节点会使用头插法插入到指定位置。同时,ConcurrentHashMap只会对某个segment进行扩容,不会对整个容器扩容。还有一点会比HashMap稍好,就是ConcurrentHashMap是在 插入元素前 判断是否需要扩容,HashMap是在 插入元素后 判断是否需要扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

get方法

  1. 计算key所在的segment的索引
  2. 遍历该segment中的HashEntry链表,寻找key和hash相同的元素
  3. 找到就返回该元素的value值。否则返回null

get方法高效之处在于不需要加锁,因为它把get方法中要使用的共享变量都定义成volatile类型。而volatile类型的变量可在多线程之间保持内存的可见性,能被多线程同时读,并且不会读到过期的值。由于get操作只需要读,所以不用加锁。
不会读到过期的值是因为JMM的happens-before原则,对于volatile变量,写操作总是先于读操作,即使两个线程同时修改和读取,也能保证get能拿到最新的值。经典的volatile替换锁的场景。

Reference

  1. 末读代码
  2. 《Java并发编程的艺术》

版权声明:本文为u013379553原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。