一、ConcurrentHashMap
1、ConcurrentHashMap和HashMap一样都是一个哈希表,但是使用了完全不同的所策略;
2、ConcurrentHashMap可以提供很好的并发性和可伸缩性。
3、ConcurrentHashMap使用了一个更加细化的锁机制,为分离锁(锁分段技术),这个机制允许更深层次的共享。
对于JDK1.7版本的实现, ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的Hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行,问题就在于查询遍历链表效率太低。
JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)。
JAVA7之前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操作时,将该Segment锁定,不允许对其进行非查询操作,而在JAVA8之后,底层依然由数组+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如 TreeBin、Traverser等对象内部类,并采用CAS无锁算法(来自ReentrantLock中的公平锁),这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化。
static class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
final float loadFactor;
Segment(float lf) { this.loadFactor = lf; }
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
// For serialization compatibility
// Emulate segment calculation from previous version of this class
int sshift = 0;
int ssize = 1;
while (ssize < DEFAULT_CONCURRENCY_LEVEL) {
++sshift;
ssize <<= 1;
}
int segmentShift = 32 - sshift;
int segmentMask = ssize - 1;
@SuppressWarnings("unchecked")
Segment<K,V>[] segments = (Segment<K,V>[])
new Segment<?,?>[DEFAULT_CONCURRENCY_LEVEL];
for (int i = 0; i < segments.length; ++i)
segments[i] = new Segment<K,V>(LOAD_FACTOR);
s.putFields().put("segments", segments);
s.putFields().put("segmentShift", segmentShift);
s.putFields().put("segmentMask", segmentMask);
s.writeFields();
//对象节点,锁的粒度就是HashEntry
Node<K,V>[] t;
if ((t = table) != null) {
Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
for (Node<K,V> p; (p = it.advance()) != null; ) {
s.writeObject(p.key);
s.writeObject(p.val);
}
}
s.writeObject(null);
s.writeObject(null);
segments = null; // throw away
}
二、ConcurrentHashMap源码解读分析(java8)
- Node节点
//键值输入。
//此类永远不会作为用户可变的Map.Entry导出(即,一个支持setValue;请参阅下面的MapEntry),
//但可以用于批量任务中使用的只读遍历。
//具有负哈希字段的节点的子类是特殊的,并且包含空键和值(但永远不会导出)。
//否则,键和val永远不会为空。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
//保证可见性
volatile V val;
//保证可见性
volatile Node<K,V> next;
//node节点元素
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;//哈希值
this.key = key;//map.key
this.val = val;//map.value
this.next = next;//下一个节点
}
public final K getKey() { return key; }
public final V getValue() { return val; }
//重写hashcode
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* 对map.get()的虚拟化支持; 在子类中重写。
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
- put方法
// 全局变量table
transient volatile Node<K,V>[] table;
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//判断key或者value为空的话就抛出空指针异常
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//遍历Node节点table
for (Node<K,V>[] tab = table;;) {
Node<K,V> f;
int n, i, fh;
//如果table为空或者table为0的话需要初始化一个table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果f为当前key定位出来的Node,如果为空表示当前位置可以写入数据
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//利用 CAS 尝试写入,失败则自旋保证成功
//casTabAt方法调用了sun.misc.Unsafe.compareAndSwapObject方法写入数据
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果当前为为止的hashcode == MOVED == -1,则需要进行扩容。
else if ((fh = f.hash) == MOVED)
//扩容方法 resizeStamp(... ...)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果都不满足,则利用 synchronized 锁写入数据。
//结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
//如果fh>=0说明这个节点是一个链表节点,不是树节点
if (fh >= 0) {
binCount = 1;
//遍历所有的节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key和当前节点的hash和key相同则修改value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
//如果这个节点是树节点,那么就找按照树节点方式插入
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构。
//如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1
addCount(1L, binCount);
return null;
}
- get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据计算的hash值确定节点的位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果找到的节点的key和传入的key相等且不为空的话,直接返回当前节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0说明这个节点在树上 直接去树上查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- remove方法
// 全局变量table
transient volatile Node<K,V>[] table;
public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
//计算hash值
int hash = spread(key.hashCode());
//遍历table
//为什么Node<K,V>[] tab = table??
//因为table是volatile变量,读写volatile变量的开销很大。
//编译器也不能对volatile变量的读写做任何优化,
//直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table为空直接,或者长度为0,制定hash值为0
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//是一个forwardNode
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
// 循环寻找
for (Node<K,V> e = f, pred = null;;) {
K ek;
// equal 相同 取出
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
// value为null或value和查到的值相等
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
// 若是树 红黑树高效查找/删除
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//要将count的值减一
//这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
版权声明:本文为captian_900331原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。