jdk1.8 ConcurrentHashMap学习 2 addCount 的第一部分 之 fullAddCount
addCount() 就是ConcurrentHashMap put进去一个元素后,执行的增加size的操作,因为ConcurrentHashMap是能在并发环境下保证线程安全的,所以肯定不会是简单的++操作。
那先看看size()方法
可以发现,size的组成是 baseCount属性 加上CounterCell数组里面的所有值的和
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
//再看看 sumCount()
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
再看下这个counterCells ,CounterCell是一个静态内部类,就一个volatile long value;
很明显是可以进行cas操作的,通过这两点也能猜到大概实现了。
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/**
* Table of counter cells. When non-null, size is a power of 2.
* 计数器单元表。当非空时,大小是2的乘方。
*/
private transient volatile CounterCell[] counterCells;
接下来看这个addCount()方法第一部分,如下
总结: addCount先判断CounterCell数组是否为空:
1、如果为空则对当前map对象cas操作 baseCount 加 1,cas成功了就跳过if,失败了就执行fullAddCount方法。
2、如果不为空则通过当前线程的hash值找到此线程在CounterCell数组中对应的位置,如果此位置的CounterCell对象为空,就执行fullAddCount方法。如果不为空就对此CounterCell对象cas操作value加1。如果成功return;失败就执行fullAddCount方法。
我问: 为什么不直接for循环对当前map对象cas操作 baseCount 加 1,却要引入CounterCell数组
我一个朋友说: 因为for循环cas这种方式可以解决多线程并发问题,但因为cas的是当前map对象,所以同一时刻还是只有一个线程能cas成功,而对于引入CounterCell数组,cas的是当前线程对应在数组中特定位置的元素,也就是说如果位置不冲突,n个长度的CounterCell数组是可以支持n个线程同时cas成功的。
//这是在put方法里的调用 binCount都是大于等于0的
addCount(1L, binCount);
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//首先if判断counterCells为空并且对当前map对象cas操作baseCount + x成功,就跳过if里的操作,
//因为都cas操作baseCount + x成功了,就不需要通过counterCells辅助了,简单明了。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果上面判断失败了,counterCells不为空 或者counterCells为空但cas失败了。
//如果counterCells为空,直接执行fullAddCount。
//如果不为空,判断当前线程在counterCells中的槽位是否为空,如果不为空,
//对槽位中的CounterCell对象cas操作value加1,成功return,失败执行fullAddCount,如果槽位为空,直接执行fullAddCount
if (as == null || (m = as.length - 1) < 0 ||
//ThreadLocalRandom.getProbe()就相当于是 [当前线程的哈希值]
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
//cas对CounterCell对象中的value执行+x(也就是+1)操作
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//下面是检查是否扩容(先不看)
if (check >= 0) {
//。。。。。
}
接下来看看fullAddCount
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//如果当前线程hash值==0 就执行下,具体目的还不清楚
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//循环
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//如果counterCells已经被初始化了
if ((as = counterCells) != null && (n = as.length) > 0) {
//如果当前线程对应于counterCell数组中的槽位为空,在此位置添加一个CounterCell元素
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//wasUncontended一直为true
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果当前线程对应槽位已经存在CounterCell元素了,就对value+x
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//为扩容做条件
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
//扩展数组,长度变为两倍
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//如果counterCells 没有被初始化,
//(由上面可知cellsBusy是用来在初始化和赋值扩容时做判断的)
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
//初始化长度为2
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果都不满足 最后还是cas当前map对象 baseCount + x
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}