jdk1.8 ConcurrentHashMap学习 2 addCount fullAddCount

jdk1.8 ConcurrentHashMap学习 2 addCount 的第一部分 之 fullAddCount

addCount() 就是ConcurrentHashMap put进去一个元素后,执行的增加size的操作,因为ConcurrentHashMap是能在并发环境下保证线程安全的,所以肯定不会是简单的++操作。

那先看看size()方法
可以发现,size的组成是 baseCount属性 加上CounterCell数组里面的所有值的和

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
   
//再看看 sumCount()
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

再看下这个counterCells ,CounterCell是一个静态内部类,就一个volatile long value;
很明显是可以进行cas操作的,通过这两点也能猜到大概实现了。

@sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
 /**
   * Table of counter cells. When non-null, size is a power of 2.
   * 计数器单元表。当非空时,大小是2的乘方。
  */
private transient volatile CounterCell[] counterCells;

接下来看这个addCount()方法第一部分,如下
总结: addCount先判断CounterCell数组是否为空:
1、如果为空则对当前map对象cas操作 baseCount 加 1,cas成功了就跳过if,失败了就执行fullAddCount方法。
2、如果不为空则通过当前线程的hash值找到此线程在CounterCell数组中对应的位置,如果此位置的CounterCell对象为空,就执行fullAddCount方法。如果不为空就对此CounterCell对象cas操作value加1。如果成功return;失败就执行fullAddCount方法。

我问: 为什么不直接for循环对当前map对象cas操作 baseCount 加 1,却要引入CounterCell数组

我一个朋友说: 因为for循环cas这种方式可以解决多线程并发问题,但因为cas的是当前map对象,所以同一时刻还是只有一个线程能cas成功,而对于引入CounterCell数组,cas的是当前线程对应在数组中特定位置的元素,也就是说如果位置不冲突,n个长度的CounterCell数组是可以支持n个线程同时cas成功的。

//这是在put方法里的调用 binCount都是大于等于0的
addCount(1L, binCount);

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //首先if判断counterCells为空并且对当前map对象cas操作baseCount + x成功,就跳过if里的操作,
        //因为都cas操作baseCount + x成功了,就不需要通过counterCells辅助了,简单明了。
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //如果上面判断失败了,counterCells不为空 或者counterCells为空但cas失败了。
            //如果counterCells为空,直接执行fullAddCount。
            //如果不为空,判断当前线程在counterCells中的槽位是否为空,如果不为空,
            //对槽位中的CounterCell对象cas操作value加1,成功return,失败执行fullAddCount,如果槽位为空,直接执行fullAddCount
            if (as == null || (m = as.length - 1) < 0 ||
                //ThreadLocalRandom.getProbe()就相当于是 [当前线程的哈希值]
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  //cas对CounterCell对象中的value执行+x(也就是+1)操作
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //下面是检查是否扩容(先不看)
        if (check >= 0) {
        //。。。。。
    }

接下来看看fullAddCount

private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //如果当前线程hash值==0 就执行下,具体目的还不清楚
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        //循环
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //如果counterCells已经被初始化了
            if ((as = counterCells) != null && (n = as.length) > 0) {
                //如果当前线程对应于counterCell数组中的槽位为空,在此位置添加一个CounterCell元素
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //wasUncontended一直为true
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //如果当前线程对应槽位已经存在CounterCell元素了,就对value+x
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                //为扩容做条件 
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            //扩展数组,长度变为两倍
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //如果counterCells 没有被初始化,
            //(由上面可知cellsBusy是用来在初始化和赋值扩容时做判断的) 
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        //初始化长度为2
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //如果都不满足 最后还是cas当前map对象 baseCount + x
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }
综上,总结:
addCount : 通过属性baseCount 和 counterCell数组中所有的元素的和来记录size
在无竞争的情况下,通过cas当前map对象的baseCount为baseCount + 1,
有竞争情况下,上诉cas失败,会初始化一个长度为2的CounterCell数组,数组会扩容,每次扩容成两倍,每个线程有在counterCell数组中对应的位置(多个线程可能会对应同一个位置), 如果位置上的CounterCell元素为空,就生成一个value为1的元素,如果不为空,则cas当前CounterCell元素的value为value + 1;如果都失败尝试cas当前map对象的baseCount为baseCount + 1,

版权声明:本文为qq_40225004原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。