JAVA架构师之路
本系列文章均是博主原创,意在记录学习上的知识,同时一起分享学习心得。
第一章
第一节 Java集合总结(架构师之路 )
第二节 Java多线程(架构师之路 )
第三节 理解synchronized(架构师之路 )
第四节 多线程协作机制
第五节 线程的中断
第六节 并发包的基石
第七节 并发容器
文章目录
前言
本章,我们探讨Java并发包中的容器类,具体包括:
- 写时复制的List和Set;
- ConcurrentHashMap;
- 基于SkipList的Map和Set;
- 各种并发队列
一、写时复制的List和Set
1.1 CopyOnWriteArrayList
CopyOnWriteArrayList实现了List接口,它的基本用法是跟其他List(如ArrayList)是一样的。其特点如下:
- 它是线程安全的,可以被多个线程并发访问;
- 它的迭代器不支持修改操作,但也不会抛出ConcurrentModificationException;
- 它以原子方式支持一些复合操作。
CopyOnWriteArrayList内部也是一个数组,但这个数组是以原子方式被整体更新的。每次修改操作,但会新建一个数组,复制原数组的内容到新数组,在新数组上进行需要的修改,然后以原子方式设置内部的数组引用,这就是写时复制。
内部数组声明
private volatile transient Object[] array;
在CopyOnWriteArrayList中,读不需要锁,可以并行,读写也可以并行,但多个线程不能同时写,每个写操作需要获取锁,CopyOnWriteArrayList内部使用了ReentrantLock。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
由于写时复制的局限性,CopyOnWriteArrayList不适应与数组很大且修改频繁的场景,它是以优化读操作为目标,读不需要同步,性能很高,但在优化读的同时牺牲了写性能。
1.2 CopyOnWriteArraySet
CopyOnWriteArraySet实现了Set接口,不包含重复元素。CopyOnWriteArraySet内部是通过CopyOnWriteArrayList实现的,其成员声明为:
private final CopyOnWriteArrayList a1;
在构造方法中被初始化,如:
public CopyOnWriteArraySet() {
a1 = new CopyOnWriteArrayList();
}
其add方法代码为:
public boolean add(E e) {
return a1.addIfAbsent(e);
}
add方法就是调用CopyOnWriteArrayList的addIfAbsent方法。
contains方法代码为:
public boolean contains(Object obj) {
return a1.contains(obj);
}
由于CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的,所以与之前介绍过的Set的实现类如HashSet/TreeSet相比,他的性能比较低,不适应于元素个数特别多的集合。如果元素个数比较多,可以考虑ConcurrentHashMap或ConcurrentSkipListSet这两个类。
二、ConcurrentHashMap
ConcurrentHashMap是HashMap的并发版本,与HashMap相比,它有如下特点:
- 并发安全;
- 直接支持一些原子复合操作;
- 支持高并发,读操作完全并行,写操作支持一定程度的并行;
- 与同步容器Collection.synchronizedMap相比,迭代不用加锁,不会抛出ConcurrentModificationException;
- 弱一致性;
2.1 并发安全
HashMap不是并发安全的,在并发更新的情况下,HashMap可能出现死循环,占满CPU。
代码如下(示例):
public static void unsafeUpdate() {
final Map<Integer, Integer> map = new HashMap<Integer, Integer>();
for (int i =0; i < 1000; i++) {
Thread t = new Thread(){
Random random = new Random();
@Override
public void run() {
for (int i=0; i < 1000;i++) {
map.put(random.nextInt(), 1);
}
}
};
t.start();
}
}
死循环出现在多个线程同时扩容哈希表的时候,不是同时更新一个链表的时候,那种情况可能会出现更新丢失,但不会死循环。
2.2 原子复合操作
ConcurrentHashMap除了实现Map接口,还实现了ConcurrentMap,接口实现了一些条件更新操作,Java7中的具体定义为:
public interface ConcurrentMap<K, V> extends Map<K, V> {
// 条件更新,如果map中没有key,设置key为value,返回原来key对应的值
// 如果没有返回null
V putifAbsent(K key, V value);
// 条件删除,如果map中有key,且对应的值为oldValue,则删除,如果删除了,返回true
// 否则返回false
boolean remove(Object key, Object value);
// 条件替换,如果Map中有key, 且对应的值为oldValue,则替换为newValue
// 如果替换了,返回true,否则false
boolean replace(K key, V oldValue, V newValue);
// 条件替换,如果Map中有key,则替换值为value,返回原来key对应的值
// 如果原来没有,返回null
V replace(K key, V value);
}
2.3 高并发的机制
ConcurrentHashMap是为高并发设计的,其核心思想主要有两点:
- 分段锁;
- 读不需要锁;
同步容器使用synchronized,所有方法竞争同一个锁,而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。
在对每个段的数据进行读写时,ConcurrentHashMap也不是简单地使用锁进行同步,内部使用CAS。对应写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写得时候也可以读,这使得ConcurrentHashMap的并行度远高于同步容器。
2.4 迭代安全
使用同步容器,在迭代中需要加锁,否则可能会抛出ConcurrentModificationException。ConcurrentHashMap没有这个问题,在迭代器创建后,在迭代过程中,如果另一个线程对容器进行修改,迭代会继续,不会抛出异常。
2.5 弱一致性
ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。
三、基于跳表的Map和Set
我们都知道TreeSet是基于TreeMap实现的,与此类似,ConcurrentSkipListSet也是基于ConcurrentSkipListMap实现的,本节主要介绍ConcurrentSkipListMap。
3.1 基本概念
ConcurrentSkipListMap是基于SkipList实现的,SkipList称为跳跃表或跳表,是一种数据结构,它更易于实现高效的并发算法。
ConcurrentSkipListMap有如下特点:
- 没有使用锁,所有操作都是无阻塞的,所有操作都可以并行,包括写,多线程可以同时写。
- 与ConcurrentHashMap类似,迭代器不会抛出ConcurrentModifitionException,是弱一致性的,迭代可能反映最新修改也可能不反应,一些方法如putAll、clear不是原子的。
- 与ConcurrentHashMap类似,同样实现了ConcurrentMap接口,支持一些原子复合操作。
- 与TreeMap一样,可排序,默认按键的自然顺序,也可以传递比较器自定义排序,实现了SorttedMap和NavigableMap接口。
代码示例:
public static void main(String[] args) {
Map<String, String> map = new ConcurrentSkipListMap<String, String>(Collections.<String>reverseOrder());
map.put("a", "abstract");
map.put("c", "call");
map.put("b", "ball");
System.out.println(map);
}

