并发工具类
1 CountDownLatch
该工具是用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作。功能可以类比Thread类的join方法。给该对象设置一个计数值,当一个任务完成时,调用countDown()方法对计数值减一,当计数值为0时,所有任务都完成了。CountDownLatch只触发一次,计数值不能重置。
public static void countDownLatch() throws InterruptedException {
long start = Instant.now().toEpochMilli();
// 设置计数值为3
CountDownLatch count = new CountDownLatch(3);
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 任务执行完毕减一
count.countDown();
}).start();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 任务执行完毕减一
count.countDown();
}).start();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 任务执行完毕减一
count.countDown();
}).start();
// 阻塞,当计数值为0时,该行执行完成
count.await();
System.out.println("任务执行完毕......");
System.out.println("耗时: " + (Instant.now().toEpochMilli() - start));
}
运行结果如下:
适用场景: 在多线程执行过程中设置几个门闩,当所有的门闩被打开时,被挡在门外的线程才能继续执行。
在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
用给定的计数,初始化CountDownLatch,由于调用了countDown()方法,所以在当前计数到达0之前,await()方法会一直受阻塞。 之后,会释放所有等待的线程,await的所有后续调用都将立即返回。 这种现象只出现一次----计数无法被重置,如果需要重置计数,建议使用CyclicBarrier.
内部采用共享锁来实现的
与CyclicBarrier的区别
CountDownLatch的作用是允许1个或者多个线程等待其他线程执行完成;而CyclicBarrier则是允许N个线程相互等待
CountDownLatch的计数无法被重置,用完了就没了;但是CyclicBarrier的计数器是可以被重置后继续使用,也因此称之为循环的barrier
2 CyclicBarrier
使用场景:多个线程彼此等待,当所有的线程都到达指定“地点”(指定代码位置),才开始继续执行。
它允许一组线程相互等待,直到到达某个公共屏障点
底层采用:ReenTrantLock+Condition来实现的
通俗的讲:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
例如:一个会议有3个人参加,只有当3个人全部都到会,会议才能够开始。
public static void cyclicBarrier() {
CyclicBarrier count = new CyclicBarrier(3, () -> {
System.out.println("所有人到齐,会议开始...");
});
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(200);
System.out.println("A到达会议");
count.await();
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
try {
Thread.sleep(500);
System.out.println("B到达会议");
count.await();
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
try {
Thread.sleep(900);
System.out.println("C到达会议");
count.await();
} catch (Exception e) {
e.printStackTrace();
}
});
executorService.shutdown();
}
运行结果如下:
3 Semaphore
使用场景:需要控制访问某个资源或者进行某种操作的线程数量。当达到指定数量时,只能等待其他线程释放信号量。
一个控制访问多个共享资源的计数器
从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每个aquire(),然后再获取该许可。 每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不适用实际的许可对象, Semaphore只对可用许可的号码进行计数,并采用相应的行动。
内部采用共享锁实现的
单个信号量的Semaphore对象可以实现互斥锁的功能
同一时间只允许指定数量的线程数方法,可以控制并发数
public static void semaphore() {
Semaphore semaphore = new Semaphore(3);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
try {
Thread.sleep(100);
// 获取许可
semaphore.acquire();
System.out.println("当前可用信号数 " + semaphore.availablePermits());
System.out.println("等待线程数 " + semaphore.getQueueLength());
Thread.sleep(300);
// 释放许可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
运行结果如下:
4 Exchanger
使用场景:用于两个线程之间交换数据
可以在对其中元素进行配对和交换的线程的同步点
允许在并发任务之间交换数据。具体地说就是Exchanger类允许在两个线程之间定义同步点。 当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入第二个线程中,第二个线程的数据结构进入第一个中
public static void exchanger() {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
String a = "我是线程a的数据";
try {
String b = exchanger.exchange(a);
System.out.println("我是线程a ---> " + b);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
String b = "我是线程b的数据";
try {
String a = exchanger.exchange(b);
System.out.println("我是线程b ---> " + a);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdown();
}
运行结果如下: