文章目录
一、ThreadPoolExecutor介绍
1.为什么要用线程池?
过于频繁的创建/销毁线程,会影响处理效率和运行速度,消耗系统资源,降低系统稳定性,一不小心搞崩公司系统直接下岗,而使用线程池可以避免这些问题,稳住饭碗,并且可以对线程这个稀缺资源进行管理、调优与监控
ThreadPoolExecutor的构造函数
2.ThreadPoolExecutor构造函数的核心参数
corePoolSzie: 线程核心池所能容纳的线程数,满后仍有任务进入则进入阻塞队列等待
maximumPoolSize: 线程池最多所能容纳的线程数,阻塞队列也满后,仍有任务进入,会新建线程(非核心线程)来执行任务,核心池线程数+新建线程数不得超过此参数数值,否则会触发拒绝策略拒绝任务。即新建的非核心线程数最大值 = maximumPoolSize - corePoolSize
keepAliveTime: 非核心线程若超过此参数指定时间没有任务执行,则销毁线程
TimeUnit unit: 上个参数的时间单位,TimeUnit是枚举类型,有七种参数:
NANOSECONDS : 1微毫秒 = 1秒 / 1000 * 1000 * 1000
MICROSECONDS : 1微秒 = 1秒 / 1000 * 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天
BlockingQueue<Runnable> workQueue: 阻塞队列,常用workQueue类型:
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
threadFactory:用于创建工作线程的工厂。
handler:拒绝策略,往线程池添加任务时,将在下面两种情况触发拒绝策略:1)线程池运行状态不是 RUNNING;2)线程池已经达到最大线程数,并且阻塞队列已满时。
3.线程池运行流程
4.创建线程池实例
代码如下:
//创建核心线程池; 核心池容量=3; 最大线程数=10; 阻塞队列类型为ArrayBlockingQueue,容量=5; KeepAliveTime = 10ms
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
二、如何使用线程池
1.创建线程
代码如下:
public class MyThread implements Runnable{
//region 构造器
/**
* @param flag 线程编号
* @param currentBlockingQueueSize 当前阻塞队列等待的线程数量
*/
public MyThread(int flag, int currentBlockingQueueSize) {
this.flag = flag;
this.currentBlockingQueueSize = currentBlockingQueueSize;
}
//endregion
//region 变量
private int flag;
private int currentBlockingQueueSize;
//endregion
//region run函数重写
@Override
//线程运行
public void run() {
System.out.println("线程编号:" + getFlag() + "运行中----" + "当前队列中有" + currentBlockingQueueSize + "个线程");
//线程执行时间3s
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//endregion
//region 私有方法
//获取当前线程编号
private int getFlag() {
return flag;
}
//endregion
}
2.创建线程池,并将线程加入线程池
代码如下:
public class RefusePolicyTest {
public static void main(String[] args) {
//加入线程池的线程数
int THREAD_COUNT = 5;
//创建核心线程池; 核心池容量=3; 最大线程数=10; 阻塞队列类型为ArrayBlockingQueue,容量=5; KeepAliveTime = 10ms
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
//调用RefusePolicy类设置拒绝策略
RefusePolicy refusePolicy = new RefusePolicy(executor);
//将线程加入线程池
for (int i = 0; i < THREAD_COUNT; i++) {
executor.execute(new MyThread(i, executor.getQueue().size()));
}
//关闭线程
executor.shutdown();
System.out.println("关闭");
}
}
三、线程数量超过maximumPoolSize时的四种拒绝策略
代码:
线程创建:
/**
* @version 1.0
* @Author szh
* @Date 2021/7/23 10:27
* @注释 创建Thread用于加入线程池
*/
public class MyThread implements Runnable{
//region 构造器
/**
* @param flag 线程编号
* @param currentBlockingQueueSize 当前阻塞队列等待的线程数量
*/
public MyThread(int flag, int currentBlockingQueueSize) {
this.flag = flag;
this.currentBlockingQueueSize = currentBlockingQueueSize;
}
//endregion
//region 变量
private int flag;
private int currentBlockingQueueSize;
//endregion
//region run函数重写
@Override
//线程运行
public void run() {
System.out.println("线程编号:" + getFlag() + "运行中----" + "当前队列中有" + currentBlockingQueueSize + "个线程");
//线程执行时间3s
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//endregion
//region 私有方法
//获取当前线程编号
private int getFlag() {
return flag;
}
//endregion
}
拒绝策略设置:
/**
* @version 1.0
* @Author szh
* @Date 2021/7/23 11:31
* @注释 策略测试方法类
*/
public class RefusePolicy {
//region 构造器
public RefusePolicy(ThreadPoolExecutor executor) {
this.executor = executor;
}
//endregion
//region 变量
private static ThreadPoolExecutor executor;
//endregion
//region 四种策略设置
//AbortPolicy拒绝策略 --丢任务并抛出异常
static void SetAbortPolicyTest() {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
System.out.println("正在使用AbortPolicy拒绝策略:");
}
//DiscardPolicy拒绝策略 --丢任务但不抛出异常
static void SetDiscardPolicyTest() {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
System.out.println("正在使用DiscardPolicy拒绝策略:");
}
//DiscardOldestPolicy拒绝策略 --丢弃最早进入队列的任务
static void SetDiscardOldestPolicyTest() {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
System.out.println("正在使用DiscardOldestPolicy拒绝策略:");
}
//CallerRunsPolicy拒绝策略 --主线程运行
static void SetCallerRunsPolicyTest() {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
System.out.println("正在使用CallerRunsPolicy拒绝策略:");
}
//endregion
}
将线程加入线程池,并测试策略:
/**
* @version 1.0
* @Author szh
* @Date 2021/7/23 9:54
* @注释 调用RefusePolicy类的方法测试拒绝策略
*/
public class RefusePolicyTest {
public static void main(String[] args) {
//加入线程池的线程数
int THREAD_COUNT = 20;
//创建核心线程池; 核心池容量=3; 最大线程数=10; 阻塞队列类型为ArrayBlockingQueue,容量=5; KeepAliveTime = 10ms
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
//调用RefusePolicy类设置拒绝策略
RefusePolicy refusePolicy = new RefusePolicy(executor);
//设置策略
refusePolicy.SetAbortPolicy();
//refusePolicy.SetDiscardPolicyTest();
//refusePolicy.SetDiscardOldestPolicyTest();
//refusePolicy.SetCallerRunsPolicyTest();
//将线程加入线程池
for (int i = 0; i < THREAD_COUNT; i++) {
executor.execute(new MyThread(i, executor.getQueue().size()));
}
//关闭线程
executor.shutdown();
System.out.println("关闭");
}
}
线程池最大可容纳maximumPoolSize + WorkQueue,即15个任务,我们设置了20个向线程池加入,各策略运行结果如下:
1. AbortPolicy(默认策略)结果
可见,一开始线程池无任务,0,1,2号顺利进入核心池,核心池大小为3,3号线程进入时发现核心池满,于是3-7号线程依次进入阻塞队列,队列容量为5。然后8号任务进入时,会创建新的线程处理任务,于是8-14共7个任务会被新建的非核心线程处理,15号以后的任务进入,发现正在运行的线程数已达到maximumPoolSize,触发拒绝策略,丢弃并抛出异常,紧接着队列中的3-7也被处理。
AbortPolicy是默认策略,抛弃多余任务,并抛出异常。
2. DiscardPolicy
与默认策略类似,只是不会抛出异常,默默丢弃任务
3. DiscardOldestPolicy
前面的都是一样,0-2号3个线程仍正常进入核心池,然后3-7进入阻塞队列,8-14号新建线程去处理,但是15进入并不会被丢弃,而是丢弃队列中的最先进入的任务,即3,然后15进入队列,同理16、17、18、19进入会丢弃队列中的4、5、6、7,然后进入队列,因此3-7被抛弃,15-19会进入队列等待被处理。
4. CallerRunsPolicy
前面仍然相同,3s后(run函数sleep了3s)
15号及以后的任务会在进入时被拒绝,然后被回退至调用者,即主线程去执行,所以,一开始15会被主线程执行,然后等待3s后,先进入的执行完毕后,其余的也会被处理。
总结
以上就是今天记录的内容,本文仅仅简单介绍了ThreadPoolExecutor的使用及其四种拒绝策略:- AbortPolicy(默认):丢弃多余任务,并抛出异常
- DiscardPolicy:丢弃多余任务,但不抛出异常
- DiscardOldestPolicy:丢弃最先进队列的任务,然后重新尝试加入任务
- CallerRunsPolicy:不会抛弃任务,也不会抛出异常,而是将任务回退至调用者执行