并发容器

JAVA架构师之路

本系列文章均是博主原创,意在记录学习上的知识,同时一起分享学习心得。
第一章
第一节 Java集合总结(架构师之路 )
第二节 Java多线程(架构师之路 )
第三节 理解synchronized(架构师之路 )
第四节 多线程协作机制
第五节 线程的中断
第六节 并发包的基石
第七节 并发容器


前言

本章,我们探讨Java并发包中的容器类,具体包括:

  • 写时复制的List和Set;
  • ConcurrentHashMap;
  • 基于SkipList的Map和Set;
  • 各种并发队列

一、写时复制的List和Set

1.1 CopyOnWriteArrayList

CopyOnWriteArrayList实现了List接口,它的基本用法是跟其他List(如ArrayList)是一样的。其特点如下:

  • 它是线程安全的,可以被多个线程并发访问;
  • 它的迭代器不支持修改操作,但也不会抛出ConcurrentModificationException;
  • 它以原子方式支持一些复合操作。

CopyOnWriteArrayList内部也是一个数组,但这个数组是以原子方式被整体更新的。每次修改操作,但会新建一个数组,复制原数组的内容到新数组,在新数组上进行需要的修改,然后以原子方式设置内部的数组引用,这就是写时复制。
内部数组声明

private volatile transient Object[] array;

在CopyOnWriteArrayList中,读不需要锁,可以并行,读写也可以并行,但多个线程不能同时写,每个写操作需要获取锁,CopyOnWriteArrayList内部使用了ReentrantLock。

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

由于写时复制的局限性,CopyOnWriteArrayList不适应与数组很大且修改频繁的场景,它是以优化读操作为目标,读不需要同步,性能很高,但在优化读的同时牺牲了写性能。

1.2 CopyOnWriteArraySet

CopyOnWriteArraySet实现了Set接口,不包含重复元素。CopyOnWriteArraySet内部是通过CopyOnWriteArrayList实现的,其成员声明为:
private final CopyOnWriteArrayList a1;
在构造方法中被初始化,如:

public CopyOnWriteArraySet() {
	a1 = new CopyOnWriteArrayList();
}

其add方法代码为:

public boolean add(E e) {
	return a1.addIfAbsent(e);
}

add方法就是调用CopyOnWriteArrayList的addIfAbsent方法。
contains方法代码为:

public boolean contains(Object obj) {
	return a1.contains(obj);
}

由于CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的,所以与之前介绍过的Set的实现类如HashSet/TreeSet相比,他的性能比较低,不适应于元素个数特别多的集合。如果元素个数比较多,可以考虑ConcurrentHashMap或ConcurrentSkipListSet这两个类。

二、ConcurrentHashMap

ConcurrentHashMap是HashMap的并发版本,与HashMap相比,它有如下特点:

  • 并发安全;
  • 直接支持一些原子复合操作;
  • 支持高并发,读操作完全并行,写操作支持一定程度的并行;
  • 与同步容器Collection.synchronizedMap相比,迭代不用加锁,不会抛出ConcurrentModificationException;
  • 弱一致性;

2.1 并发安全

HashMap不是并发安全的,在并发更新的情况下,HashMap可能出现死循环,占满CPU。

代码如下(示例):

public static void unsafeUpdate() {
    final Map<Integer, Integer> map = new HashMap<Integer, Integer>();
    for (int i =0; i < 1000; i++) {
        Thread t = new Thread(){
            Random random = new Random();
            @Override
            public void run() {
                for (int i=0; i < 1000;i++) {
                    map.put(random.nextInt(), 1);
                }
            }
        };
        t.start();
    }
}

死循环出现在多个线程同时扩容哈希表的时候,不是同时更新一个链表的时候,那种情况可能会出现更新丢失,但不会死循环。

2.2 原子复合操作

ConcurrentHashMap除了实现Map接口,还实现了ConcurrentMap,接口实现了一些条件更新操作,Java7中的具体定义为:

