一、CyclicBarrier(栅栏)
通过闭锁,我们可以启动一组相关的操作、或者等待一组相关的操作结束。闭锁是一次性对象,到达终止状态后将不可用。
CyclicBarrier与闭锁类似,能够延迟一组线程的进度直到到达某个状态。栅栏与闭锁的关键区别在于:所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏等待其它线程。
线程在到达栅栏位置时,将调用await方法,这个方法会阻塞直到所有线程都到达栅栏位置。当所有线程都到达栅栏位置时,栅栏被认为打开了,此时所有线程都将被释放,栅栏也将被重置以便下次使用,并且await方法会为每个通过的线程返回一个唯一的索引号;如果一个线程在等待栅栏(调用await方法)时超时或者被中断,就认为栅栏被打破了,此时所有在栅栏上等待的线程都将抛出BrokenBarrierException异常。并且栅栏如果是被打破的状态,调用await方法将不阻塞,而是直接抛出BrokenBarrierException。
CycllicBarrier还可以通过构造函数中传入Runaable barrierAction携带一个任务。当最后一个线程达到栅栏位置,栅栏打开,会首先执行完这个barrierAction,再释放被阻塞的线程。如果barrierAction抛出异常,栅栏也会被打破,所有在栅栏上等待的线程都会抛出BrokenBarrierException异常。所以这个barrierAction需要慎用
CyclicBarrier主要方法:
方法 | 说明 |
---|---|
CyclicBarrier(int parties) | 构造一个新的栅栏,parties为在栅栏上等待的线程数量 |
CyclicBarrier(int parties, Runnable barrierAction) | 构造一个新栅栏,parties为在栅栏上等待的线程数量,barrierAction为栅栏打开时优先执行的任务 |
int await() throws InterruptedException, BrokenBarrierException | 阻塞等待直到有足够数量的线程到达栅栏位置。 |
int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException | 限时阻塞,如果当时间到达栅栏还没有打开,则抛出Timeout异常,并且栅栏将被被打破,被阻塞的其它线程都会抛出BrokenBarrierException异常 |
void reset() | 重置栅栏,如果存在正在等待的线程,这些线程将会抛出BrokenBarrierException。在重置之后可以正常调用await方法 |
int getNumberWaiting() | 返回在栅栏上等待的线程数量 |
boolean isBroken() | 栅栏是否处于打破状态 |
二、代码示例:
1. 常规使用
static class CyclicBarrierThread extends Thread{
final CyclicBarrier cyclicBarrier ;
CyclicBarrierThread(String name,CyclicBarrier cyclicBarrier) {
super.setName(name);
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(super.getName()+" cyclicBarrier.getNumberWaiting(): "+ cyclicBarrier.getNumberWaiting());
cyclicBarrier.await();
System.out.println(super.getName()+" cyclicBarrier.await finish");
ThreadUtil.sleep(3000);
} catch (InterruptedException e) {
System.out.println(super.getName()+" interrupt flag : "+Thread.currentThread().isInterrupted());
}catch (Exception e) {
System.out.println(super.getName()+" throw a exception: " +e.getClass());
System.out.println(super.getName()+" exception: "+ cyclicBarrier.getNumberWaiting());
} finally {
System.out.println(super.getName()+" finally numberWaiting: "+ cyclicBarrier.getNumberWaiting());
}
}
}
// 创建6个线程,栅栏初始化数量为4。
public static void testCyclicBarrier(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
List<Thread> list = new ArrayList<>();
for(int i=0;i<6;i++) {
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
cyclicBarrierThread.start();
list.add(cyclicBarrierThread);
ThreadUtil.sleep(2000);
}
}
public static void main(String[] args) throws Exception {
testCyclicBarrier();
}
结果
一开始thread0-2这3个线程会在await方法处阻塞。当第4个线程调用await方法到达栅栏处时,栅栏被打开。thread0-3这4个线程通过,并且栅栏被重置。接下来的thread4和thread5将一直阻塞。
2. await对中断的反应
其余代码不变,在testCyclicBarrier 方法中添加中断
public static void testCyclicBarrier(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
List<Thread> list = new ArrayList<>();
for(int i=0;i<6;i++) {
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
cyclicBarrierThread.start();
list.add(cyclicBarrierThread);
ThreadUtil.sleep(2000);
}
ThreadUtil.sleep(1000);
System.out.println("===========================");
// 中断thread4,因为这个时候thread0-3已经执行完毕了。
list.get(4).interrupt();
ThreadUtil.sleep(500);
System.out.println("cyclicBarrier.isBroken(): "+cyclicBarrier.isBroken());
}
结果
thread4被中断时,在栅栏上等待的线程只有2个: thread4和thread5。这里thread4抛出了中断异常,并且中断状态被清除:InterruptedException, thread5抛出BrokenBarrierException异常。并且isBroken也返回true,说明栅栏确实被打破了,打断之后在栅栏上等待的所有线程抛出BrokenBarrierException异常。
如果这时候再有线程调用await方法呢,会抛出异常,因为栅栏已经被打破,这种情况下调用await,会直接抛出BrokenBarrierException异常。
在testCyclicBarrier方法体中添加代码,再次调用await方法
System.out.println("cyclicBarrier.isBroken(): "+cyclicBarrier.isBroken());
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + 6, cyclicBarrier);
cyclicBarrierThread.start();
3. reset重置栅栏
reset方法会重置栅栏,如果存在正在等待的线程,这些线程将会抛出BrokenBarrierException。如果在栅栏被打破之后,调用reset()方法,则栅栏被重置,又可以被正常使用了。
public static void testCyclicBarrier(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
List<Thread> list = new ArrayList<>();
for(int i=0;i<6;i++) {
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
cyclicBarrierThread.start();
list.add(cyclicBarrierThread);
ThreadUtil.sleep(2000);
}
ThreadUtil.sleep(1000);
System.out.println("===========================");
list.get(4).interrupt();
ThreadUtil.sleep(500);
System.out.println("cyclicBarrier.isBroken(): "+cyclicBarrier.isBroken());
// 重置栅栏
cyclicBarrier.reset();
System.out.println("cyclicBarrier after reset isBroken: "+cyclicBarrier.isBroken());
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + 7, cyclicBarrier);
cyclicBarrierThread.start();
}
3.Runaable barrierAction
当构造栅栏时,可以传入一个优先任务,当最后一个线程调用await()方法,栅栏即将被打开时,这最后一个线程会优先执行barrierAction任务。执行完之后,唤醒所有再栅栏上等待的其它线程。如果barrierAction抛出异常,栅栏被打破,所有等待线程将抛出异常BrokenBarrierException
public static void testCyclicBarrier(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " barrierAction execute");
ThreadUtil.sleep(6000);
System.out.println("barrierAction execute finish");
}
});
List<Thread> list = new ArrayList<>();
for(int i=0;i<6;i++) {
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
cyclicBarrierThread.start();
list.add(cyclicBarrierThread);
ThreadUtil.sleep(2000);
}
}
结果
从输出可以看出,执行barrierAction任务的线程为最后到达栅栏的线程,并且在barrierAction执行完毕之后,栅栏打开,所有线程通过。
如果barrierAction中,抛出了异常呢:
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " barrierAction execute");
ThreadUtil.sleep(6000);
int s = 1/0;
}
});
我们看到,thread3抛出了ArithmeticException异常,这是1/0抛出的,其余等待的线程均抛出BrokenBarrierException异常,并且这时候栅栏被打破了,isBroken()将返回true