【集合类分析】ConcurrentHashMap

部分转自http://www.jianshu.com/p/c0642afe03e0 

ConcurrentHashMap是线程安全的,在多线程环境下可以放心使用(方法有原子性)。

和HashTable的对比:HashTable也是线程安全的,但实现的途径是对每个方法都加了Synchronized悲观锁,进入方法时获取的都是同一把锁(this),会导致在竞争激烈的并发环境下效率低下。ConcurrentHashMap对加锁对象实现了分离,减少了并发下的冲突。

ConcurrentHashMap不能put null 值,key和value都不能为null。一个合理的解释是无法分辨get()等操作返回的null值是表示值为null还是不存在该对象,这在多线程环境下容易引起误解。

除此之外,和HashMap的实现基本一模一样,当有修改操作时借助了synchronized来对table[i]进行锁定保证了线程安全以及使用了CAS来保证原子性操作。


private transient volatile int sizeCtl;
transient volatile Node<K,V>[] table;

private transient volatile int sizeCtl; :sizeCtl是控制标识符,不同的值表示不同的意义。


负数代表正在进行初始化或扩容操作 ,其中-1代表正在初始化或扩容 ,-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。


transient volatile Node<K,V>[] table;:是一个容器数组,第一次插入数据的时候初始化,大小是2的幂次方。这就是我们所说的底层结构:”数组+链表(或树)”


TreeBin用于封装维护TreeNode,包含putTreeVal、lookRoot、UNlookRoot、remove、balanceInsetion、balanceDeletion等方法,当链表转树时,用于封装TreeNode,也就是说,ConcurrentHashMap的红黑树存放的时TreeBin,而不是treeNode。


1、 transient volatile Node<K,V>[] table;是一个容器数组,第一次插入数据的时候初始化,大小是2的幂次方。这就是我们所说的底层结构:”数组+链表(或树)”

2、private static final int MAXIMUM_CAPACITY = 1 << 30; // 最大容量

3、private static final intDEFAULT_CAPACITY = 16;

4、static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // MAX_VALUE=2^31-1=2147483647

5、private static finalint DEFAULT_CONCURRENCY_LEVEL = 16;

6、private static final float LOAD_FACTOR = 0.75f;

7、static final int TREEIFY_THRESHOLD = 8; // 链表转树的阀值,如果table[i]下面的链表长度大于8时就转化为数

8、static final int UNTREEIFY_THRESHOLD = 6; //树转链表的阀值,小于等于6是转为链表,仅在扩容tranfer时才可能树转链表

9、static final int MIN_TREEIFY_CAPACITY = 64;

10、private static final int MIN_TRANSFER_STRIDE = 16;

11、private static int RESIZE_STAMP_BITS = 16;

12、private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // help resize的最大线程数

13、private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

14、static final int MOVED = -1; // hash for forwarding nodes(forwarding nodes的hash值)、标示位

15、static final int TREEBIN = -2; // hash for roots of trees(树根节点的hash值)

16、static final int RESERVED = -3; // hash for transient reservations(ReservationNode的hash值)

table初始化的长度(如果不指定默认情况下为16)。

这里要说一个参数:concurrencyLevel,表示能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数。默认值为16,(即允许16个线程并发可能不会产生竞争)。为了保证并发的性能,我们要很好的估计出concurrencyLevel值,不然要么竞争相当厉害,从而导致线程试图写入当前锁定的段时阻塞。

putVal方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode()); //扰动函数同HashMap相同,但又增加了一次与操作
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) { // 类似死循环,直到插入成功再跳出,因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); // 还未初始化则初始化 
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                /*
                    i=(n-1)&hash 等价于i=hash%n(前提是n为2的幂次方).根据hash值,取出table中位置的节点用f表示。
                    有如下两种情况:
                    1、如果table[i]==null(即该位置的节点为空,没有发生碰撞),则利用CAS操作直接存储在该位置,
                        如果CAS操作成功则退出死循环。
                    
                */
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//2、如果table[i]!=null(即该位置已经有其它节点,发生碰撞),检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
                tab = helpTransfer(tab, f);
            else { //运行到这里,说明table[i]的节点的hash值不等于MOVED。
                V oldVal = null;
                synchronized (f) { // 锁定,(hash值相同的链表的头节点)
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1; // 令binCount为1,在遍历过程中记录该值变化以检查是否需要变为树
                            for (Node<K,V> e = f;; ++binCount) {  
                            /*
                            下面的代码就是先查找链表中是否出现了此key,如果出现,则更新value,并跳出循环,
                            否则将节点加入到链表末尾并跳出循环
                            */
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //插入成功后,如果插入的是链表节点,则要判断下该桶位是否要转化为树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal; // 如果插入的值只是更新了旧值,在这里就return了,不会执行addCount
                    break;
                }
            }
        }
        addCount(1L, binCount);  // 如果插入的是一个新节点,则执行addCount()方法尝试更新元素个数
        return null;
    }


