Java多线程通信-CyclicBarrier(栅栏)

一、CyclicBarrier(栅栏)

    通过闭锁,我们可以启动一组相关的操作、或者等待一组相关的操作结束。闭锁是一次性对象,到达终止状态后将不可用。
    CyclicBarrier与闭锁类似,能够延迟一组线程的进度直到到达某个状态。栅栏与闭锁的关键区别在于:所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏等待其它线程。
    线程在到达栅栏位置时,将调用await方法,这个方法会阻塞直到所有线程都到达栅栏位置。当所有线程都到达栅栏位置时,栅栏被认为打开了,此时所有线程都将被释放,栅栏也将被重置以便下次使用,并且await方法会为每个通过的线程返回一个唯一的索引号;如果一个线程在等待栅栏(调用await方法)时超时或者被中断,就认为栅栏被打破了,此时所有在栅栏上等待的线程都将抛出BrokenBarrierException异常。并且栅栏如果是被打破的状态,调用await方法将不阻塞,而是直接抛出BrokenBarrierException。

    CycllicBarrier还可以通过构造函数中传入Runaable barrierAction携带一个任务。当最后一个线程达到栅栏位置,栅栏打开,会首先执行完这个barrierAction,再释放被阻塞的线程。如果barrierAction抛出异常,栅栏也会被打破,所有在栅栏上等待的线程都会抛出BrokenBarrierException异常。所以这个barrierAction需要慎用

CyclicBarrier主要方法:

方法说明
CyclicBarrier(int parties)构造一个新的栅栏,parties为在栅栏上等待的线程数量
CyclicBarrier(int parties, Runnable barrierAction)构造一个新栅栏,parties为在栅栏上等待的线程数量,barrierAction为栅栏打开时优先执行的任务
int await() throws InterruptedException, BrokenBarrierException阻塞等待直到有足够数量的线程到达栅栏位置。
int await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException
限时阻塞,如果当时间到达栅栏还没有打开,则抛出Timeout异常,并且栅栏将被被打破,被阻塞的其它线程都会抛出BrokenBarrierException异常
void reset()重置栅栏,如果存在正在等待的线程,这些线程将会抛出BrokenBarrierException。在重置之后可以正常调用await方法
int getNumberWaiting()返回在栅栏上等待的线程数量
boolean isBroken()栅栏是否处于打破状态

二、代码示例:

1. 常规使用

static class CyclicBarrierThread extends Thread{
        final CyclicBarrier cyclicBarrier ;

        CyclicBarrierThread(String name,CyclicBarrier cyclicBarrier) {
            super.setName(name);
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(super.getName()+" cyclicBarrier.getNumberWaiting(): "+ cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();
                System.out.println(super.getName()+" cyclicBarrier.await finish");
                ThreadUtil.sleep(3000);
            }  catch (InterruptedException e) {
                System.out.println(super.getName()+" interrupt flag : "+Thread.currentThread().isInterrupted());
            }catch (Exception e) {
                System.out.println(super.getName()+" throw a exception: " +e.getClass());
                System.out.println(super.getName()+" exception: "+ cyclicBarrier.getNumberWaiting());
            } finally {
                System.out.println(super.getName()+" finally numberWaiting: "+ cyclicBarrier.getNumberWaiting());
            }
        }

    }

// 创建6个线程,栅栏初始化数量为4。
 public static void testCyclicBarrier(){
	 CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
	  List<Thread> list = new ArrayList<>();
        for(int i=0;i<6;i++) {
            CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
            cyclicBarrierThread.start();
            list.add(cyclicBarrierThread);
            ThreadUtil.sleep(2000);
        }
}

 public static void main(String[] args) throws Exception {
        testCyclicBarrier();
    }
    

结果在这里插入图片描述

一开始thread0-2这3个线程会在await方法处阻塞。当第4个线程调用await方法到达栅栏处时,栅栏被打开。thread0-3这4个线程通过,并且栅栏被重置。接下来的thread4和thread5将一直阻塞。

2. await对中断的反应

