ConcurrentHashMap如何保证并发安全

ConcurrentHashMap简介:一款线程安全的HashMap类集合,底层数据结构采用Node数组 + 链表 /红黑树。java1.7采用Segement(基于ReetrantLock实现)分段锁解决并发安全,java1.8采用CAS + synchronized解决并发安全;java1.8采用CAS + synchronized是为了提高并发性能,CAS保证在需要放入数组元素且数组对应下标元素为null情况下无阻塞尝试放入元素,当需要放入数组元素且对应下标不为null时采用synconized只对首节点加锁,锁粒度相比1.7更小,只要hash不冲突就不会加锁,而且java1.6开始synconized进行了锁优化,并发效率进一步提升。

1.ConcurrentHashMap中线程安全的方法:

①putVal()方法放入一个元素

②replaceNode()方法

③clear()方法

④initTable()方法

⑤addCount()方法

重点是putVal()、initTable()和addCount()方法

putVal()方法源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //ConcurrentHashMap要求key和value均不为空
        if (key == null || value == null) throw new NullPointerException();
        //通过spread()方法获取key的hash
        int hash = spread(key.hashCode());
        //记录数组某一个下标包含的元素个数(以便最后进行树化的判断)
        int binCount = 0;
        //自旋加锁
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //判断Node数组是否需要初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //(n - 1) & hash得到数组下标,tabAt()方法获取此下标元素,判断是否为空
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //该下标元素位空,尝试使用CAS放入元素
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //MOVED = -1,代表此时有线程在对ConcurrentHashMap进行扩容,
            //helpTranfer()帮助扩容
            else if ((fh = f.hash) == MOVED)
                //扩容成功返回最新的Node数组
                tab = helpTransfer(tab, f);
            //元素不为空时,对首节点加synchronized锁
            else {
                V oldVal = null;
                //加锁操作
                synchronized (f) {
                    //加锁后首先判断该下标元素是否变化,因为有可能在加锁过程中有其他线程修改了                                    
                    //这个结点
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历链表,尾部放入元素
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //出现相同key时直接覆盖value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //遍历到链表尾,进行newNode操作
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //首结点为树节点时,putTreeVal放入元素
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //记录该位置元素个数,判断是否需要将链表转红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //元素个数加1
        addCount(1L, binCount);
        return null;
    }

addCount()方法源码:

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //先对baseCount进行+1操作,这里采用CAS修改,修改失败会尝试CounterCell数组的某个位置+1
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //baseCount加锁失败,会获取一个随机hash,映射到CounterCell的某个下标,再利用
            //CAS修改数值
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            //得到新的元素总数
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

initTable()方法:

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //自旋初始化数组,sizeCtl()代表获取当前初始化状态(小于0代表此时有线程正在进行初始化操作)
        while ((tab = table) == null || tab.length == 0) {
            //有其他线程初始化时就让出CPU的使用权,避免一直进行自旋消耗CPU资源
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //当还没有线程在进行初始化时,利用CAS修改sizeCtl,自己进行初始化操作
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //数组初始化
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //n >>> 2 等价于n的四分之一,所以sc代表扩容触发的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //更新sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        //返回初始化后的数组
        return tab;
    }


版权声明:本文为qq_48649411原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。