ConcurrentHashMap并发容器(java 8)

一、ConcurrentHashMap

1、ConcurrentHashMap和HashMap一样都是一个哈希表,但是使用了完全不同的所策略;

​2、ConcurrentHashMap可以提供很好的并发性和可伸缩性。

​3、ConcurrentHashMap使用了一个更加细化的锁机制,为分离锁(锁分段技术),这个机制允许更深层次的共享。

  • 对于JDK1.7版本的实现, ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的Hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行,问题就在于查询遍历链表效率太低。

  • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)。

  • JAVA7之前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操作时,将该Segment锁定,不允许对其进行非查询操作,而在JAVA8之后,底层依然由数组+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如 TreeBin、Traverser等对象内部类,并采用CAS无锁算法(来自ReentrantLock中的公平锁),这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化。

 static class Segment<K,V> extends ReentrantLock implements Serializable {
     private static final long serialVersionUID = 2249069246763182397L;
     final float loadFactor;
     Segment(float lf) { this.loadFactor = lf; }
 }
  private void writeObject(java.io.ObjectOutputStream s)
          throws java.io.IOException {
          // For serialization compatibility
          // Emulate segment calculation from previous version of this class
          int sshift = 0;
          int ssize = 1;
          while (ssize < DEFAULT_CONCURRENCY_LEVEL) {
              ++sshift;
              ssize <<= 1;
          }
          int segmentShift = 32 - sshift;
          int segmentMask = ssize - 1;
          @SuppressWarnings("unchecked")
          Segment<K,V>[] segments = (Segment<K,V>[])
              new Segment<?,?>[DEFAULT_CONCURRENCY_LEVEL];
          for (int i = 0; i < segments.length; ++i)
              segments[i] = new Segment<K,V>(LOAD_FACTOR);
          s.putFields().put("segments", segments);
          s.putFields().put("segmentShift", segmentShift);
          s.putFields().put("segmentMask", segmentMask);
          s.writeFields();
  
      	//对象节点,锁的粒度就是HashEntry
          Node<K,V>[] t;
          if ((t = table) != null) {
              Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
              for (Node<K,V> p; (p = it.advance()) != null; ) {
                  s.writeObject(p.key);
                  s.writeObject(p.val);
              }
          }
          s.writeObject(null);
          s.writeObject(null);
          segments = null; // throw away
      }

二、ConcurrentHashMap源码解读分析(java8)

  • ​ Node节点