其余代码不变,在testCyclicBarrier 方法中添加中断

public static void testCyclicBarrier(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

        List<Thread> list = new ArrayList<>();
        for(int i=0;i<6;i++) {
            CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
            cyclicBarrierThread.start();
            list.add(cyclicBarrierThread);
            ThreadUtil.sleep(2000);
        }
        ThreadUtil.sleep(1000);
        System.out.println("===========================");
		// 中断thread4,因为这个时候thread0-3已经执行完毕了。
        list.get(4).interrupt();
        ThreadUtil.sleep(500);
        System.out.println("cyclicBarrier.isBroken(): "+cyclicBarrier.isBroken());
   }

结果
在这里插入图片描述

    thread4被中断时,在栅栏上等待的线程只有2个: thread4和thread5。这里thread4抛出了中断异常,并且中断状态被清除:InterruptedException, thread5抛出BrokenBarrierException异常。并且isBroken也返回true,说明栅栏确实被打破了,打断之后在栅栏上等待的所有线程抛出BrokenBarrierException异常。
    如果这时候再有线程调用await方法呢,会抛出异常,因为栅栏已经被打破,这种情况下调用await,会直接抛出BrokenBarrierException异常。
在testCyclicBarrier方法体中添加代码,再次调用await方法

System.out.println("cyclicBarrier.isBroken(): "+cyclicBarrier.isBroken());
CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + 6, cyclicBarrier);
        cyclicBarrierThread.start();

在这里插入图片描述

3. reset重置栅栏

    reset方法会重置栅栏,如果存在正在等待的线程,这些线程将会抛出BrokenBarrierException。如果在栅栏被打破之后,调用reset()方法,则栅栏被重置,又可以被正常使用了。

public static void testCyclicBarrier(){
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

        List<Thread> list = new ArrayList<>();
        for(int i=0;i<6;i++) {
            CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
            cyclicBarrierThread.start();
            list.add(cyclicBarrierThread);
            ThreadUtil.sleep(2000);
        }
        ThreadUtil.sleep(1000);
        System.out.println("===========================");
		
        list.get(4).interrupt();
        ThreadUtil.sleep(500);
        System.out.println("cyclicBarrier.isBroken(): "+cyclicBarrier.isBroken());
        // 重置栅栏
        cyclicBarrier.reset();
        System.out.println("cyclicBarrier after reset isBroken: "+cyclicBarrier.isBroken());
        CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + 7, cyclicBarrier);
        cyclicBarrierThread.start();
}

3.Runaable barrierAction

当构造栅栏时,可以传入一个优先任务,当最后一个线程调用await()方法,栅栏即将被打开时,这最后一个线程会优先执行barrierAction任务。执行完之后,唤醒所有再栅栏上等待的其它线程。如果barrierAction抛出异常,栅栏被打破,所有等待线程将抛出异常BrokenBarrierException

public static void testCyclicBarrier(){
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " barrierAction execute");
                ThreadUtil.sleep(6000);
                System.out.println("barrierAction execute finish");
            }
        });

        List<Thread> list = new ArrayList<>();
        for(int i=0;i<6;i++) {
            CyclicBarrierThread cyclicBarrierThread = new CyclicBarrierThread("thread" + i, cyclicBarrier);
            cyclicBarrierThread.start();
            list.add(cyclicBarrierThread);
            ThreadUtil.sleep(2000);
        }
 }

结果
在这里插入图片描述
从输出可以看出,执行barrierAction任务的线程为最后到达栅栏的线程,并且在barrierAction执行完毕之后,栅栏打开,所有线程通过。
如果barrierAction中,抛出了异常呢:

CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " barrierAction execute");
                ThreadUtil.sleep(6000);
                int s = 1/0;
            }
        });

在这里插入图片描述
我们看到,thread3抛出了ArithmeticException异常,这是1/0抛出的,其余等待的线程均抛出BrokenBarrierException异常,并且这时候栅栏被打破了,isBroken()将返回true


版权声明:本文为liu_shi_jun原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。