文章目录
1、概述:
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
他的主要特点为:线程复用; 控制最大并发数; 管理线程。
第一: 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二: 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
第三: 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降抵系统的稳定性,使用线程池可以逆行统一的分配,调优和监控
2、初始化线程的四种方式:
- 1、继承Thread
- 2、实现Runnable
- 3、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
- 4、线程池
优缺点:
- 1、2无法获取返回值,3可以获取返回值(阻塞式等待)
- 1、2、3都不能控制资源,
- 4可以控制资源,性能稳定
代码示例:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CreateThreadTest {
public static void main(String[] args) {
// 1、继承Thread
new Thread(new Thread01()).start();
// 2、实现Runnable
new Thread(new Rannable01()).start();
// 3、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
FutureTask<String> stringFutureTask = new FutureTask<>(new Callable01());
new Thread(stringFutureTask).start();
try {
log.info("结果:{}",stringFutureTask.get());//阻塞式等待
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 4、线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(()->log.info("方式4..."));//submit可以获取结果,execute无法获取结果
executorService.shutdown();//关闭
}
}
@Slf4j
class Thread01 extends Thread{
@Override
public void run() {
log.info("方式1...");
}
}
@Slf4j
class Rannable01 implements Runnable{
@Override
public void run() {
log.info("方式2...");
}
}
@Slf4j
class Callable01 implements Callable<String>{
@Override
public String call() throws Exception {
log.info("方式3...");
return "Callable01";
}
}
3、七大参数:
3.1、七大参数详解
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
int corePoolSize
,核心线程数【只要线程池不销毁(设置allowCoreThreadTimeOut属性),就一直存在】;线程池创建好之后准备就绪的线程数,等待接收异步亲求去执行;int maximumPoolSize
,最大线程数,可用于控制资源long keepAliveTime
,存活时间,当前线程数量大于指定的核心线程数量,只要存活时间达到该值,就会被释放。(maximumPoolSize-corePoolSize)TimeUnit unit
,时间单位BlockingQueue<Runnable> workQueue
,阻塞队列,如果任务很多,多余的任务会被放在队列中ThreadFactory threadFactory
,创建线程的工厂RejectedExecutionHandler handler
如果队列满了,按照我们指定的拒绝策略执行任务
如果是要五大参数,就是前五个,后两个默认:
- 例如:使用五个参数的构造方法:
其中默认的拒绝策略:(丢弃并抛异常)
3.2、如何合理设置核心线程数:
CPU密集型
IO密集型
方式1:
方式2:
4、工作顺序:
工作顺序【1核心线程 -> 2阻塞队列 -> 3最大线程 -> 4拒绝策略(超过最大线程)、释放线程(超过空闲时间)】
- 1、线程池创建,准备好指定数量(
corePoolSize
)的核心线程,以接收任务 - 2、核心线程满了,再进入的线程就会被放进阻塞队列(
workQueue
)中,当有空闲核心线程,就会去阻塞队列中获取任务 - 3、阻塞队列满了,就直接开新的线程执行,最大只能开到
maximumPoolSize
指定的数量 - 4、达到
maximumPoolSize
个线程,就用拒绝策略拒绝任务 - 5、如果当前线程数大于指定核心线程数(
corePoolSize
),多出的每一个线程在空闲指定时间(keepAliveTime
)过后,释放线程
依据:从execute方法可以看出来:
5、Executors常用创建线程池方法:
前三种更为常用,都是使用ThreadPoolExecutor创建的线程池;第四种采用ScheduledThreadPoolExecutor创建线程池;第5种采用ForkJoinPool创建线程池;他们的关系如下图:
1、
Executors.newCachedThreadPool();
core是0,所有都可回收;适合执行短期异步的小程序或者负载较轻的服务器2、
Executors.newFixedThreadPool();
固定大小,core=max;适合执行长期的任务3、
Executors.newSingleThreadExecutor();
单线程的线程池,后台从队列里面获取任务,挨个执行;适合一个任务一个任务执行的场景4、
Executors.newScheduledThreadPool();
定时任务的线程池5、
Executors.newWorkStealingPool();
ForkJoinPool 分支合并
一般不使用上述方法创建线程池,前两个阻塞队列的大小是Integer.MAX_VALUE,后两个的最大线程数是Integer.MAX_VALUE。如,阿里巴巴Java开发手册中这样描述:
6、四种拒绝策略:
是什么?
- 等待队列也已经排满了,再也塞不下新任务了同时,
- 线程池中的max线程也达到了,无法继续为新任务服务。
- 这时候我们就需要拒绝策略机制合理的处理这个问题。
先看接口类RejectedExecutionHandler
:只有一个rejectedExecution方法,来决定如何拒绝
它有以下四个实现类:
DiscardOldestPolicy
: poll,移除第一个;删除工作队列 最早的一个,再尝试运行当前的r(依照工作顺序运行,即再检验一遍是否小于核心线程数、队列是否满了、是否达到最大线程数)。AbortPolicy
: 很明显,直接抛异常CallerRunsPolicy
: 直接调用run方法,哪来的就在哪运行DiscardPolicy
:这个有意思,啥都没干,相当于直接忽略
代码演示:
- 使用下面的代码,分别测试四种拒绝策略:
- 核心2,最大5,阻塞3,10个任务(0~9)
- 前两个:0、1直接运行
- 2、3、4进入阻塞队列
- 5、6、7新开线程
- 8、9走拒绝策略
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
log.info(rejectedExecutionHandler.getClass().getName());
// 核心2,最大5,阻塞队列3;最多同时执行 5 + 3 = 8 个任务
ExecutorService executorService = new ThreadPoolExecutor(2,5,
30, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
rejectedExecutionHandler);
try {
//运行10个任务,会有多出的任务
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(() -> {
log.info("执行任务{}...", finalI);
});
}
} finally {
executorService.shutdown();
}
抛异常:
直接忽略组后来的:
丢弃队列最早的:
哪来的在哪运行:最后来的两个在main中运行:
7、CompletableFuture启动异步任务
- runAsync:无返回值
- supplyAsync:有返回值
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CompletableFutureTest {
private static ExecutorService executor = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
//方式1、无返回值
CompletableFuture.runAsync(()->log.info("runAsync"),executor);
// 方式2、有返回值
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return "supplyAsync";
}, executor);
//获取返回值
try {
String res = supplyAsync.get();
log.info("返回值:{}",res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
8、方法执行完成后的感知
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
() -> {
log.info("创建...");
return 10 / 0;
}, executor)
//处理完成后用当前线程处理,如果为whenCompleteAsync,则用其他线程处理
.whenComplete((res, e) -> {
log.info("结果:{},异常:{}", res, e);
})
//出现异常后的回调
.exceptionally((throwable) -> {
log.info("出现异常,返回0,异常:{}", throwable.getCause().toString());
return 0;
});
//打印结果
try {
log.info("结果:{}",supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
9、方法执行完成后的处理
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
() -> {
log.info("创建...");
return 10 / 2;
// return 10 / 0;
}, executor)
//参数:BiFunction<? super T, Throwable, ? extends U> fn;接收结果、异常,返回新结果
.handle((res, e) -> {
if (res != null) {
log.info("结果不为空");
return res * 2;
}
if (e != null) {
log.info("有异常");
return 0;
}
log.info("其他情况");
return 0;
});
//打印结果
try {
log.info("结果:{}", supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
10、线程池串行方法
基本方法:
thenApply
:接收上一个返回结果,返回新结果thenAccept
:只能接收上一个返回结果,无返回值thenRun
:不接受返回结果也不返回结果
现实现一个需求:
- 先启动一个线程,任务1
- 任务2获取任务1的返回结果,并返回新结果
- 任务3在同一个线程上获取任务2的结果,但不返回任何结果
- 任务4既不接收上一个返回结果,也不返回结果
代码:
private static void 线程池串行方法() {
CompletableFuture.supplyAsync(
() -> {
log.info("任务1...");
return 111;
})
//获取结果,返回新结果(类型可不同)
.thenApplyAsync((res) -> {
log.info("任务2...获取到上一个的结果:{}", res);
return "222";
}, executor)
//获取上一个结果,但不返回结果
.thenAccept((res) -> {
log.info("任务3...获取到上一个的结果:{}", res);
})
//既不获取,也不返回
.thenRunAsync(() -> {
log.info("任务4...无法获取到上一个的结果");
}, executor);
// 在测试包下运行,线程睡眠,防止提前结束
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
11、两任务组合
11.1、都要完成
thenCombine
:组合两个future,获取两个future的返回结果,并返回当前任务的返回结果thenAcceptBoth
:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值runAfterBoth
:组合两个future,不需要获取future的结果,只需两个future处理完任务后处理任务- 以上方法后面加上
Async
,表示新开一个线程,例如:thenCombineAsync
示例:使用第三个任务,获取前两个任务的返回结果,并进行拼串,返回最终结果:
//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
() -> {
log.info("任务1...");
return 111;
}, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
() -> {
log.info("任务2...");
return 222;
}, executor);
//任务3在前两个执行完后执行:
CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (f1, f2) -> {
log.info("任务3...获取到的结果:{},{}", f1, f2);
return f1 + "" + f2;
}, executor);
//打印任务3的结果
try {
log.info("任务3的返回值:{}",cf3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
11.2、一个完成即可
applyToEither
:两个任务有一个执行完成,获取它的返回值,处理任务并由新返回值acceptEither
:两个任务有一个执行完成,获取它的返回值,处理任务,但无返回值runAfterEither
:两个任务有一个执行完成,不获取它的返回值,处理任务,也没有返回值- 均可加
Async
- 组合的两个任务需要有相同的返回值
以下测试三种方式:
//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
() -> {
log.info("任务1...");
return 111;
}, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
() -> {
log.info("任务2...开始");
try {
Thread.sleep(2000);
log.info("任务2...结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 222;
}, executor);
//=========方式1=========
//任务3在前两个执行完后执行:
CompletableFuture<Integer> cf3 = cf1.applyToEitherAsync(cf2, (f1) -> {
log.info("任务3-1...获取到的结果:{},并把结果扩大2倍", f1);
return f1 * 2;
}, executor);
//打印cf3的结果
try {
log.info("任务3-1的返回值:{}", cf3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//=========方式2=========
cf1.acceptEitherAsync(cf2, (f1) -> {
log.info("任务3-2获取到的结果:{}", f1);
}, executor);
//=========方式3=========
cf1.runAfterEitherAsync(cf2,
() -> log.info("任务3-3:前两个任务有一个完成了"),
executor);
12、多任务组合
CompletableFuture中有两个静态方法:
allOf
:所有任务都完成anyOf
:一个任务完成即可
代码示例:
//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
() -> {
log.info("任务1...");
return 111;
}, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
() -> {
log.info("任务2...开始");
try {
Thread.sleep(2000);
log.info("任务2...结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 222;
}, executor);
//一个完成即可:
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cf1, cf2);
try {
log.info("有一个任务完成了,返回值:{}",anyOf.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//都要完成:
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2);
try {
log.info("任务都完成了,返回值:{},{}",cf1.get(),cf2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}