putVal(K key, V value, boolean onlyIfAbsent)方法做的工作如下:
1、检查key/value是否为空,如果为空,则抛异常,否则进行2
2、进入for死循环,进行3
3、检查table是否初始化了,如果没有,则调用initTable()进行初始化然后进行 2,否则进行4
4、根据key的hash值计算出其应该在table中储存的位置i,取出table[i]的节点用f表示。
    根据f的不同有如下三种情况:

1)如果table[i]==null(即该位置的节点为空,没有发生碰撞),
则利用CAS操作直接存储在该位置,如果CAS操作成功则退出死循环。
 2)如果table[i]!=null(即该位置已经有其它节点,发生碰撞),碰撞处理也有两种情况
 2.1)检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
 2.2)说明table[i]的节点的hash值不等于MOVED,如果table[i]为链表节点,则将此节点插入链表中即可(也有可能是更新value值)
 如果table[i]为树节点,则将此节点插入树中即可。插入成功后,进行 5
5、如果table[i]的节点是链表节点,则检查table的第i个位置的链表是否需要转化为数,如果需要则调用treeifyBin函数进行转化

6、如果插入的是新节点,还会执行addCount()方法


  • 获取table中对应索引的元素f。
    Doug Lea采用Unsafe.getObjectVolatile来获取,也许有人质疑,直接table[index]不可以么,为什么要这么复杂?
    在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
  • 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。
    • 如果CAS成功,说明Node节点已经插入,随后addCount(1L, binCount)方法会检查当前容量是否需要进行扩容。
    • 如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。

(http://blog.csdn.net/u010412719/article/details/52145145)

get方法

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)//如果eh=-1就说明e节点为ForWordingNode,这说明什么,说明这个节点已经不存在了,被另一个线程正则扩容
//所以要查找key对应的值的话,直接到新newtable找
                return (p = e.find(h, key)) != null ? p.val : null;            
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

这个get请求,我们需要cas来保证变量的原子性。如果tab[i]正被锁住,那么CAS就会失败,失败之后就会不断的重试。这也保证了get在高并发情况下不会出错。
我们来分析下到底有多少种情况会导致get在并发的情况下可能取不到值。

1、一个线程在get的时候,另一个线程在对同一个key的node进行remove操作;2、一个线程在get的时候,另一个线程正则重排table。可能导致旧table取不到值。

那么本质是,我在get的时候,有其他线程在对同一桶的链表或树进行修改。那么get是怎么保证同步性的呢?我们看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是干嘛的:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

它是对tab[i]进行原子性的读取,因为我们知道putVal等对table的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了Unsafe的getObjectVolatile,因为table是volatile类型,所以对tab[i]的原子请求也是可见的。因为如果同步正确的情况下,根据happens-before原则,对volatile域的写入操作happens-before于每一个后续对同一域的读操作。所以不管其他线程对table链表或树的修改,都对get读取可见。


table扩容

当table容量不足的时候,即table的元素数量达到容量阈值sizeCtl,需要对table进行扩容。
整个扩容分为两部分:

  1. 构建一个nextTable,大小为table的两倍。
  2. 把table的数据复制到nextTable中。

这两个过程在单线程下实现很简单,但是ConcurrentHashMap是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。

先看第一步,构建nextTable,毫无疑问,这个过程只能只有单个线程进行nextTable的初始化,具体实现如下:

private final void addCount(long x, int check) {
    ... 省略部分代码
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}


通过Unsafe.compareAndSwapInt修改sizeCtl值,保证只有一个线程能够初始化nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的1.5。

节点从table移动到nextTable,大体思想是遍历、复制的过程。

  1. 首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素f,初始化一个forwardNode实例fwd。
  2. 如果f == null,则在table中的i位置放入fwd,这个过程是采用Unsafe.compareAndSwapObjectf方法实现的,很巧妙的实现了节点的并发移动。
  3. 如果f是链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上,移动完成,采用Unsafe.putObjectVolatile方法给table原位置赋值fwd。
  4. 如果f是TreeBin节点,也做一个反序处理,并判断是否需要untreeify,把处理的结果分别放在nextTable的i和i+n的位置上,移动完成,同样采用Unsafe.putObjectVolatile方法给table原位置赋值fwd。

遍历过所有的节点以后就完成了复制工作,把table指向nextTable,并更新sizeCtl为新数组大小的0.75倍 ,扩容完成。

(http://www.jianshu.com/p/f6730d5784ad  扩容实现具体步骤)

size实现

1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或删除数据时(putVal方法),会通过addCount()方法更新baseCount,实现如下:

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }

1、初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;

2、如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现如下:


else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}
3、如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
    break;
所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}


总结:没有并发(更准确地说应该是没人和它竞争)的时候直接更新basecount,有并发的话,更新countercells中的值。countercells中的元素是ConterCell类,数组中的下标和具体的线程有关。








在jdk1.8中主要做了2方面的改进

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。





版权声明:本文为qqqqq1993qqqqq原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。