5 多线程锁
5.1锁的八个问题演示
class Phone {
public static synchronized void sendSMS() throws Exception {
//停留4秒
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}
1 标准访问,先打印短信还是邮件
------sendSMS
------sendEmail
2 停4秒在短信方法内,先打印短信还是邮件
------sendSMS
------sendEmail
3 新增普通的hello方法,是先打短信还是hello
------getHello
------sendSMS
4 现在有两部手机,先打印短信还是邮件
------sendEmail
------sendSMS
5 两个静态同步方法,1部手机,先打印短信还是邮件
------sendSMS
------sendEmail
6 两个静态同步方法,2部手机,先打印短信还是邮件
------sendSMS
------sendEmail
7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
------sendEmail
------sendSMS
8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
------sendEmail
------sendSMS
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象
5.2公平锁和非公平锁
ReentrantLock构造方法根据传入参数创建公平锁或非公平锁,默认为非公平锁。非公平锁可能会有线程饿死,效率高;公平锁阳光普照,效率相对低。
NonfairSync源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
FairSync源码:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
5.3可重入锁(递归锁)
synchronized和lock都是可重入锁
5.4死锁
查看进程jps、jstack
deadlock测试代码:
public static void main(String[] args) {
Ticket t1 = new Ticket();
Ticket t2 = new Ticket();
new Thread(() -> {
synchronized (t1){
System.out.println("拿到锁t1,准备获取锁t2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (t2){
System.out.println("获取锁t2");
}
}
}, "aa").start();
new Thread(() -> {
synchronized (t2){
System.out.println("拿到锁t2,准备获取锁t1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (t1){
System.out.println("获取锁t1");
}
}
}, "bb").start();
}
6、callable接口
使用callable接口创建线程
public static void main(String[] args) throws Exception {
FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
System.out.println(Thread.currentThread().getName() + "==callable线程执行。。。");
return 1111;
});
new Thread(futureTask, "AA").start();
System.out.println(futureTask.get());
}
7、JUC 三大辅助类
7.1CountDownLatch
CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。
- CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞
- 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
- 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
测试代码:
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "同学离开了教室。");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门了。");
}
7.2循环栅栏CyclicBarrier
CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句。可以将CyclicBarrier理解为加1操作
测试代码:
private final static int num = 7;
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
System.out.println("七颗龙珠已收集,可以召唤神龙");
});
for (int i = 1; i <= num; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "颗龙珠已收集");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
7.3信号量Semaphore
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到了停车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(6));
System.out.println(Thread.currentThread().getName() + "离开了停车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
8、读写锁
8.1读写锁介绍
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。
- 线程进入读锁的前提条件:
• 没有其他线程的写锁
• 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。 - 线程进入写锁的前提条件:
• 没有其他线程的读锁
• 没有其他线程的写锁
而读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
8.2读写锁案例实现
资源类
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author lyz
* @Title: CustomCache
* @Description:
* @date 2021/10/8 15:03
*/
public class CustomCache {
private volatile Map<String, String> cacheMap = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, String value){
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写入数据。。" + key);
TimeUnit.SECONDS.sleep(1);
cacheMap.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完了" + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public String get(String key){
rwLock.readLock().lock();
String value = null;
try {
System.out.println(Thread.currentThread().getName() + "正在读数据。。" + key);
TimeUnit.SECONDS.sleep(1);
value = cacheMap.get(key);
System.out.println(Thread.currentThread().getName() + "读完了key=" + key + ",value=" + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
return value;
}
}
测试代码
public static void main(String[] args) {
CustomCache cache = new CustomCache();
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
cache.put(String.valueOf(num), String.valueOf(num));
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
cache.get(String.valueOf(num));
}, String.valueOf(i)).start();
}
}
8.3锁降级演示demo
public static void main(String[] args) {
ReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.writeLock().lock();
System.out.println("写入数据");
rwLock.readLock().lock();
System.out.println("读取数据");
rwLock.readLock().unlock();
rwLock.writeLock().unlock();
}
9、阻塞队列
常用的队列主要有以下两种:
• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
9.1BlockingQueue核心方法
drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
9.2测试代码
public static void main(String[] args) throws Exception {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//第一组 抛出异常
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
System.out.println("======================================================");
//第二组 特殊值
System.out.println(blockingQueue.offer("1"));
System.out.println(blockingQueue.offer("2"));
System.out.println(blockingQueue.offer("3"));
System.out.println(blockingQueue.offer("4"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println("======================================================");
//第三组 阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
System.out.println("======================================================");
//第四组 超时
System.out.println(blockingQueue.offer("a", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS));
}
10、线程池
10.1线程池简介
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
特点:
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
• Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
10.2测试代码
public static void main(String[] args) {
//一池多线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
//一池一线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
//可扩展线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务。");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
看源码可知底层使用的都是ThreadPoolExecutor
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
10.3常用参数
• corePoolSize线程池的核心线程数
• maximumPoolSize能容纳的最大线程数
• keepAliveTime空闲线程存活时间
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列
• threadFactory 创建线程的工厂类
• handler 等待队列满后的拒绝策略
10.4线程池底层工作原理
1. 在创建了线程池后,线程池中的线程数为零
2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
10.4.1 JDK内置的拒绝策略
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
10.5 ThreadPoolExecutor手动创建
实际开发中创建线程池推荐使用ThreadPoolExecutor及其7个参数手动创建
测试代码
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 2l, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务。");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
11、Fork/Join
Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情: Fork:把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并
- 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
- 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
在Java
定义分支合并类
import java.util.concurrent.RecursiveTask;
/**
* @author lyz
* @Title: MyTask
* @Description:
* @date 2021/10/13 17:18
*/
public class MyTask extends RecursiveTask<Integer> {
private static final int value = 10;
private int max;
private int min;
private int result;
public MyTask(int max, int min) {
this.max = max;
this.min = min;
}
@Override
protected Integer compute() {
if (max - min < value){
for (int i = min; i <= max; i++) {
result += i;
}
} else {
int mid = (max + min) / 2;
MyTask task1 = new MyTask(mid, min);
MyTask task2 = new MyTask(max, mid + 1);
task1.fork();
task2.fork();
result = task1.join() + task2.join();
}
return result;
}
}
调用分支合并代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(100, 1);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
Integer result = submit.get();
System.out.println(result);
forkJoinPool.shutdown();
}
12、CompletableFuture
12.1 CompletableFuture简介
CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。
12.2 Future与CompletableFuture
Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future的主要缺点如下:
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的非阻塞调用
通过Future的get方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能
(3)不支持链式调用
对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。
(4)不支持多个Future合并
比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
(5)不支持异常处理
Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
12.3测试代码
public static void main(String[] args) throws ExecutionException, InterruptedException {
//异步调用无返回值
CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "正在执行");
}).get();
//异步调用有返回值
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "正在执行");
return 100;
});
supplyAsync.whenComplete((t, e) -> {
//t 返回值 e 异常
System.out.println("---------t" + t);
System.out.println("---------e" + e);
}).get();
}