Java并发编程实战读书笔记——第五章 基础构建模块

5.1 同步容器类

同步容器类包括Vector和Hashtable,JDK1.2添加了一些功能相似的类,这些同步的封装器都是由Collections.synchronizedXxx等工厂方法创建的。这些类实现线程安全的方式是:将它们状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

5.1.1 同步容器类的问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算(例如若没有则添加)。这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发地修改容器时,它们可能会表现出意料之外的行为。

getLast和deleteLast,都会执行先检查再运行操作。getLast可能抛出ArrayIndexOutOfBoundException异常。

客户端加锁,通过获得容器类的锁,使之成为原子操作。

迭代方法也可以通过在客户端加锁来解决不可行迭代的问题,但要牺牲一些伸缩性。

5.1.2 迭代器与ConcurrentModificationException

无论是直接迭代还是使用for-each循环语法,对容器类进行迭代的标准方式都是使用Iterator。然而,如果有其他线程并发地修改容器,那么即使使用迭代器也无法避免在迭代期间对容器加锁。在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是"及时失败"fail-fast的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。

及时失败的迭代器只是捕获并发错误,因此只能作为问题的预警指示器。它们采用的实现方式是,将计数器的变化与容器关联起来:如果在迭代期间计数器被修改,那么hasNext或next将抛出ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的计数值,而迭代器可能并没有意识到已经发生了修改。这是一种设计上的权衡,从而降低并发修改操作的检测代码对程序性能带来的影响。

使用for-each循环语法对List容器进行迭代。从内部来看,java将生成使用Iterator的个代码,反复调用hasNext和next来迭代List对象。与Vector一样,要想避免出现ConcurrentModificationException,就必须在迭代过程持有容器的锁。

然而,有时候开发人员不希望在迭代期间对容器加锁。当容器的规模很大的时候,或者在每个元素上执行操作的时候很长,那么这样线程等待的时间会很长。而且可能会产生死锁。即使不存在饥饿或者死锁的风险,长时间地对容器加锁也会降低程序的可伸缩性。持有锁的时间越长,那么在锁上的竞争就越激烈,如果等待线程较多,那么将极大地降低吞量和CPU的利用率。

另一种替代方案是克隆容器,并在副本上进行迭代。由于副本被封闭在线程内,就避免了ConcurrentModificationExcepiton(克隆过程中仍然需要对容器加锁)。在克隆过程中存在显著的性能开销。这种方式的好坏取决于多个因素,包括容器的大小,在每个元素上执行的工作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量方面的需求。

在单线程代码中也可能抛出ConcurrentModifyException异常。当对象直接从容器中删除而不是通过Iterator.remove来删除时,就会抛出这个异常。

5.1.3 隐藏迭代器

加黑的方法会将字符串连接操作转换为调用StringBuilder.append(Object),而这个方法又会调用容器的toString()方法,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示。

这个方法会抛出ConcurrentModifycationException,真正的问题是HidderIterator不是线程安全的。如果HiddenIterator用synchronizedSet来包装HashSet,并且对同步代码进行封装,那么就不会发生这种错误。

正如封装对象的状态有助于维持不变性条件一样,封闭的同步机制同样有助于确保实施同步策略。

容器的hashCode和equals等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。同样,containsAll、removeAll和retainAll等方法,以及把容器作为参数的构造函数,都会对容器进行迭代,所以这些间接的迭代操作都可能报出ConcurrentModificationException。

5.2 并发容器

Java5.0提供了多种并发容器类来改进同步容器的性能。同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。代价是来得降低并发性,当多个线程竞争容器的锁时,吞吐量将严重减低。

另一方面,并发容器是针对多个线程并发访问设计的。增加了ConcurrentHashMap用来替代同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作情况下代替同步的List。在新的ConcurrentMap接口中增加了对一些常见复合操作的支持,例如"若没有刚添加"、替换以及有条件删除等。

通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

Java5.0增加了两种新的容器类型:Queue和BlockingQueue。Queue用来保存一组等待处理的元素。

