线程池ThreadPoolExecutor的使用及拒绝策略


一、ThreadPoolExecutor介绍

1.为什么要用线程池?

过于频繁的创建/销毁线程,会影响处理效率和运行速度,消耗系统资源,降低系统稳定性,一不小心搞崩公司系统直接下岗,而使用线程池可以避免这些问题,稳住饭碗,并且可以对线程这个稀缺资源进行管理、调优与监控

ThreadPoolExecutor的构造函数ThreadPoolExecutor的构造函数

2.ThreadPoolExecutor构造函数的核心参数

corePoolSzie: 线程核心池所能容纳的线程数,满后仍有任务进入则进入阻塞队列等待

maximumPoolSize: 线程池最多所能容纳的线程数,阻塞队列也满后,仍有任务进入,会新建线程(非核心线程)来执行任务,核心池线程数+新建线程数不得超过此参数数值,否则会触发拒绝策略拒绝任务。即新建的非核心线程数最大值 = maximumPoolSize - corePoolSize

keepAliveTime: 非核心线程若超过此参数指定时间没有任务执行,则销毁线程
TimeUnit unit: 上个参数的时间单位,TimeUnit是枚举类型,有七种参数:
NANOSECONDS : 1微毫秒 = 1秒 / 1000 * 1000 * 1000
MICROSECONDS : 1微秒 = 1秒 / 1000 * 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天

BlockingQueue<Runnable> workQueue: 阻塞队列,常用workQueue类型:
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
threadFactory:用于创建工作线程的工厂。
handler:拒绝策略,往线程池添加任务时,将在下面两种情况触发拒绝策略:1)线程池运行状态不是 RUNNING;2)线程池已经达到最大线程数,并且阻塞队列已满时。

3.线程池运行流程

在这里插入图片描述

4.创建线程池实例

代码如下:

//创建核心线程池; 核心池容量=3; 最大线程数=10; 阻塞队列类型为ArrayBlockingQueue,容量=5; KeepAliveTime = 10ms
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));

二、如何使用线程池

1.创建线程

代码如下:

public class MyThread implements Runnable{
    //region 构造器

    /**
     * @param flag 线程编号
     * @param currentBlockingQueueSize 当前阻塞队列等待的线程数量
     */
    public MyThread(int flag, int currentBlockingQueueSize) {
        this.flag = flag;
        this.currentBlockingQueueSize = currentBlockingQueueSize;
    }
    //endregion

    //region 变量
    private int flag;
    private int currentBlockingQueueSize;
    //endregion


    //region run函数重写

    @Override
    //线程运行
    public void run() {
        System.out.println("线程编号:" + getFlag() + "运行中----" + "当前队列中有" + currentBlockingQueueSize + "个线程");

        //线程执行时间3s
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //endregion


    //region 私有方法

    //获取当前线程编号
    private int getFlag() {
        return flag;
    }

    //endregion

}

2.创建线程池,并将线程加入线程池

代码如下:

public class RefusePolicyTest {
    public static void main(String[] args) {
        //加入线程池的线程数
        int THREAD_COUNT = 5;

        //创建核心线程池; 核心池容量=3; 最大线程数=10; 阻塞队列类型为ArrayBlockingQueue,容量=5; KeepAliveTime = 10ms
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));

        //调用RefusePolicy类设置拒绝策略
        RefusePolicy refusePolicy = new RefusePolicy(executor);

        //将线程加入线程池
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(new MyThread(i, executor.getQueue().size()));
        }

        //关闭线程
        executor.shutdown();
        System.out.println("关闭");
    }
}

三、线程数量超过maximumPoolSize时的四种拒绝策略

代码:
线程创建:

/**
 * @version 1.0
 * @Author szh
 * @Date 2021/7/23 10:27
 * @注释 创建Thread用于加入线程池
 */
public class MyThread implements Runnable{
    //region 构造器

    /**
     * @param flag 线程编号
     * @param currentBlockingQueueSize 当前阻塞队列等待的线程数量
     */
    public MyThread(int flag, int currentBlockingQueueSize) {
        this.flag = flag;
        this.currentBlockingQueueSize = currentBlockingQueueSize;
    }
    //endregion

    //region 变量
    private int flag;
    private int currentBlockingQueueSize;
    //endregion


    //region run函数重写

    @Override
    //线程运行
    public void run() {
        System.out.println("线程编号:" + getFlag() + "运行中----" + "当前队列中有" + currentBlockingQueueSize + "个线程");

        //线程执行时间3s
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //endregion


    //region 私有方法

    //获取当前线程编号
    private int getFlag() {
        return flag;
    }

    //endregion

}

拒绝策略设置:

/**
 * @version 1.0
 * @Author szh
 * @Date 2021/7/23 11:31
 * @注释  策略测试方法类
 */
public class RefusePolicy {

