ConcurrenHashmap
线程安全的集合
- Hashtable
- ConcurrentHashMap
- Vector
ConcurrentHashMap是HashMap的线程安全版本,ConcurrentSkipListMap是TreeMap的线程安全版本
Hashtable是JDK 5之前Map唯一线程安全的内置实现(Collections.synchronizedMap不算)。Hashtable继承的是Dictionary(Hashtable是其唯一公开的子类),并不继承AbstractMap或者HashMap。尽管Hashtable和HashMap的结构非常类似,但是他们之间并没有多大联系。
为什么要使用ConcurrentHashMap
因为在并发编程中使用HashMap可能导致程序死循环,而HashTable效率低下。
1)HashMap是非线程安全的
在多线程的操作下、若调用HashMap.put()能引起死循环、导致CPU资源消耗大。
多线程操作put()会使得Entry链表形成环形数据结构,使得Entry的next节点永远不为空,就会产生死循环获取Entry。
2)HashTable的效率低
hashTable是用Synchronized来保证线程安全的,所有的线程都必须竞争一把锁、效率太低。
如线程A访问HashTable同步方法put()、B线程也访问同步方法put()、但B没有锁进入阻塞或轮询状态,也不能访问其他的方法,导致线程越多效率越低。
ConcurrentHashMap分析 jdk1.7
属性
int DEFAULT_INITIAL_CAPACITY = 16; //初始容量的默认值-》hashentry的table属性
float DEFAULT_LOAD_FACTOR = 0.75f; //默认的加载因子 -》HashEntry
int DEFAULT_CONCURRENCY_LEVEL = 16; //默认的并发度-》segment数组大小
int MAXIMUM_CAPACITY = 1 << 30; //最大的容量 -》segment中数组的最大值 ->tab.length<MAXIMUM_CAPACITY
int MIN_SEGMENT_TABLE_CAPACITY = 2; //HashEntry数组的最小值
int MAX_SEGMENTS = 1 << 16; // slightly conservative //最大并发度=segment数组的最大值
int RETRIES_BEFORE_LOCK = 2; //锁重试次数
final int segmentMask; //主要作用于keyhash过程
final int segmentShift;
final Segment<K,V>[] segments; //存储数据节点的位置
//存放数据的锁:
static final class Segment<K,V> extends ReentrantLock { //Segment是一种可重入锁(ReentrantLock),扮演锁的角色;
static final int MAX_SCAN_RETRIES =Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; //segment获取锁重试次数上限
transient volatile HashEntry<K,V>[] table; //存放数据的
transient int count;
}
//HashEntry的属性
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
对HashEntry的value、next属性加上volatile,这样保证了可见性、
默认的ConcurrentHashMap中是有16个类似HashMap的结构、每个HashMap拥有一个独占锁。也就是说最终的效果就是通过某种Hash算法,将任何一个元素均匀的映射到某个HashMap的Map.Entry上面,而对某个一个元素的操作就集中在其分布的HashMap上,与其它HashMap无关。这样就支持最多16个并发的写操作。
构造函数