ConcurrentLinkedQueue,先进先出队列。

PriorityQueue,非并发的优先队列。

Queue上的操作不会阻塞,如果队列为空,返回空。然后可以用List来模拟Queue的行为。Queue是用LinkedList来实现Queue,Queue能去掉List的随机访问需求,从而实现更高效的并发。

BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。队列为空,阻塞获取直到出现一个可用元素,如果有界队列满了,那么获取元素的操作将一直阻塞,直到队列中出现可用的空间。在生产者——消费者设计模式中,阻塞队列是非常有用的。

Java6引入了ConcurrentListMap和ConcurrentSkipListSet,分别作为同步的SortedMap和SortedSet的并发替代品。例如用synchronizedMap包装的TreeMap或TreeSet。

5.2.1 ConcurrentHashMap

同步容器类在执行每个操作期间都持有一个锁。HashMap.get或List.contains可能包含大量的工作,而其他的线程在这段时间都不能访问该容器。

ConcurrentHash使用分段锁,任意读取线程可以并发的访问Map,执行读取操作的线程和执行写入操作的线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失小的性能。

尽管有这些改进,但仍然有一些需要权衡的因素。对于一些要在整个Map上进行计算的方法,例如size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性,估计值。

ConcurrentHashMap中没有实现对Map加锁以提供独占访问。在Hashtable和synchronizedMap中,获得Map的锁能防止其他线程访问这个Map。

ConcurrentHashMap代替同步Map能进一步提高代码的可伸缩性,只有当应用程序需要加锁Map以进行独占访问时,才应该放弃使用ConcurrentHashMap。

5.2.2 额外的原子Map操作

ConcurrentHashMap不能被加锁 来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。但是一些常见的复合操作,例如若没有则添加,若相等则移除,若相等刚替换等,都已经实现为原子操作并且在ConcurrentMap接口中声明。

5.2.3 CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。

写入时复制(Copy-On-Write)容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。每次修改时,都会创建并先更新发布一个新的容器副本,从而实现可变性。写入时复制容器的迭代器保留了一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需要确保数组内容的可见性。因此多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器线程相互干扰。写入时复制容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。

仅当迭代操作远远多于修改操作时,才应该候用"写入时复制"容器。这个准则很好地描述了许多事件通知系统:在分发通知时需要迭代已注册监听器链表,并调用每一个监听器,在大多数情况下,注册和注销事件监听器的操作远少于接收事件通知的操作。CPJ 2.4.4。

5.3 阻塞队列和生产者—消费者模式

阻塞队列提供了可阻塞的put和take方法,以及支持实时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法会阻塞到直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满 ,因此put永远不会阻塞。

阻塞队列支持生产者—消费者这种设计模式,该模式把找出需要完成的工作与执行工作这两个过程分离开来,并把工作项放到一个"待完成"列表中以便随后处理,而不是找出后立即处理。它能简化开发过程,因为它消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。

在基于阻塞队列构建的生产者—消费者设计中,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。生产者不需要知道消费者的标识和数量,只需要将数据放队列即可,同样,消费者也不需要知道生着者是谁。BlockingQueue简化了生产者—消费者设计的实现过程,它支持任意数量的生产者和消费者。一种最常见的生产者—消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。

阻塞队列简化了消费者程序的编码,因为take操作会一直阻塞直到有可用的数据。如果生产者不能尽快地产生工作项使消费者保持忙碌,那么消费者就只能一直等待,直到有工作可做。在某些情况下,例如服务器应用程序中,没有任何客户请求服务,而在其他一些情况下,这也表示需要调整生产者线程数量和消费者线程数量之间的比率,从而实现更高的资源利用率。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抵制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

虽然生产者消费者模式能够将生产者和消费者的代码彼此解耦开来,但它们的行为仍然会通过共享工作队列间接地耦合在一起。开发人员应该尽早地通过阻塞队列在设计中构建资源管理机制。如果阻塞队列不能完全符合设计需求,那么还可以通过信号量(Semaphore)来创建其他的阻塞数据结构。

