CountDownLatch、CyclicBarrier、Semaphore详解

目录

CountDownLatch

简介

用法

原理及源码分析

CyclicBarrier

简介

用法

原理及源码分析

Semaphore

简介

用法

原理及源码分析


CountDownLatch

简介

 CountDownLatch是 JDK 提供的并发流程控制的工具类,它是在 java.util.concurrent 包下,在 JDK1.5 以后加入,用来进行同步协作,等待所有线程完成倒计时。其中构造函数用来初始化等待计数值,await()用来等待计数归零,countDown()用来让计数减一。

用法

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Thread t1 = new Thread(() -> {
            System.out.println("begin...");
            sleep(1);
            countDownLatch.countDown();
            System.out.println("end...");
        });
        Thread t2 = new Thread(()->{
            System.out.println("begin...");
            sleep(3);
            countDownLatch.countDown();
            System.out.println("end...");
        });
        Thread t3 = new Thread(()->{
            System.out.println("begin...");
            sleep(2);
            countDownLatch.countDown();
            System.out.println("end...");
        });

        t1.start();
        t2.start();
        t3.start();

        try {
            System.out.println("等待执行...");
            countDownLatch.await();
            System.out.println("执行结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    static void sleep(int i){
        try {
            Thread.sleep(i*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


-- 输出
begin...
begin...
begin...
等待执行...
end...
end...
end...
执行结束

可以看到,await()卡住了主线程,直到三个子线程都执行完了,主线程才开始执行。

原理及源码分析

  • new CountDownLatch(3)构造函数中初始化AQS中state = 3
  • await()方法尝试获取锁,发现state不为0,获取失败,进入等待队列
  • 三个子线程依次执行countDown方法让state--,直到state=0后,执行doReleaseShared方法,unpark之前await的线程
--1.构造方法,初始化state
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Sync(int count) {
     setState(count);
}

--2.主线程await,发现state不为0,则进入等待队列
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // tryAcquireShared <0 成立,
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

// 判断是否可获取锁
protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
}


private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 进入等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                ....
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // park住
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } 
    }

--3.countDown执行state--,state为0以后唤醒等待线程
public void countDown() {
        sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
       //3.1 
       if (tryReleaseShared(arg)) {
            //3.2
            doReleaseShared();
            return true;
        }
        return false;
    }

// 3.1 执行state--,如果state==0返回ture
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
//3.2 唤醒等待线程
private void doReleaseShared() {
        for (;;) {
          ......
          // 唤醒等待线程  
          unparkSuccessor(h);
       }
               
    }

CyclicBarrier

简介

CountDownLatch可以解决多个线程同步问题,但计数是一次性的,起不到线程之间同步的效果,JDK提供了CyclicBarrier类,其提供的功能不限于CountDownLatch的功能,可以让一组线程全部达到一个状态后再全部同时执行,并且重复利用。

用法

public static void main(String[] args) {

        //初始化CyclicBarrier数值为2
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.submit(()->{
            System.out.println(LocalDateTime.now() + " task1 begin...");
            sleep(1);
            try {
                // state--
                cyclicBarrier.await();
                System.out.println(LocalDateTime.now() +" task1 end");
            }catch (Exception e){
                e.printStackTrace();
            }
        });

        executorService.submit(()->{
            System.out.println(LocalDateTime.now() + " task2 begin...");
            sleep(2);
            try {
                // state--
                cyclicBarrier.await();
                System.out.println(LocalDateTime.now() + " task2 end");
            }catch (Exception e){
                e.printStackTrace();
            }
        });
    }


--输出
2021-08-08T21:50:14.350 task1 begin...
2021-08-08T21:50:14.350 task2 begin...
2021-08-08T21:50:16.354 task2 end
2021-08-08T21:50:16.354 task1 end

可以看到,在两个子线程在都执行到await()后,一起往下继续执行。

原理及源码分析

  • new CyclicBarrier(2)构造函数初始化数量,使成员变量count=2
  • 当前线程执行cyclicBarrier.await(),count计数减一,如果减完count不为0则执行lock锁中的await()进入条件队列,如果等于0则执行signAll(),唤醒所有条件队列中的线程。
-- 1.初始化
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

-- 设置成员变量
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
}

-- 2.cyclicBarrier.await()
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //计数减一
            int index = --count;
            if (index == 0) {  // tripped
              ....
              // signAll()所有等待线程  
              nextGeneration();
            }

            for (;;) {
                // count!=0则进行await等待
                trip.await();
              
            }
        } finally {
            lock.unlock();
        }
    }

可以看到,dowait方法中分别根据不同的条件执行了线程等待和线程全部唤醒操作。

Semaphore

简介

Semaphore信号量也是Java中的一个同步器,用来限制能同时访问共享资源的线程的上限。

用法

public class SemaphoreTest {
        public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    //获取信号量
                    semaphore.acquire();
                    System.out.println(LocalDateTime.now()+"==我是线程:" + Thread.currentThread().getName());
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 释放信号量
                    semaphore.release();
                }
            }).start();
        }
    }
}

--输出
2021-08-08T22:59:18.166==我是线程:Thread-1
2021-08-08T22:59:18.166==我是线程:Thread-2
2021-08-08T22:59:18.166==我是线程:Thread-0
2021-08-08T22:59:20.171==我是线程:Thread-4
2021-08-08T22:59:20.171==我是线程:Thread-5
2021-08-08T22:59:20.171==我是线程:Thread-3
2021-08-08T22:59:22.175==我是线程:Thread-6
2021-08-08T22:59:22.175==我是线程:Thread-8
2021-08-08T22:59:22.175==我是线程:Thread-7
2021-08-08T22:59:24.180==我是线程:Thread-9

可以看到,一共10个子线程,每次都是三个三个输出,即同一时刻只能有三个线程获取令牌。

原理及源码分析

  • 通过构造方法初始化state=3
  • 每次有线程acquire,则state-1,如果state不为0则获取锁成功,如果减为0了,则下一个线程进来进入等待队列且park住,
  • 最后执行release方法,将state+1 相当于还回令牌,同时唤醒一个等待队列中的元素。
--1.初始化构造方法
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
-- 设置state=3
Sync(int permits) {
     setState(permits);
}

--2.尝试获取资源
semaphore.acquire();

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 小于0代表获取失败
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

--java.util.concurrent.Semaphore.FairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 队列中如果有数据返回-1,表示失败
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                // 2(剩余资源数) = 3(总资源数) - 1(本线程需要资源数)
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
}

--3.获取锁情况
--如果获取锁成功则返回
--获取锁失败则执行doAcquireSharedInterruptibly 创建节点并添加到队列
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
--4.释放信号量
semaphore.release();
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

--尝试释放信号量
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //将state+1
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
--如果释放成功执行doReleaseShared,唤醒之前等待队列中的线程
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
}



版权声明:本文为danxiaodeshitou原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。