//键值输入。 
//此类永远不会作为用户可变的Map.Entry导出(即,一个支持setValue;请参阅下面的MapEntry),
//但可以用于批量任务中使用的只读遍历。 
//具有负哈希字段的节点的子类是特殊的,并且包含空键和值(但永远不会导出)。 
//否则,键和val永远不会为空。
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    //保证可见性
    volatile V val;
    //保证可见性
    volatile Node<K,V> next;

    //node节点元素
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;//哈希值
        this.key = key;//map.key
        this.val = val;//map.value
        this.next = next;//下一个节点
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    //重写hashcode
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
     * 对map.get()的虚拟化支持; 在子类中重写。
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}
  • ​ put方法
  // 全局变量table
  transient volatile Node<K,V>[] table;
  
  public V put(K key, V value) {
      return putVal(key, value, false);
  }
  
  final V putVal(K key, V value, boolean onlyIfAbsent) {
      //判断key或者value为空的话就抛出空指针异常
      if (key == null || value == null) throw new NullPointerException();
      //计算hash值
      int hash = spread(key.hashCode());
      int binCount = 0;
      //遍历Node节点table
      for (Node<K,V>[] tab = table;;) {
          Node<K,V> f; 
          int n, i, fh;
          //如果table为空或者table为0的话需要初始化一个table
          if (tab == null || (n = tab.length) == 0)
              tab = initTable();
          //如果f为当前key定位出来的Node,如果为空表示当前位置可以写入数据
          else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
              //利用 CAS 尝试写入,失败则自旋保证成功
              //casTabAt方法调用了sun.misc.Unsafe.compareAndSwapObject方法写入数据
              if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                  break;                   // no lock when adding to empty bin
          }
          // 如果当前为为止的hashcode == MOVED == -1,则需要进行扩容。
          else if ((fh = f.hash) == MOVED)
              //扩容方法 resizeStamp(... ...)
              tab = helpTransfer(tab, f);
          else {
              V oldVal = null;
              //如果都不满足,则利用 synchronized 锁写入数据。
              //结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点
              synchronized (f) {
                  if (tabAt(tab, i) == f) {
                      //如果fh>=0说明这个节点是一个链表节点,不是树节点
                      if (fh >= 0) {
                          binCount = 1;
                          //遍历所有的节点
                          for (Node<K,V> e = f;; ++binCount) {
                              K ek;
                              //如果hash值和key和当前节点的hash和key相同则修改value值
                              if (e.hash == hash &&
                                  ((ek = e.key) == key ||
                                   (ek != null && key.equals(ek)))) {
                                  oldVal = e.val;
                                  if (!onlyIfAbsent)
                                      e.val = value;
                                  break;
                              }
                              Node<K,V> pred = e;
                              //如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
                              if ((e = e.next) == null) {
                                  pred.next = new Node<K,V>(hash, key, value, null);
                                  break;
                              }
                          }
                      }
                      //如果这个节点是树节点,那么就找按照树节点方式插入
                      else if (f instanceof TreeBin) {
                          Node<K,V> p;
                          binCount = 2;
                          if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                              oldVal = p.val;
                              if (!onlyIfAbsent)
                                  p.val = value;
                          }
                      }
                  }
              }
              if (binCount != 0) {
                  //如果链表长度已经达到临界值8 就需要把链表转换为树结构。
                  //如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
                  if (binCount >= TREEIFY_THRESHOLD)
                      treeifyBin(tab, i);
                  if (oldVal != null)
                      return oldVal;
                  break;
              }
          }
      }
      //将当前ConcurrentHashMap的元素数量+1
      addCount(1L, binCount);
      return null;
  }
  • ​ get方法
  public V get(Object key) {
      Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
      //计算hash值
      int h = spread(key.hashCode());
      //根据计算的hash值确定节点的位置
      if ((tab = table) != null && (n = tab.length) > 0 &&
          (e = tabAt(tab, (n - 1) & h)) != null) {
          //如果找到的节点的key和传入的key相等且不为空的话,直接返回当前节点
          if ((eh = e.hash) == h) {
              if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                  return e.val;
          }
          //如果eh<0说明这个节点在树上 直接去树上查找
          else if (eh < 0)
              return (p = e.find(h, key)) != null ? p.val : null;
          //否则遍历链表 找到对应的值并返回  
          while ((e = e.next) != null) {
              if (e.hash == h &&
                  ((ek = e.key) == key || (ek != null && key.equals(ek))))
                  return e.val;
          }
      }
      return null;
  }
  • remove方法
  // 全局变量table
  transient volatile Node<K,V>[] table;
  
  public V remove(Object key) {
      return replaceNode(key, null, null);
  }
  final V replaceNode(Object key, V value, Object cv) {
      //计算hash值
      int hash = spread(key.hashCode());
      //遍历table
      //为什么Node<K,V>[] tab = table??
      //因为table是volatile变量,读写volatile变量的开销很大。
      //编译器也不能对volatile变量的读写做任何优化,
      //直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。
      for (Node<K,V>[] tab = table;;) {
          Node<K,V> f; int n, i, fh;
          //如果table为空直接,或者长度为0,制定hash值为0
          if (tab == null || (n = tab.length) == 0 ||
              (f = tabAt(tab, i = (n - 1) & hash)) == null)
              break;
          //是一个forwardNode
          else if ((fh = f.hash) == MOVED)
              tab = helpTransfer(tab, f);
          else {
              V oldVal = null;
              boolean validated = false;
              synchronized (f) {
                  if (tabAt(tab, i) == f) {
                      if (fh >= 0) {
                          validated = true;
                          // 循环寻找
                          for (Node<K,V> e = f, pred = null;;) {
                              K ek;
                              // equal 相同 取出
                              if (e.hash == hash &&
                                  ((ek = e.key) == key ||
                                   (ek != null && key.equals(ek)))) {
                                  V ev = e.val;
                                  // value为null或value和查到的值相等  
                                  if (cv == null || cv == ev ||
                                      (ev != null && cv.equals(ev))) {
                                      oldVal = ev;
                                      if (value != null)
                                          e.val = value;
                                      else if (pred != null)
                                          pred.next = e.next;
                                      else
                                          setTabAt(tab, i, e.next);
                                  }
                                  break;
                              }
                              pred = e;
                              if ((e = e.next) == null)
                                  break;
                          }
                      }
                      // 若是树 红黑树高效查找/删除
                      else if (f instanceof TreeBin) {
                          validated = true;
                          TreeBin<K,V> t = (TreeBin<K,V>)f;
                          TreeNode<K,V> r, p;
                          if ((r = t.root) != null &&
                              (p = r.findTreeNode(hash, key, null)) != null) {
                              V pv = p.val;
                              if (cv == null || cv == pv ||
                                  (pv != null && cv.equals(pv))) {
                                  oldVal = pv;
                                  if (value != null)
                                      p.val = value;
                                  else if (t.removeTreeNode(p))
                                      setTabAt(tab, i, untreeify(t.first));
                              }
                          }
                      }
                  }
              }
              //要将count的值减一
              //这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。
              if (validated) {
                  if (oldVal != null) {
                      if (value == null)
                          addCount(-1L, -1);
                      return oldVal;
                  }
                  break;
              }
          }
      }
      return null;
  }
  

版权声明:本文为captian_900331原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。