在类库中包含了BlockingQueue的多种实现。

其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,分别与LinkedList和ArrayList的类似,但比同步List有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。正如其它有序容器一样,它可以根据元素的自然顺序来比较元素(如果它们实现了Comparable方法),也可以使用Comparator来比较。

SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。它维护一组线程,这些线程在等待着把元素加入或移出队列。如果以洗盘子为例,就相当于没有盘架,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现可以直接交付工作,从而降低了数据从生产者到消费者的延迟。而且还会将更多的任务信息反馈给生产者。交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入了一个队列。因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

5.3.1 示例:桌面搜索

代理程序,它将扫描本地驱动器上的文件并建立索引以便随后进行索引 。

生产者:在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。

消费者:即从队列中取出文件名称并对它们建立索引。

将文件遍历与建立索引等功能分解为独立的操作,比将所有功能都放到一个操作中实现有着更高的代码可读性和可重用性:每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流。

性能优势,生产者和消费者可以并发的执行。如果一个是I/O密集型,一个是CPU密集型,那么并发执行的吞吐率要高于串行执行的吞吐率。

问题:消费者线程永远不会退出。

5.3.2 串行线程封闭

在java.util.concurrent中实现的各种阻塞队列都包含了足够的内部同步机制,从而安全地将对象从生产者线程发布到消费者线程。

对于可变对象,生产者——消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。线程封闭对象只能由单个线程拥有,但可以通过安全地发布对象来转移所有权。在转移所有权后,也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它。这种安全的发布确保了对象状态对于新的所有者来说是可见的,并且由于最初的所有者不会访问它,因此对象将被封闭在新的线程中。新的所有者线程可以对该对象做任意修改,因为这具有独占的访问权。

对象池利用了串行线程封闭,将对象借给一个请求线程。只要对象池包含足够的内部同步来安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它,那么就可以安全地在线程之前传递所有权。

可能使用其他发布机制来传递可变对象的所有权,但必须只有一个线程能授受被转移的对象。阻塞队列简化了这项工作。除此这外,还可以通过ConcurrentMap的原子方法remove或者AtomicReference的原子方法compareAndSet来完成这项工作。

5.3.3 双端队列与工作密取

Java 6增加了两种容器类型,Deque(发音为"deck")和BlockingDeque,它们分别对Queue和BlockingQueue进行了扩展。Deque是一个双端队列,实现了队列头和队列尾的高效插入和移除。具体实现包括ArrayDeque和LinkedBlockingDeque。

双端队列与工作密取(Work Stealing)。在生产者消费者模式中,所有的消费者有一个共同的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。

更高的可伸缩性,因为工作者线程不会在单个共享队列上发生竞争。在大多数时候都是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取,因此进一步降低了竞争程度。

工作密取非常适用于既是消费者也是生产者的问题—当执行某个工作的时可能导致出再更多的工作。例如,网页抓虫程序中处理一个页面时,通常会发现有更多的页面需要处理,类似的还有许多搜索图的算法,例如在垃圾回收阶段对堆进行标记,都可以通过工作密取机制来实现高效并行。当一个工作线程找到新的任务单元时,它会将其放到自己的队列的末尾(或者在工作共享设计模式中,放入其他工作者线程的队列中)。当双端队列为空时,它会在另一个线程的队列队尾查找新的任务,从而确保每个线程都保持忙碌状态。

5.4 阻塞方法与中断方法

线程可能会被阻塞或者暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIME_WAITING)。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待I/O操作完成,等待某个锁可用,或者等待外部计算的结束。当某个外部事件发生时,线程被置回RUNNABLE状态,并可以再次被调度执行。

BlockingQueue的put和take等方法会抛出受检查异常InterruptedException,这与类库中其他一些方法的做法相同,例如Thread.sleep。当某个方法抛出InterruptedException时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态。