public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {}
//数组大小 加载因子 并发度,默认16个
segments数组长度ssize和concurrentLevel有关,ssize>=concurrentLevel的最小2次幂。
如果觉得这么描述不好理解,那么举个例子就清楚了。比如concurrentLevel=17,2的几次幂刚好>=17呢?是32对吧!所以ssize=32。更重要的是,Segment的数组大小之所以一定是2的次幂,就是为了方便通过按位与的散列算法来定位Segment的index位置
put()
通过加锁机制插入数据
1、定位数据应该放在哪个Segment中
通过hash算法定位到对应的Segment,若获取到的Segment为空,则调用ensureSegment()方法;
否则,直接调用查询到的Segment的put方法插入值.
ensureSegment()方法,使用getObjectVolatile()读取对应Segment,如果还是为空,则以segments[0]为原型创建一个Segment对象,并将这个对象设置为对应的Segment值并返回。
if (value == null) throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask; //找到key对应的segment数组的索引位置
// segmentShift = 32 - sshift;
// segmentMask = ssize - 1; //segmentMask -段掩码、确保散列的均匀性、其中size是segmentMask数组的长度
// 2^sshift==ssize 若zize=16 sshift=4
if ((s = (Segment <K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) s = ensureSegment(j);
return s.put(key, hash, value, false);
2、定位数据应该放在Segment中的哪个数组下标下面.
3、将数据放入对应位置.
(1线程A执行tryLock()方法成功,则把HashEntry对象插入到相应的位置;
HashEntry <K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
(2线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut()方法中,会通过重复执行tryLock()方法尝试获取锁, 在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B;
(3当线程A执行完插入操作时,会通过unlock()方法释放锁,接着唤醒线程B继续执行;
get()
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
- hash(key)对key的hashcode重新散列 int h = hash(key);
- 定位Segment
- 定位HashEntry
- 通过getObjectVolatile()方法获取指定偏移量上的HashEntry
- 通过循环遍历链表获取对应值
在jdk1.7中ConcurrentHashMap的get操作并没有加锁,因为在每个Segment中定义的HashEntry数组和在每个HashEntry中定义的value和next HashEntry节点都是volatile类型的,volatile类型的变量可以保证其在多线程之间的可见性,因此可以被多个线程同时读
transient volatile HashEntry<K,V>[] table;
static final class HashEntry<K,V> {
volatile V value;
volatile HashEntry<K,V> next;
}
size
因为ConcurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除,在1.7的实现中,采用了如下方式:
先采用不加锁的方式,连续计算元素的个数,最多计算3次:
1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
2、如果前后两次计算结果都不同,则给每个 Segment 进行加锁,再计算一次元素的个数;
分段锁是什么??
将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
如concurrenthashmap每个segment拥有一把锁、每次线程在操作时只会锁住其中的一个Segment,不同的线程只要操作不同的Segment中的数据,线程之间就互不干扰
ConcurrentHashMap中get方法不加锁,所以多个线程读不会互斥.
操作不同数据只要数据不是同一个Segment中的就不互斥.
读操作对加锁的需求:
(1)HashEntry 中的 key,hash,next 都声明为 final 型:
这意味着,不能把节点添加到链接的中间和尾部,也不能在链接的中间和尾部删除节点。
这个特性可以保证:在访问某个节点时,这个节点之后的链接不会被改变。这个特性可以大大降低处理链表时的复杂性;
(2)HashEntry 类的 value 域被声明为 Volatile 型:
Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。
在 ConcurrentHashMap 中,不允许用 null 作为键和值,当读线程读到某个 HashEntry 的 value 域的值为 null 时,便知道产生了冲突(发生了重排序现象),需要加锁后重新读入这个 value 值。这些特性互相配合,使得读线程即使在不加锁状态下,也能正确访问 ConcurrentHashMap;由于对 Volatile 变量的写入操作将与随后对这个变量的读操作进行同步。当一个写线程修改了某个 HashEntry 的 value 域后,另一个读线程读这个值域,
Java 内存模型能够保证读线程读取的一定是更新后的值。所以,写线程对链表的非结构性修改能够被后续不加锁的读线程看到;
高并发性
主要来自于三个方面:
(1).用分段锁(Segment锁)实现多个线程间的更深层次的共享访问;
(2).用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求;
(3).通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性;
ConcurrentHashMap分析 jdk1.8
相对于jdk1.7取消了segment分段锁、加入了红黑树的数据结构、对于锁变为了对Node节点(数组的元素)加锁。
采用Node + CAS + Synchronized来保证并发安全
put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//初始化数组
if (tab == null || (n = tab.length) == 0) tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果相应位置的Node还未初始化,则通过CAS插入相应的数据
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
//扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,
//如果该节点的hash不小于0,则遍历链表更新节点或插入新节点;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果该节点是TreeBin类型的节点,说明是红黑树结构,
//则通过putTreeVal方法往红黑树中插入节点;
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) p.val = value;
}
}
}
}
//如果binCount不为0,说明put操作对数据产生了影响
// 如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树
//如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//如果插入的是一个新节点,则执行addCount()方法尝试更新元素个数baseConunt
addCount(1L, binCount);
return null;
}
put流程步骤:
1、若是第一次put、需要初始化数组、调用initTable()方法。
2、如果没有hash冲突就直接CAS插入
3、如果在moved状态、进行扩容
3、若添加元素对应的node不为null、且不是moved状态、对该节点加synchronized锁
如果该节点的hash>=0,则遍历链表更新节点或插入新节点;
4、如果节点是红黑树类型、使用putTreeVal()方法
最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
5、如果添加成功,则执行addCount()方法尝试更新元素个数、统计size,并且检查扩容
get()
1、计算hash值,定位到该table索引位置,如果是首节点符合就返回
2、如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
3、以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
size
对于size的计算,在扩容和addCount()方法就已经有处理了,而JDK1.7是在调用size()方法才去计算,其实在并发集合中去计算size是没有多大的意义的,因为size是实时在变的,只能计算某一刻的大小,但是某一刻太快了,人的感知是一个时间段,所以并不是很精确
volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount
private transient volatile long baseCount;
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a; //变化的数量
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
区别
1、jdk1.7是ReentrantLock+Segment+HashEntry,而jdk1.8中是使用了synchronized+CAS+HashEntry+红黑树。
2、jdk1.8锁的粒度是HashEntry、jdk1.7锁的粒度是segment、包含多个HashEntry、因此jdk1.8中降低了锁的粒度
3、jdk1.8中使用synchronized进行同步的、因此不需要分段锁
4、jdk1.8使用了红黑树来优化链表、当阈值大于8链表转换为红黑树、红黑树遍历效率高于链表
JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock??
1、因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
2、synchronized之前一直都是重量级的锁,但是后来java官方是对他进行过升级的,他现在采用的是锁升级的方式去做的。
针对 synchronized 获取锁的方式,JVM 使用了锁升级的优化方式,就是先使用偏向锁优先同一线程然后再次获取锁,如果失败,就升级为 CAS 轻量级锁,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁。
所以是一步步升级上去的,最初也是通过很多轻量级的方式锁定的。