为何用ConcurrentHashMap
在并发编程里,HashMap是线程不安全的,如果使用HashMap的话,那么可能会造成死循环,原因是HashMap里的Entry链表在多线程操作下,会变成循环链表,那么一旦get的话,那么next节点永远不为空,就会出现死循环了。
而HashTable的操作效率又过低,因为它是使用synchronized对整个HashTable加锁,所有线程都是在竞争同一把锁。当一个线程访问HashTable的同步方法,其他线程如果也访问它的同步方法时会陷入阻塞或轮询状态。如一个线程用put添加元素时,那么另一个线程既不能put,也不能get获取元素。所以在竞争越激烈的情况下,效率越低。使用Collections.synchronizedMap是使用了装饰器模式,在get、put和其他操作外都包装了一层synchronized锁,与HashTable差不多,所以效率也不高。
总的来说,就是在多线程场景下,使用HashMap会造成死循环,而使用HashTable的效率又过于低下。所以就有了ConcurrentHashMap,在JDK1.7中,ConcurrentHashMap使用了锁分段技术,即把数据分散放到多个桶中,每一个桶配一把锁,当多线程访问不同桶的数据时,就不会出现锁竞争的情况,即当一个线程访问一个桶的数据,其他桶的数据也能被其他线程访问。从而大大提高了并发效率。
内部存储结构
ConcurrentHashMap的存储结构如下,它是由一个Segment数组组成,而每个Segment里又有一个HashEntry链表数组,所以说每个Segment也类似于HashMap,也就是说它能进行扩容。当想对HashEntry数组里的数据修改时,必须先获得对应的Segment的锁。
Segment的个数一旦在初始化后,就不能再改变。Segment个数也可以说是并发级别,默认是16个,就是说ConcurrentHashMap默认最多支持16个线程并发。Segment的个数也总是2的n次方,这样设计也是类似于HashMap,为了在hash计算(取模)时做优化。
看一下HashEntry的结构
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
//其他省略
}
源码分析
初始化
ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel几个参数,来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组的。segmentShift和segmentMask在决定数据要放入哪个segment中会用到。
接下来看构造函数的内部逻辑。
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 校验并发级别大小,大于 1<<16,重置为 65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 用来保存2的n次方的n
int sshift = 0;
// 2的n次方
int ssize = 1;
// 这个循环可以找到大于等于 concurrencyLevel 的最近的2的n次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记录段偏移量
this.segmentShift = 32 - sshift;
// 记录段掩码
this.segmentMask = ssize - 1;
// 设置容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 总容量 / segment个数 ,默认 16 / 16 = 1,这里是计算每个 Segment 中的HashEntry数组的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//Segment 中的HashEntry数组的容量必须是2的n次方,类似于HashMap
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建 Segment 数组,设置 segments[0]
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//将segments[0]放到ss中去
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
总结一下初始化流程,分为以下步骤:
- 参数检验及并发级别大小校验
- 计算大于等于concurrencyLevel的2的幂次方,作为segment数组容量大小
- 记录segmentShift和segmentMask,后面在定位segment索引时会用到
- 初始化segment[0],放入segment
put方法
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//对元素的key进行一次再散列,以减少散列冲突,使元素能够比较均匀的分布在不同的segment中,提高存取效率
int hash = hash(key);
// hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算
// 其实也就是把高4位与segmentMask(1111)做与运算
// 得到要放入的segment索引
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 如果查找到的 Segment 为空,初始化该位置的segment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
/**
* Returns the segment for the given index, creating it and
* recording in segment table (via CAS) if not already present.
*
* @param k the index
* @return the segment
*/
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 判断 u 位置的 Segment 是否为null,因为其他线程可能初始化了
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
// 获取0号 segment 里的 HashEntry<K,V> 初始化长度
int cap = proto.table.length;
// 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
float lf = proto.loadFactor;
// 计算扩容阀值
int threshold = (int)(cap * lf);
// 创建一个 cap 容量的 HashEntry 数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋检查 u 位置的 Segment 是否为null
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
// 使用CAS 赋值,只会成功一次
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
总结一下put流程:
使用segmentShift和segmentMask计算元素要放入的segment的索引,获取该segment
若该segment为空,则调用ensureSegment对该位置的segment初始化
初始化segment流程:
1.检查计算该位置的 segment 是否为null
2.为null则使用 segmen[0] 的参数(负载因子、容量)创建一个HashEntry数组
3.再次检查计算该位置的 segment 是否为null
4.为null则使用 segmen[0] 的参数和HashEntry数组初始化新的segment
5.自旋判断该位置的segment是否为null,使用CAS将segment放入数组的对应位置调用该segment的 put 插入 key,value 值。
那segment的put方法有哪些步骤呢?
使用 tryLock() 获取 ReentrantLock 独占锁;如果获取不到,则用 scanAndLockForPut 获取
计算元素要放入的segment的索引index,然后获取该segment的HashEntry
判断HashEntry是否为空,若为空:
- 判断当前容量是否大于扩容阈值及小于最大容量,符合条件则进行扩容
- 使用头插法插入新节点
若HashEntry不为空:
- 遍历HashEntry链表,判断是否有节点的key和hash与要插入的key和hash一致,若一致,则替换该节点的值
- 若遍历完发现链表里没有相同的,那么判断是否需要扩容,然后使用头插法插入新节点
这里面的第一步中的 scanAndLockForPut 这个方法做的操作就是不断的自旋 tryLock() 获取锁。当自旋次数大于指定次数时,使用 lock() 阻塞获取锁。在自旋时顺便新建要put的 HashEntry 节点。
扩容rehash
在扩容时会创建一个容量是原来2倍的新数组,然后把原数组里的元素rehash后移动到新数组中去,在元素在新数组中的位置只会是 原位置 或者 原位置+oldCap。HashEntry链表节点会使用头插法插入到指定位置。同时,ConcurrentHashMap只会对某个segment进行扩容,不会对整个容器扩容。还有一点会比HashMap稍好,就是ConcurrentHashMap是在 插入元素前 判断是否需要扩容,HashMap是在 插入元素后 判断是否需要扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
get方法
- 计算key所在的segment的索引
- 遍历该segment中的HashEntry链表,寻找key和hash相同的元素
- 找到就返回该元素的value值。否则返回null
get方法高效之处在于不需要加锁,因为它把get方法中要使用的共享变量都定义成volatile类型。而volatile类型的变量可在多线程之间保持内存的可见性,能被多线程同时读,并且不会读到过期的值。由于get操作只需要读,所以不用加锁。
不会读到过期的值是因为JMM的happens-before原则,对于volatile变量,写操作总是先于读操作,即使两个线程同时修改和读取,也能保证get能拿到最新的值。经典的volatile替换锁的场景。
Reference
- 末读代码
- 《Java并发编程的艺术》