    //region 构造器
    public RefusePolicy(ThreadPoolExecutor executor) {
        this.executor = executor;
    }
    //endregion


    //region 变量
    private static ThreadPoolExecutor executor;
    //endregion


    //region 四种策略设置
    //AbortPolicy拒绝策略 --丢任务并抛出异常
    static void SetAbortPolicyTest() {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        System.out.println("正在使用AbortPolicy拒绝策略:");
    }


    //DiscardPolicy拒绝策略 --丢任务但不抛出异常
    static void SetDiscardPolicyTest() {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        System.out.println("正在使用DiscardPolicy拒绝策略:");
    }


    //DiscardOldestPolicy拒绝策略 --丢弃最早进入队列的任务
    static void SetDiscardOldestPolicyTest() {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        System.out.println("正在使用DiscardOldestPolicy拒绝策略:");
    }


    //CallerRunsPolicy拒绝策略 --主线程运行
    static void SetCallerRunsPolicyTest() {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("正在使用CallerRunsPolicy拒绝策略:");
    }
    //endregion

}

将线程加入线程池,并测试策略:

/**
 * @version 1.0
 * @Author szh
 * @Date 2021/7/23 9:54
 * @注释 调用RefusePolicy类的方法测试拒绝策略
 */
public class RefusePolicyTest {
    public static void main(String[] args) {
        //加入线程池的线程数
        int THREAD_COUNT = 20;

        //创建核心线程池; 核心池容量=3; 最大线程数=10; 阻塞队列类型为ArrayBlockingQueue,容量=5; KeepAliveTime = 10ms
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));

        //调用RefusePolicy类设置拒绝策略
        RefusePolicy refusePolicy = new RefusePolicy(executor);


		//设置策略
        refusePolicy.SetAbortPolicy();
        //refusePolicy.SetDiscardPolicyTest();
        //refusePolicy.SetDiscardOldestPolicyTest();
        //refusePolicy.SetCallerRunsPolicyTest();



        //将线程加入线程池
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(new MyThread(i, executor.getQueue().size()));
        }

        //关闭线程
        executor.shutdown();
        System.out.println("关闭");
    }
}

线程池最大可容纳maximumPoolSize + WorkQueue,即15个任务,我们设置了20个向线程池加入,各策略运行结果如下:

1. AbortPolicy(默认策略)结果

默认策略运行结果
可见,一开始线程池无任务,0,1,2号顺利进入核心池,核心池大小为3,3号线程进入时发现核心池满,于是3-7号线程依次进入阻塞队列,队列容量为5。然后8号任务进入时,会创建新的线程处理任务,于是8-14共7个任务会被新建的非核心线程处理,15号以后的任务进入,发现正在运行的线程数已达到maximumPoolSize,触发拒绝策略,丢弃并抛出异常,紧接着队列中的3-7也被处理。
AbortPolicy是默认策略,抛弃多余任务,并抛出异常。


2. DiscardPolicy

DiscardPolicy
与默认策略类似,只是不会抛出异常,默默丢弃任务


3. DiscardOldestPolicy

DiscardOldestPolicy
前面的都是一样,0-2号3个线程仍正常进入核心池,然后3-7进入阻塞队列,8-14号新建线程去处理,但是15进入并不会被丢弃,而是丢弃队列中的最先进入的任务,即3,然后15进入队列,同理16、17、18、19进入会丢弃队列中的4、5、6、7,然后进入队列,因此3-7被抛弃,15-19会进入队列等待被处理。


4. CallerRunsPolicy

在这里插入图片描述

前面仍然相同,3s后(run函数sleep了3s)
在这里插入图片描述
15号及以后的任务会在进入时被拒绝,然后被回退至调用者,即主线程去执行,所以,一开始15会被主线程执行,然后等待3s后,先进入的执行完毕后,其余的也会被处理。


总结

以上就是今天记录的内容,本文仅仅简单介绍了ThreadPoolExecutor的使用及其四种拒绝策略:
  1. AbortPolicy(默认):丢弃多余任务,并抛出异常
  2. DiscardPolicy:丢弃多余任务,但不抛出异常
  3. DiscardOldestPolicy:丢弃最先进队列的任务,然后重新尝试加入任务
  4. CallerRunsPolicy:不会抛弃任务,也不会抛出异常,而是将任务回退至调用者执行

版权声明:本文为Rain_Butter_fly原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。