四、并发队列
Java并发包提供了丰富的队列类,可以简单分为以下几种。
- 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque。
- 普通阻塞队列:基于数组的ArrayBolckingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque。
- 优先级阻塞队列:PriorityBlockingQueue。
- 延时阻塞队列:DelayQueue。
- 其他阻塞队列:SynchronousQueue和LinkedTransferQueue。
无锁非阻塞是指,这些队列不使用锁,所有操作总是可以立即执行,主要通过循环CAS实现并发安全。阻塞队列是指,这些队列使用锁和条件,很多操作都需要先获取锁或满足特定条件,获取不到锁或等待条件时,会等待,获取到锁或条件满足再返回。
4.1 无锁非阻塞并发队列
有两个无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque,它们适用于多个线程并发使用一个队列的场合,都是基于链表实现的,都没有限制大小,是无界的,与ConcurrentSkipListMap类似,它们的size方法不是一个常量运算。
ConcurrentLinkedQueue实现了Queue接口,表示一个先进先出的队列,从尾部入队,从头部出队,内部是一个单向链表。ConcurrentLinkedDeque实现了Deque接口,表示一个双端队列,在两端都可以入队和出队,内部是一个双向链表。
4.2 普通阻塞队列
除了刚介绍完的两个队列,其他队列都是阻塞队列,都实现了BlockingQueue,在入队/出队时可能等待,主要方法有:
// 入队,如果队列满,等待直到队列有空间
void put(E e) throws InterruptedException;
// 出队,如果队列空,等待直到队列不为空,返回头部元素
E take() throws InterruptedException;
// 入队,如果队列满,最多等待指定的时间,如果超时还是满,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 出队,如果队列空,最多等待指定的时间,如果超时还是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
ArrayBlockingQueue和LinkedBlockingQueue都实现了Queue接口,表示先进先出的队列,尾部进,头部出,而LinkedBlockingDeque实现了Deque接口,是一个双端队列。
ArrayBlockingQueue是基于循环数组实现的,有界,创建时需要指定大小,且在运行过程中不会改变,这与我们在容器类中介绍的ArrayDeque是不同的,ArrayDeque也是基于循环数组实现的,但是是无界的,会自动扩展。
LinkedBlockingQueue是基于单向链表实现的,在创建时可以指定最大长度,也可以不指定,默认是无限的,节点是动态创建的。LinkedBlockingDeque与LinkedBlockingQueue一样,最大长度也是创建时可选的,默认无限,不过它是基于双向链表实现的。
内部,它们都是使用了显示锁ReentrantLock和Condition实现的。
4.3 优先级阻塞队列
普通阻塞队列是先进先出,而优先级队列是按优先级出队的,优先级高德先出,我们在容器类中介绍过优先级队列PriorityQueue及其背后的数据结构堆。PriorityBlockingQueue是PriorityQueue的并发版本,与PriorityQueue一样,它没有大小限制,是无界的,内部的数组大小会动态扩展,要求元素要么实现Comparable接口,要么创建PriorityBlockingQueue时提供一个Comparator对象。
与PriorityQueue的区别是,PriorityBlockingQueue实现了BlockingQueue接口,在队列为空时,take方法会阻塞等待。另外,PriorityBlockingQueue是线程安全的,它的基本实现原理和PriorityQueue是一样的,也是基于堆,但它使用了一个锁ReentrantLock保护所有访问,使用一个条件协调阻塞等待。
4.4 延时阻塞队列
延时阻塞队列DelayQueue是一种特殊的优先级队列,它是无界的。它要求每个元素都实现Delayed接口,该接口的声明为:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed扩展了Comparable接口,也就是说,DelayQueue的每个元素都是可比较的,它有一个额外方法getDelay返回一个给定时间单位unit的整数,表示再延迟多长时间,如果小于等于0,则表示不再延迟。
DelayQueue可以用于实现定时任务,它按元素的延时时间出队。它的特殊之处在于只有当元素的延时过期之后才能被从队列中拿走,也就是说,take方法总是返回第一个过期的元素,如果没有,则阻塞等待。
DelayQueue是基于PriorityQueue实现的,它使用一个锁ReentrantLock保护所有访问,使用一个条件available表示头部是否有元素,当头部元素的延时未到时,take操作会根据延时计算需睡眠的时间,然后睡眠,如果在此过程中有新的元素入队,且成为头部元素,则阻塞睡眠的线程会被提前唤醒然后重新检查。
4.5 其他阻塞队列
Java并发包中还有两个特殊的阻塞队列:SynchronousQueue和LinkedTransferQueue。
SynchronousQueue与一般队列不同,它不算是一种真正的队列,没有存储元素的空间,连存储一个元素的空间都没有。它的入队操作要等待另一个线程的出队操作,反之亦然。如果没有其他线程在等待从队列中接收元素,put操作就会等待。take操作需要等待其他线程往队列中放元素,如果没有,也会等待。SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。
LinkedTransferQueue实现了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些额外功能,生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于一些消息传递类型的应用中。TransferQueue的接口定义为:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果有消费者在等待(执行take或限时的poll),直接转给消费者
// 返回true,否则返回false,不入队
boolean tryTransfer(E e);
// 如果有消费者在等待,直接转给消费者,否则入队,阻塞等待直到被消费者接收后再返回
void transfer(E e) throws InterruptedException;
// 如果有消费者在等待,直接转给消费者,返回 true
// 否则入队,阻塞等待限定的时间,如果最后被消费者接收,返回true
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 是否有消费者在等待
boolean hasWaitingConsumer();
// 等待的消费者个数
int getWaitingConsumerCount();
}
LinkedTransferQueue是基于链表实现的,无界的TransferQueue,具体实现比较复杂。