异步线程池 & 异步编排
线程回顾
初始化线程的四种方式
继承Thread
public class ThreadTest{ public static class Thread01 extends Thread { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果: " + i); } } public static void main(String[] args){ Thread01 thread01 = new Thread01(); thread01.start(); // 启动线程 } }实现Runnable接口
public class ThreadTest{ public static class Runnable01 implements Runnable { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果: " + i); } } public static void main(String[] args){ Runnable01 runnable01 = new Runnable01(); new Thread(runnable01).start(); } }实现Callable接口+ FutureTask ( 可以拿到返回结果,可以处理异常)
public class ThreadTest{ public static class Callable01 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果: " + i); return i; } } public static void main(String[] args){ // 使用Future执行Callable FutureTask<Integer> futureTask = new FutureTask<>(new Callable01()); new Thread(futureTask).start(); // FutureTask 是Runnable的实现类 // get() 堵塞等待整个线程执行完成,获取返回结果 Integer integer = futureTask.get(); System.out.println("integer = " + integer); } }线程池
public class ThreadTest{ // 最原始的方式 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 200, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // 包装之后简化线程池的创建(jdk8 的特性) public static ExecutorService executor = Executors.newFixedThreadPool(10); }
总结:
方式1和方式2:主进程无法获取线程的运算结果。不适合当前场景
方式3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
方式4:通过如下两种方式初始化线程池
Executors.newFiexedThreadPool(3); //或者 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit,workQueue, threadFactory, handler);通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。
线程池的七大参数
七大参数
- int corePoolSize:[5] 核心线程数[一直存在除非设置(allowCoreThreadTimeOut)];线程池创建好以后就准备5个 Thread thread = new Thread(); thread.start();
- int maximumPoolSize:[200] 最大线程数量;控制资源并发
- long keepAliveTime 存活时间。如果当前的线程数量大于core数量。就是释放空闲的线程(maximumPoolSize - corePoolSize)。只要线程空闲大于指定的keepAliveTime(就是刚做完任务的线程,然后也没新任务,他能存活多久)
- TimeUnit unit, 时间单位
- BlockingQueue workQueue: 堵塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲了,就会去队列里面取出新的任务继续干活
- ThreadFactory threadFactory:线程的创建工厂
- RejectedExecutionHandler handler : 如果队列满了,按照我们指定的拒绝策略拒绝执行任务
工作顺序
- 线程池创建,准备好core数量的核心线程,准备接受任务
- core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务执行
- 阻塞队列满了,就直接开新线程执行,最大只能开到max指定的数量
- max满了就用RejectedExecut ionHandler拒绝任务
- ax都执行完成,有很多空闲.在指定的时间keepAliveTime以后,释放max一core这 些线程new LinkedBlockingDeque<>(): 默认是Integer的最大值。这会导致我们内存不够,所以必须指定数量
面试题: 一个线程池core 7,max 20 ,queue 50 100并发进来怎么分配的
7个会立即执行,50个会进入队列,再开13个进行执行。剩下的30个就使用拒绝策略(默认是丢弃)。如果不想丢弃我们可以在创建线程池的时候执行拒绝策略 CallerRunsPolicy
常见的4中线程池
public class ThreadTest{
Executors.newCachedThreadPool(); // core 是0,所有都可以回收,来一个任务就创建一个线程
Executors.newFixedThreadPool(10); // 固定线程数量,core = max = 10 就是都不可回收
Executors.newScheduledThreadPool(10); // core=10,定时任务的线程池,从队列里面获取任务可以设置延迟时间。
Executors.newSingleThreadExecutor(); // 单线程的线程池,堵塞的任务都放入队列中,挨个执行
}
开发中为什么使用线程池
- 资源控制。设想场景一万个并发过来你直接new Thread(()->System.out.println(“hello…thread”)).start() 瞬间就oom 了
- 降低资源的消耗
- 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 提高响应速度
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
- 提高线程的可管理性
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销(cpu保护现场恢复现场太多次)。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
CompletableFuture 异步编排
业务场景:查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
异步编排的好处:假如商品详情页的每个查询,需要如下标注的时间才能完成。那么,用户需要6.5s后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这6步操作,也许只需要1.5s 即可完成响应。
创建异步对象
CompletableFuture提供了四个静态方法来创建一个异步操作。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
1、runXxx都是没有返回结果的, supplyXx都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;
计算完成时回调方法
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
- whenComplete可以处理正常和异常的计算结果,虽然能得到异常信息,但是没法修改返回的数据。
- exceptionally 处理异常情祝,感知异常,同时返回默认值。
- whenComplete和whenCompleteAsync 的区别:
- whenComplete:是执行当前任务的线程执行继续执行whenComplete 的任务(使用的是同一线程)。
- whenCompleteAsync:是执行把whenCompleteAsync 这个任务继续提交给线程池来进行执行(另一个线程)。
- 方法不以Async结尾,意味着Action使用相同的线程执行,而Asyne可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
handle方法
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
- 和complete一样,可对结果做最后的处理(可处理异常),可改变返回值。
线程串行化方法
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
thenApply方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作
* 1) thenRunAsync: 不能获取到上一步的结果,无返回值 * 2) thenAcceptAsync: 能获取当上一步的结果,无返回值 * 3) thenApplyAsync : 能获取到上一步的结果,有返回值
总结:
- 带有Async默认是异步执行的。同之前。
- 以上都要前置任务成功完成。所以叫线程串行化。
两个任务组合-都要完成
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
两个任务必须都完成,触发该任务。
thenCombine: 组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth: 组合两个future,不需要获取future 的结果,只需两个future处理完任务后,处理该任务。
* runAfterBothAsync 拿不到两个future 的结果,无返回值
* thenAcceptBothAsync 能拿到两个future 的结果,无返回值
* thenCombineAsync 能拿到两个future 的结果,有返回值
两个任务组合-一个完成
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
当两个任务中,任意一个future任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
runAfterEither:两个任务有一一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
两个任务只要有一个完成,我们就执行任务3
* runAfterEitherAsync 不能感知结果,没有返回值
* acceptEitherAsync 能感知结果,没有返回值(注册的future 返回值类型必须一致)
* applyToEitherAsync 能感知结果,有返回值(注册的future 返回值类型必须一致)
多任务组合
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
allOf.get(); // 等待所有的结果完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc, futureNone);
System.out.println("最快完成的任务->" + anyOf.get()); // 这个就是第一个完成的future 的返回值,只要有一个完成就不堵塞
allOf:等待所有任务完成
anyOf:只要有一个任务完成
好处
线程池和异步编排能提供我们服务器的性能和吞吐量。线程池是性能,异步编排是吞吐量。