Thread提供了interrupt方法,用于中断线程或者查询线程是否已被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。

中断是一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作——前提是如果线程B愿意停止下来。虽然在API或者语言规范中没有为中断定义任何特定应用级别的语义,但最常使用中断的情况就是取消某个操作。方法对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。

当在代码中调用一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种选择:

传递InterruptedEception。避开这个异常通常是最明智的策略——只需要把InterruptedEception传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某个简单清理工作后再次抛出这个异常。

恢复中断。有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。

还可以用一些更复杂的中断处理方法。

然而在出现InterruptedException时不应该做的事情是,捕获它但不做出任何响应。这将使用调用栈上更高层的代码无法对中断采取处理措施,因为线程被中断的证据已经丢失。只有在一种特殊的情况下才能屏蔽中断,即对Thread进行扩展,并且能控制调用栈上所有更高层的代码。

5.5同步工具类

阻塞队列、信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)

所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法高效地等待同步工具类进入到预期状态。

5.5.1 闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。

**闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。**当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。**闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。**例如:

  1. 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
  2. **确保某个服务在其他依赖的所有其他服务都已经启动之后才启动。**每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务都能继续执行。
  3. 等待直到某个操作的所有参与者都就绪再继续执行。

**CountDownLatch可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正整数,表示需要等待的事件数量。**countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞下到计数器为0,或者等待中的线程中断,或者等待超时。

闭锁的两种常用方法。创建一定数量的线程,利用它们并发的执行任务。它使用两个闭锁,分别表示startGate和endGate。起始门计数器的数值为1,而结束门的计数器的初始值为工作线程的数量。每个工作线程首先要在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程最后一件事情是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。

5.5.2 Future Task

FutureTask也可以用做闭锁,Future语义表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且处于以下3种状态:等待运行,正在运行和运行完成。执行完成表示计算的所有可能结束方式包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会永远停止在这个状态上。

Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

FutureTask在Executor框架中表示异步任务,可以用来表示一些时间较长的计算。

5.5.3 信号量

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。某种资源池,或者对容器施加边界。

Semaphore中管理一组许可permit,可以通过构造函数来指定。在执行操作的时候可以首先获得许可,并在使用后释放。如果没有,acquire将阻塞直到有许可(或者中断或者超时),release方法返回信号量。

5.5.4 栅栏

Barrier类型于闭锁,它能阻塞一组线程直到某个事件发生。关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题,当线程到达栅栏位置将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏打开,此时所有线程都被释放,而栅栏将被重置以使用下次使用。

CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时(在一个子任务线程中)执行它,但在阻塞线程释放之前是不执行的。

5.6 构建高效且可伸缩的结果缓存

重用之前的计算结果能降低延迟,提高吞吐量,但却需要消耗更多的内存。

简单的缓存可能会将性能瓶颈转变成可伸缩性瓶颈(线程安全性),即使缓存是用于提升单线程的性能。

  1. 用HashMap,使用synchronize,可伸缩性问题
  2. 用ConcurrentHashMap,会出现重复计算的问题
  3. 用ConcurrentHashMap和FutureTask,更小概率出现计算相同的值
  4. 用ConcurrentMap.putIfAbsent()代替map.put()避免3中的问题

第一部分小结:

可变状态是至关重要的,所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程安全性。

尽量将域声明为final类型的,除非需要它们是可变的。

不可变对象一定是线程安全的。

​ 不可变对象能极大地降低并发编程的复杂性。它们更为简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制。

封装有助于管理复杂性:

​ 在编写线程安全的程序时,虽然可以将所有数据保存在全局变量中,但为什么要这样做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。

用锁来保护每个可变变量。

当保护同一个不变性条件中的所有变量时,要使用同一个锁。

在执行复合操作期间,要持有锁。

如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。

不要故作聪明地推断出不需要使用同步。

在设计过程中考虑线程安全,或者在文档中明确地指出它不是线程安全的。

将同步策略文档化。


版权声明:本文为myxiaoribeng原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。