public interface ConcurrentMap<K, V> extends Map<K, V> {
	// 条件更新,如果map中没有key,设置key为value,返回原来key对应的值
	// 如果没有返回null
	V putifAbsent(K key, V value);
	// 条件删除,如果map中有key,且对应的值为oldValue,则删除,如果删除了,返回true
	// 否则返回false
   boolean remove(Object key, Object value);
	// 条件替换,如果Map中有key, 且对应的值为oldValue,则替换为newValue
	// 如果替换了,返回true,否则false
   boolean replace(K key, V oldValue, V newValue);
	// 条件替换,如果Map中有key,则替换值为value,返回原来key对应的值
	// 如果原来没有,返回null
   V replace(K key, V value);
}

2.3 高并发的机制

ConcurrentHashMap是为高并发设计的,其核心思想主要有两点:

  • 分段锁;
  • 读不需要锁;

同步容器使用synchronized,所有方法竞争同一个锁,而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。
在对每个段的数据进行读写时,ConcurrentHashMap也不是简单地使用锁进行同步,内部使用CAS。对应写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写得时候也可以读,这使得ConcurrentHashMap的并行度远高于同步容器。

2.4 迭代安全

使用同步容器,在迭代中需要加锁,否则可能会抛出ConcurrentModificationException。ConcurrentHashMap没有这个问题,在迭代器创建后,在迭代过程中,如果另一个线程对容器进行修改,迭代会继续,不会抛出异常。

2.5 弱一致性

ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。

三、基于跳表的Map和Set

我们都知道TreeSet是基于TreeMap实现的,与此类似,ConcurrentSkipListSet也是基于ConcurrentSkipListMap实现的,本节主要介绍ConcurrentSkipListMap。

3.1 基本概念

ConcurrentSkipListMap是基于SkipList实现的,SkipList称为跳跃表或跳表,是一种数据结构,它更易于实现高效的并发算法。
ConcurrentSkipListMap有如下特点:

  1. 没有使用锁,所有操作都是无阻塞的,所有操作都可以并行,包括写,多线程可以同时写。
  2. 与ConcurrentHashMap类似,迭代器不会抛出ConcurrentModifitionException,是弱一致性的,迭代可能反映最新修改也可能不反应,一些方法如putAll、clear不是原子的。
  3. 与ConcurrentHashMap类似,同样实现了ConcurrentMap接口,支持一些原子复合操作。
  4. 与TreeMap一样,可排序,默认按键的自然顺序,也可以传递比较器自定义排序,实现了SorttedMap和NavigableMap接口。

代码示例:

public static void main(String[] args) {
    Map<String, String> map = new ConcurrentSkipListMap<String, String>(Collections.<String>reverseOrder());
    map.put("a", "abstract");
    map.put("c", "call");
    map.put("b", "ball");
    System.out.println(map);
}

在这里插入图片描述

四、并发队列

Java并发包提供了丰富的队列类,可以简单分为以下几种。

  • 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque。
  • 普通阻塞队列:基于数组的ArrayBolckingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque。
  • 优先级阻塞队列:PriorityBlockingQueue。
  • 延时阻塞队列:DelayQueue。
  • 其他阻塞队列:SynchronousQueue和LinkedTransferQueue。

无锁非阻塞是指,这些队列不使用锁,所有操作总是可以立即执行,主要通过循环CAS实现并发安全。阻塞队列是指,这些队列使用锁和条件,很多操作都需要先获取锁或满足特定条件,获取不到锁或等待条件时,会等待,获取到锁或条件满足再返回。

4.1 无锁非阻塞并发队列

有两个无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque,它们适用于多个线程并发使用一个队列的场合,都是基于链表实现的,都没有限制大小,是无界的,与ConcurrentSkipListMap类似,它们的size方法不是一个常量运算。
ConcurrentLinkedQueue实现了Queue接口,表示一个先进先出的队列,从尾部入队,从头部出队,内部是一个单向链表。ConcurrentLinkedDeque实现了Deque接口,表示一个双端队列,在两端都可以入队和出队,内部是一个双向链表。

4.2 普通阻塞队列

除了刚介绍完的两个队列,其他队列都是阻塞队列,都实现了BlockingQueue,在入队/出队时可能等待,主要方法有:

// 入队,如果队列满,等待直到队列有空间
void put(E e) throws InterruptedException;
// 出队,如果队列空,等待直到队列不为空,返回头部元素
E take() throws InterruptedException;
// 入队,如果队列满,最多等待指定的时间,如果超时还是满,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 出队,如果队列空,最多等待指定的时间,如果超时还是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;

