目录
CountDownLatch
简介
CountDownLatch是 JDK 提供的并发流程控制的工具类,它是在 java.util.concurrent 包下,在 JDK1.5 以后加入,用来进行同步协作,等待所有线程完成倒计时。其中构造函数用来初始化等待计数值,await()用来等待计数归零,countDown()用来让计数减一。
用法
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
Thread t1 = new Thread(() -> {
System.out.println("begin...");
sleep(1);
countDownLatch.countDown();
System.out.println("end...");
});
Thread t2 = new Thread(()->{
System.out.println("begin...");
sleep(3);
countDownLatch.countDown();
System.out.println("end...");
});
Thread t3 = new Thread(()->{
System.out.println("begin...");
sleep(2);
countDownLatch.countDown();
System.out.println("end...");
});
t1.start();
t2.start();
t3.start();
try {
System.out.println("等待执行...");
countDownLatch.await();
System.out.println("执行结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void sleep(int i){
try {
Thread.sleep(i*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
-- 输出
begin...
begin...
begin...
等待执行...
end...
end...
end...
执行结束
可以看到,await()卡住了主线程,直到三个子线程都执行完了,主线程才开始执行。
原理及源码分析
- new CountDownLatch(3)构造函数中初始化AQS中state = 3
- await()方法尝试获取锁,发现state不为0,获取失败,进入等待队列
- 三个子线程依次执行countDown方法让state--,直到state=0后,执行doReleaseShared方法,unpark之前await的线程
--1.构造方法,初始化state
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
--2.主线程await,发现state不为0,则进入等待队列
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared <0 成立,
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 判断是否可获取锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 进入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
....
if (shouldParkAfterFailedAcquire(p, node) &&
// park住
parkAndCheckInterrupt())
throw new InterruptedException();
}
}
}
--3.countDown执行state--,state为0以后唤醒等待线程
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//3.1
if (tryReleaseShared(arg)) {
//3.2
doReleaseShared();
return true;
}
return false;
}
// 3.1 执行state--,如果state==0返回ture
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
//3.2 唤醒等待线程
private void doReleaseShared() {
for (;;) {
......
// 唤醒等待线程
unparkSuccessor(h);
}
}
CyclicBarrier
简介
CountDownLatch可以解决多个线程同步问题,但计数是一次性的,起不到线程之间同步的效果,JDK提供了CyclicBarrier类,其提供的功能不限于CountDownLatch的功能,可以让一组线程全部达到一个状态后再全部同时执行,并且重复利用。
用法
public static void main(String[] args) {
//初始化CyclicBarrier数值为2
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(()->{
System.out.println(LocalDateTime.now() + " task1 begin...");
sleep(1);
try {
// state--
cyclicBarrier.await();
System.out.println(LocalDateTime.now() +" task1 end");
}catch (Exception e){
e.printStackTrace();
}
});
executorService.submit(()->{
System.out.println(LocalDateTime.now() + " task2 begin...");
sleep(2);
try {
// state--
cyclicBarrier.await();
System.out.println(LocalDateTime.now() + " task2 end");
}catch (Exception e){
e.printStackTrace();
}
});
}
--输出
2021-08-08T21:50:14.350 task1 begin...
2021-08-08T21:50:14.350 task2 begin...
2021-08-08T21:50:16.354 task2 end
2021-08-08T21:50:16.354 task1 end
可以看到,在两个子线程在都执行到await()后,一起往下继续执行。
原理及源码分析
- new CyclicBarrier(2)构造函数初始化数量,使成员变量count=2
- 当前线程执行cyclicBarrier.await(),count计数减一,如果减完count不为0则执行lock锁中的await()进入条件队列,如果等于0则执行signAll(),唤醒所有条件队列中的线程。
-- 1.初始化
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
-- 设置成员变量
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
-- 2.cyclicBarrier.await()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//计数减一
int index = --count;
if (index == 0) { // tripped
....
// signAll()所有等待线程
nextGeneration();
}
for (;;) {
// count!=0则进行await等待
trip.await();
}
} finally {
lock.unlock();
}
}
可以看到,dowait方法中分别根据不同的条件执行了线程等待和线程全部唤醒操作。
Semaphore
简介
Semaphore信号量也是Java中的一个同步器,用来限制能同时访问共享资源的线程的上限。
用法
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
//获取信号量
semaphore.acquire();
System.out.println(LocalDateTime.now()+"==我是线程:" + Thread.currentThread().getName());
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放信号量
semaphore.release();
}
}).start();
}
}
}
--输出
2021-08-08T22:59:18.166==我是线程:Thread-1
2021-08-08T22:59:18.166==我是线程:Thread-2
2021-08-08T22:59:18.166==我是线程:Thread-0
2021-08-08T22:59:20.171==我是线程:Thread-4
2021-08-08T22:59:20.171==我是线程:Thread-5
2021-08-08T22:59:20.171==我是线程:Thread-3
2021-08-08T22:59:22.175==我是线程:Thread-6
2021-08-08T22:59:22.175==我是线程:Thread-8
2021-08-08T22:59:22.175==我是线程:Thread-7
2021-08-08T22:59:24.180==我是线程:Thread-9
可以看到,一共10个子线程,每次都是三个三个输出,即同一时刻只能有三个线程获取令牌。
原理及源码分析
- 通过构造方法初始化state=3
- 每次有线程acquire,则state-1,如果state不为0则获取锁成功,如果减为0了,则下一个线程进来进入等待队列且park住,
- 最后执行release方法,将state+1 相当于还回令牌,同时唤醒一个等待队列中的元素。
--1.初始化构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
-- 设置state=3
Sync(int permits) {
setState(permits);
}
--2.尝试获取资源
semaphore.acquire();
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 小于0代表获取失败
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
--java.util.concurrent.Semaphore.FairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
// 队列中如果有数据返回-1,表示失败
if (hasQueuedPredecessors())
return -1;
int available = getState();
// 2(剩余资源数) = 3(总资源数) - 1(本线程需要资源数)
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
--3.获取锁情况
--如果获取锁成功则返回
--获取锁失败则执行doAcquireSharedInterruptibly 创建节点并添加到队列
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
--4.释放信号量
semaphore.release();
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
--尝试释放信号量
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//将state+1
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
--如果释放成功执行doReleaseShared,唤醒之前等待队列中的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
版权声明:本文为danxiaodeshitou原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。