ArrayBlockingQueue和LinkedBlockingQueue都实现了Queue接口,表示先进先出的队列,尾部进,头部出,而LinkedBlockingDeque实现了Deque接口,是一个双端队列。
ArrayBlockingQueue是基于循环数组实现的,有界,创建时需要指定大小,且在运行过程中不会改变,这与我们在容器类中介绍的ArrayDeque是不同的,ArrayDeque也是基于循环数组实现的,但是是无界的,会自动扩展。
LinkedBlockingQueue是基于单向链表实现的,在创建时可以指定最大长度,也可以不指定,默认是无限的,节点是动态创建的。LinkedBlockingDeque与LinkedBlockingQueue一样,最大长度也是创建时可选的,默认无限,不过它是基于双向链表实现的。
内部,它们都是使用了显示锁ReentrantLock和Condition实现的。

4.3 优先级阻塞队列

普通阻塞队列是先进先出,而优先级队列是按优先级出队的,优先级高德先出,我们在容器类中介绍过优先级队列PriorityQueue及其背后的数据结构堆。PriorityBlockingQueue是PriorityQueue的并发版本,与PriorityQueue一样,它没有大小限制,是无界的,内部的数组大小会动态扩展,要求元素要么实现Comparable接口,要么创建PriorityBlockingQueue时提供一个Comparator对象。
与PriorityQueue的区别是,PriorityBlockingQueue实现了BlockingQueue接口,在队列为空时,take方法会阻塞等待。另外,PriorityBlockingQueue是线程安全的,它的基本实现原理和PriorityQueue是一样的,也是基于堆,但它使用了一个锁ReentrantLock保护所有访问,使用一个条件协调阻塞等待。

4.4 延时阻塞队列

延时阻塞队列DelayQueue是一种特殊的优先级队列,它是无界的。它要求每个元素都实现Delayed接口,该接口的声明为:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

Delayed扩展了Comparable接口,也就是说,DelayQueue的每个元素都是可比较的,它有一个额外方法getDelay返回一个给定时间单位unit的整数,表示再延迟多长时间,如果小于等于0,则表示不再延迟。

DelayQueue可以用于实现定时任务,它按元素的延时时间出队。它的特殊之处在于只有当元素的延时过期之后才能被从队列中拿走,也就是说,take方法总是返回第一个过期的元素,如果没有,则阻塞等待。

DelayQueue是基于PriorityQueue实现的,它使用一个锁ReentrantLock保护所有访问,使用一个条件available表示头部是否有元素,当头部元素的延时未到时,take操作会根据延时计算需睡眠的时间,然后睡眠,如果在此过程中有新的元素入队,且成为头部元素,则阻塞睡眠的线程会被提前唤醒然后重新检查。

4.5 其他阻塞队列

Java并发包中还有两个特殊的阻塞队列:SynchronousQueue和LinkedTransferQueue。
SynchronousQueue与一般队列不同,它不算是一种真正的队列,没有存储元素的空间,连存储一个元素的空间都没有。它的入队操作要等待另一个线程的出队操作,反之亦然。如果没有其他线程在等待从队列中接收元素,put操作就会等待。take操作需要等待其他线程往队列中放元素,如果没有,也会等待。SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。

LinkedTransferQueue实现了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些额外功能,生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于一些消息传递类型的应用中。TransferQueue的接口定义为:

public interface TransferQueue<E> extends BlockingQueue<E> {
    // 如果有消费者在等待(执行take或限时的poll),直接转给消费者
    // 返回true,否则返回false,不入队
    boolean tryTransfer(E e);
    // 如果有消费者在等待,直接转给消费者,否则入队,阻塞等待直到被消费者接收后再返回
    void transfer(E e) throws InterruptedException;
    // 如果有消费者在等待,直接转给消费者,返回 true
    // 否则入队,阻塞等待限定的时间,如果最后被消费者接收,返回true
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    // 是否有消费者在等待
    boolean hasWaitingConsumer();
    // 等待的消费者个数
    int getWaitingConsumerCount();
}

LinkedTransferQueue是基于链表实现的,无界的TransferQueue,具体实现比较复杂。



版权声明:本文为qq_23986927原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。