【Java多线程】JUC之线程池(二)再次邂逅线程池ThreadPoolExecutor的原理

前言

线程池的概念以及使用场景在文章【Java多线程】线程池(一)与线程池的初识 里已经讲的很清楚了,学习前建议复习一下。ThreadPoolExecutor是线程池的实现类,本文通过对ThreadPoolExecutor源码的分析(基于JDK 1.8),来深入分析线程池的实现原理。

一.概述

1.线程池优点

  • 降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;
  • 提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。

2.线程池的基本组成

一个线程池包括以下四个基本组成部分:

  1. 线程工厂(ThreadFactory):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2. 工作线程(Worker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3. 任务接口(Runnable):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4. 任务队列(workQueue):用于存放待执行的任务。提供一种缓冲机制。

3.原理

  • 线程池刚启动的时候核心线程数为0
  • 提交任务给线程池的时候,线程池会新开启线程来执行这个任务
  • 如果线程数小于corePoolSize,即使工作线程处于空闲状态,也会创建一个新线程来执行新任务
  • 如果线程数大于或等于corePoolSize,则会将任务放到workQueue,也就是任务队列
  • 如果任务队列满了,且线程数小于maximumPoolSize,则会创建一个新线程来运行任务
  • 如果任务队列满了,且线程数大于或等于maximumPoolSize,则直接采取拒绝策略

    如果经过keepAliveTime 时间后,超过核心线程数corePoolSize的线程还没有接受到新的任务,就会被回收

在这里插入图片描述

举例说明

线程池参数配置:核心线程5个,最大线程数10个,队列长度为100。

  1. 线程池初始化的时候不会创建任何线程,线程数为0
  2. 假设进来6个请求,则会创建5个核心线程来处理5个请求,另一个没被处理到的进入到任务队列
  3. 这时候又进来99个请求,线程池发现核心线程满了,任务队列还在空着99个位置,所以会将99个任务加入到队列中,加上刚才的1个,任务队列中正好100个
  4. 这时候再次进来5个请求,线程池会再次开辟5个非核心线程来处理这五个请求。
  5. 目前的情况是线程池里线程数是10个RUNNING状态的,任务队列里100个也满了。如果这时候又进来1个请求,则直接走任务拒绝策略

二.ThreadPoolExecutor类图

在这里插入图片描述

三.ThreadPoolExecutor的属性

	// 线程池的控制状态,可以看做一个int类型的数字,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程worker的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //`Integer.SIZE`为32,所以`COUNT_BITS`为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池的运行状态,总共有5个状态,用高3位来表示,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //任务缓存队列,用来存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue;

    //全局锁,对线程池状态等属性修改时需要使用这个锁
    private final ReentrantLock mainLock = new ReentrantLock();

    //线程池中工作线程的集合,访问和修改需要持有全局锁
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 终止条件
    private final Condition termination = mainLock.newCondition();

    //线程池中曾经出现过的最大线程数
    private int largestPoolSize;

    //已完成任务的数量
    private long completedTaskCount;

    //线程工厂
    private volatile ThreadFactory threadFactory;

    //任务拒绝策略
    private volatile RejectedExecutionHandler handler;

    //线程存活时间
    private volatile long keepAliveTime;

    //是否允许核心线程超时
    private volatile boolean allowCoreThreadTimeOut;

    //核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0
    private volatile int corePoolSize;

    //最大池大小,不得超过CAPACITY
    private volatile int maximumPoolSize;

    //默认的任务拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    private final AccessControlContext acc;

线程池状态是控制线程池生命周期至关重要的属性,通过上面的源码可知,线程池的运行状态总共有5种,如下:

  • RUNNING: 高3位为111,线程池 接受新任务并处理阻塞队列中的任务
  • SHUTDOWN: 高3位为000,线程池 不接受新任务但会处理阻塞队列中的任务
  • STOP: 高3位为001,线程池 不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
  • TIDYING: 高3位为010,线程池 所有任务都已终止工作线程数量为0,线程池将转化到TIDYING状态,即将要执行 terminated()钩子方法
  • TERMINATED: 高3位为011 终止状态。terminated方法调用完成后 的状态

按大小排序如下RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

然而,线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态, 低29位来表示线程池中工作线程的数量

对变量ctl的操作主要参考以下几个函数

	//通过与的方式,获取ctl的高3位,也就是线程池的运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    //SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态
    private static boolean isRunning(int c) { return c < SHUTDOWN; }

线程池状态的转换情况
在这里插入图片描述

  • RUNNING -> SHUTDOWN:显示调用shutdown()方法,或者隐式调用了finalize()方法里面的shutdown()方法。

  • (RUNNING or SHUTDOWN) -> STOP:显示调用shutdownNow()方法。

  • SHUTDOWN -> TIDYING:当线程池和任务队列都为空时

  • STOP -> TIDYING:当线程池为空时

  • TIDYING -> TERMINATED:当钩子函数terminated()方法执行结束时

如上图所示,通常情况下,线程池有如下两种状态转换流程:

  • RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
  • RUNNING -> STOP -> TIDYING -> TERMINATED

四.ThreadPoolExecutor的构造方法

通常情况下,我们使用线程池的方式就是new一个ThreadPoolExecutor对象来生成一个线程池。

	//间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy和默认的线程工厂
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue);
    //间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory);
    //间接调用最后一个构造函数,采用默认的默认的线程工厂
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler);
    //前面三个分别调用了最后一个,主要的构造函数
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

最后一个构造函数的具体实现

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 基本类型参数校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 空指针校验
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程数(工作线程数)
    线程池在完成初始化之后,默认情况下,线程池中不会有任何线程,线程池会等有任务来的时候再去创建线程(核心线程)。核心线程创建后即使超出线程最大空闲时间keepAliveTime也不会销毁,只要创建就永驻了,就等着新任务进来进行处理。如果当前线程数超过corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行

  • maximumPoolSize:最大线程数
    核心线程超过corePoolSize会将新提交的任务加入任务队列中等待执行,如果任务队列满了的情况下,还有新任务进来的话就会继续创建新的线程,当创建的线程数(核心线程+非核心线程)大于maximumPoolSize后就不会产生新线程了,就会执行拒绝策略`。

  • keepAliveTime:线程保持的存活时间
    核心线程超过corePoolSize任务队列满了 的情况下创建的线程 称为"非核心线程",如果这些"非核心线程" 的空闲时间超出keepAliveTime,就会被会回收。直到线程池中的线程数不超过corePoolSize。

    如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

  • unit:线程保持的存活时间单位
    比如:TimeUnit.MILLISECONDS、TimeUnit.SECONDS

  • workQueue:任务存储队列
    核心线程数corePoolSize满了后还有任务继续提交到线程池的话,就先进入workQueue

    workQueue通常情况下有如下选择:

    • LinkedBlockingQueue:无界队列,意味着无限制,其实是有限制,大小是int的最大值。也可以自定义大小。
    • ArrayBlockingQueue:有界队列,可以自定义大小,到了阈值就开启新线程(不会超过maximumPoolSize)。
    • SynchronousQueue:Executors.newCachedThreadPool();默认使用的队列。也不算是个队列,他不没有存储元素的能力。

    一般都采取LinkedBlockingQueue,因为他也可以设置大小,可以取代ArrayBlockingQueue有界队列

  • threadFactory:线程工厂
    当线程池需要新的线程时,会用threadFactory来生成新的线程

    • 默认采用的是DefaultThreadFactory,主要负责创建线程。new Thread()方法。创建出来的线程都在同一个线程组且优先级也是一样的。
  • handler:任务拒绝策略
    任务量超出线程池最大线程数maximumPoolSize执行shutdown()还在继续提交任务的话,会执行handler的逻辑。

    • AbortPolicy默认采用的是AbortPolicy,遇到上面的情况,线程池将直接采取直接拒绝策略,也就是丢弃任务,直接抛出异常。RejectedExecutionException
    • CallerRunsPolicy:由调用execute方法的线程执行该任务;
    • DiscardPolicy:丢弃任务,但是不抛出异常;
    • DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。

当然也可以根据应用场景实现RejectedExecutionHandler接口自定义拒绝策略,如记录日志或持久化存储不能处理的任务。


五.线程池的实现原理

1.提交任务

线程池框架提供了两种方式提交任务,submit()和execute()

  • 通过submit()方法提交的任务可以返回任务执行的结果
  • 通过execute()方法提交的任务不能获取任务执行的结果

submit()方法的实现有以下3种:

	public Future<?> submit(Runnable task);
    public <T> Future<T> submit(Runnable task, T result);
    public <T> Future<T> submit(Callable<T> task);

下面以第一个方法为例简单看一下submit()方法的实现:

	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

submit()方法是在ThreadPoolExecutor父类AbstractExecutorService类 实现的,最终还是调用的ThreadPoolExecutor类的execute()方法,下面着重看一下execute()方法的实现。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取线程池控制状态
    int c = ctl.get();
     //workerCountOf(c)会获取当前正在运行的worker数量
    //(1)worker数量比核心线程数corePoolSize小,直接创建worker执行任务
    if (workerCountOf(c) < corePoolSize) {
    	//创建worker,addWorker方法boolean参数用来判断是否创建核心线程
        if (addWorker(command, true))
           //成功则返回
            return;
         //失败则再次获取线程池控制状态
        c = ctl.get();
    }
    //isRunning(c)是判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务
    //(2) worker数量超过核心线程数 且 线程池处于RUNNING状态 且 任务直接加入队列成功
    if (isRunning(c) && workQueue.offer(command)) {
    	 // 添加成功,再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
        int recheck = ctl.get();
        // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
        // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
       //线程池不处于RUNNING状态,且将任务从workQueue移除成功
        if (! isRunning(recheck) && remove(command))
        	//采取任务拒绝策略
            reject(command);
        // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
        else if (workerCountOf(recheck) == 0)
         	//创建worker
            addWorker(null, false);
    }
    // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
    // 这儿有3点需要注意:
    // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
    // 2. addWorker第2个参数表示是否创建核心线程
    // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
    else if (!addWorker(command, false))//(3)如果创建worker失败
       //如果创建worker失败,就执行拒绝策略
        reject(command);
}

execute()方法的执行流程可以总结如下:

在这里插入图片描述

  • 若线程池工作线程数量(核心线程)小于corePoolSize,则创建新线程来执行任务
  • 工作线程数量(核心线程) 大于或等于 corePoolSize,则将任务加入BlockingQueue
  • 无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量(核心线程) 小于 maximumPoolSize,则创建新的线程来执行任务
  • 工作线程数量达到maximumPoolSize,则创建线程失败,执行任务拒绝策略

2.创建线程

从execute()方法的实现可以看出,addWorker()方法主要负责 创建新的线程并执行任务,代码实现如下:

addworker源码解析

//addWorker有两个参数:
//	1.Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;
//	2.boolean类型的core,表示是否创建核心线程
//该方法的返回值代表是否成功新增一个线程
private boolean addWorker(Runnable firstTask, boolean core) {
  // 使用自旋+cas失败重试来保证线程竞争问题
    retry:
    // 外层自旋
    for (;;) {
        //获取线程池的控制状态
        int c = ctl.get();
        //获取线程池的运行状态
        int rs = runStateOf(c);

        // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 线程池状态大于SHUTDOWN时,直接返回false
        // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
        // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 内层自旋
        for (;;) {
            int wc = workerCountOf(c);
            // worker数量超过容量,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS的方式增加worker数量。
            // 若增加成功,创建线程前的所有条件校验都满足了,准备创建线程执行任务,则直接跳出外层循环进入到第二部分
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //重新获取线程池控制状态
            c = ctl.get();  // Re-read ctl
            //  如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环,对外层循环进行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // 其他情况,直接内层循环进行自旋即可
            // else CAS failed due to workerCount change; retry inner loop
        } 
    }
    //到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务
    //worker是否已经启动
    boolean workerStarted = false;
    //是否已将这个worker添加到workers这个HashSet中
    boolean workerAdded = false;
    Worker w = null;
    try {
       //创建一个worker,从这里可以看出对线程的包装
        w = new Worker(firstTask);
         //取出worker中的线程对象,Worker的构造方法会调用ThreadFactory来创建一个新的线程
        final Thread t = w.thread;
        if (t != null) {
           //获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭
            final ReentrantLock mainLock = this.mainLock;
            // worker的添加必须是串行的,因此需要加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                 //重新获取线程池的运行状态
                int rs = runStateOf(ctl.get());

				//小于SHUTTDOWN即RUNNING
               //等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker已经调用过了start()方法,会抛出异常,不再创建worker
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // worker创建并添加到线程池workers(工作线程集合)中
                    workers.add(w);
                    // 更新历史worker数量的最大值`largestPoolSize`变量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //设置新增标志位
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //如果worker是新增的,就启动该线程
            if (workerAdded) {
                t.start();
                //成功启动了线程,设置对应的标志位
                workerStarted = true;
            }
        }
    } finally {
         //若worker线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCount
        if (! workerStarted)
            addWorkerFailed(w);
    }
     //返回线程是否启动成功
    return workerStarted;
}

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    //获取全局锁
    mainLock.lock();
    try {
        if (w != null)
            // 将工作线程从集合中移除
            workers.remove(w);
        	// 线程池中线程数量减一    
        	decrementWorkerCount();
        	//执行钩子函数tryTerminate
       		 tryTerminate();
    } finally {
    	//
        mainLock.unlock();
    }
}

总结一下,addWorker()方法完成了如下几件任务:

  • 原子性的增加workerCount
  • 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合
  • 启动worker对应的线程
  • 若线程启动失败,从workers移除新添加的worker,并原子性的减少workerCount

3.工作线程的实现

addWorker()方法的实现可以看出,工作线程的创建和启动都跟ThreadPoolExecutor中的内部类Worker有关。下面我们分析Worker类来看一下工作线程的实现。

线程池worker任务单元

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
  • Worker类继承自AQS类(AbstractQueuedSynchronizer同步器类),具有锁的功能,实现一个不可重入的锁
  • 实现了Runable接口,可以将自身作为一个任务在线程中执行

Worker的主要字段就下面三个,代码也比较简单。

//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//worker所对应的第一个任务,可能为空
Runnable firstTask;
//记录当前线程完成的任务数
volatile long completedTasks;

Worker的构造函数如下

Worker(Runnable firstTask) {
            //设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
            setState(-1);
            //初始化第一个任务
            this.firstTask = firstTask;
            //利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
            //也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
            this.thread = getThreadFactory().newThread(this);
        }

Worker类继承了AQS类,重写了其相应的方法,实现了一个自定义的同步器实现了不可重入锁

		//是否持有独占锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //尝试获取锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                //设置独占线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //尝试释放锁
        protected boolean tryRelease(int unused) {
            //设置独占线程为null
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //获取锁
        public void lock()        { acquire(1); }
        //尝试获取锁
        public boolean tryLock()  { return tryAcquire(1); }
        //释放锁
        public void unlock()      { release(1); }
        //是否持有锁
        public boolean isLocked() { return isHeldExclusively(); }

Worker类还提供了一个中断线程thread的方法

	void interruptIfStarted() {
            Thread t;
            //AQS状态大于等于0,worker对应的线程不为null,且该线程没有被中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

再来看一下Worker类run()方法的实现,会发现run()方法最终调用了ThreadPoolExecutor类的runWorker()方法

public void run() {
  runWorker(this);
}

4.线程复用机制-runworker

通过上文可以知道,worker中的线程start后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法runWorker()方法实现了线程池中的线程复用机制

下面我们来看一下runWorker()方法的实现。

final void runWorker(Worker w) {
	//  //获取当前线程
    Thread wt = Thread.currentThread();
    //获取w的firstTask
    Runnable task = w.firstTask;
    //设置w的firstTask为null
    w.firstTask = null;
    //释放锁,设置AQS的state为0,允许其他线程来中断自己
    w.unlock(); // allow interrupts
    // 这个变量用于判断是否进入过自旋(while循环), 标识线程是否异常终止,finally中processWorkerExit()方法会有不同逻辑
    boolean completedAbruptly = true;
    try {
        // 这儿是自旋
        // 1. 如果firstTask不为null,就直接执行这个任务
        // 2. 如果firstTask为null,就执行getTask()方法从队列中获取任务
        //  阻塞队列的特性就是:当队列为空时或者队列满了,当前线程会被删除/新增阻塞等待
         //3.这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环
        while (task != null || (task = getTask()) != null) {
        	//进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中断
            w.lock();
            
            // 这儿对worker进行加锁,是为了达到下面的目的
            // 1. 降低锁范围,提升性能
            // 2. 保证每个worker执行的任务是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||//若线程池状态大于等于STOP,那么意味着该线程要中断
                 (Thread.interrupted() &&//线程被中断
                  runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池内部状态变化而被中断
                !wt.isInterrupted()) //确保该线程未被中断
                //发出中断请求
                wt.interrupt();
            // 执行任务,且在执行前后通过钩子函数`beforeExecute()`和`afterExecute()`来扩展其功能。
            // 这两个方法在当前类里面为空实现。
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                  //真正的执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                 //置空task,准备通过getTask()获取下一个任务
                task = null;
                // 已完成任务数加一 
                w.completedTasks++;
                //释放掉worker持有的独占锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
      		//到这里,线程执行结束,需要执行结束线程的一些清理工作
            //线程执行结束可能有两种情况:
            //1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
            //2.任务执行过程中发生了异常
            //	第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
            //	第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()方法是线程池的核心,实现了线程池中的线程复用机制,来看一下runWorker()方法都做了哪些工作:

  1. 运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行
  2. 获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁
  3. 在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook(钩子)方法;
  4. 执行通过getTask()方法获取到的任务
  5. 线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作

从runWorker()方法的实现可以看出,runWorker()方法中主要调用了getTask()方法processWorkerExit()方法,下面分别看一下这两个方法的实现。

getTask()的实现
getTask()方法用来不断地从任务缓存队列获取任务并交给线程执行,下面分析一下其实现。

  • 这里面涉及到keepAliveTime的使用,从这个方法我们可以看出线程池是怎么让超过corePoolSize的那部分worker销毁的
private Runnable getTask() {
        //标识当前线程是否超时未能获取到task对象
        boolean timedOut = false;

        for (;;) {
            //获取线程池的控制状态
            int c = ctl.get();
            //获取线程池的运行状态
            int rs = runStateOf(c);

            //如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            //获取worker数量
            int wc = workerCountOf(c);

            //标识当前线程在空闲时,是否应该超时回收
            // 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
            // 	如果设置了allowCoreThreadTimeOut为ture,核心worker也会超时,或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了
            //其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回null
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize)
            if ((wc > maximumPoolSize || (timed && timedOut))  //或者获取任务超时
                && (wc > 1 || workQueue.isEmpty())) {  //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个工作线程)
                  //通过cas来设置WorkerCount,如果多个线程竞争,只有一个可以设置成功
                if (compareAndDecrementWorkerCount(c))
                    //线程池工作线程数量递减成功,方法返回null,回收线程
                    return null;
                //线程池工作线程数量递减失败,跳过剩余部分,继续循环
                continue;
            }

            try {
             /**
             * poll:指定时间内获取队列头部元素并移除,超时返回null
             * take:获取队列头部元素并移除,如果队列为空则阻塞当前线程知道队列不为空后返回元素
             * 所以 take == true 表示允许核心线程超时或者此线程非核心线程,都需要进行超时处理
             */
                //如果允许超时回收,则调用阻塞队列的poll(),只在keepAliveTime时间内等待获取任务,一旦超过则返回null
                //否则调用take(),如果队列为空,线程进入阻塞状态,无限时等待任务,直到队列中有可取任务或者响应中断信号退出
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //若task不为null,则返回成功获取的task对象
                if (r != null)
                    return r;
                // 若返回task为null,表示线程空闲时间超时,则设置timeOut为true
                timedOut = true;
            } catch (InterruptedException retry) {
                //如果此worker发生了中断,采取的方案是重试,没有超时
                //在哪些情况下会发生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()
                timedOut = false;
            }
        }
    }

接下来总结一下getTask()方法会在哪些情况下返回

  1. 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象
  2. 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象
  3. 线程池状态大于等于STOP返回null,回收线程
  4. 线程池处于SHUTDOWN状态,并且阻塞队列为空返回null,回收线程
  5. worker数量大于maximumPoolSize返回null,回收线程
  6. 线程空闲时间超时返回null,回收线程

processWorkerExit()的实现

processWorkerExit()方法负责执行结束线程的一些清理工作,下面分析一下其实现。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果用户任务执行过程中发生了异常,则需要递减workerCount
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        //获取全局锁
        mainLock.lock();
        try {
            //将worker完成任务的数量累加到总的完成任务数中
            completedTaskCount += w.completedTasks;
            //从workers集合中移除该worker
            workers.remove(w);
        } finally {
            //释放锁
            mainLock.unlock();
        }
        //尝试终止线程池
        tryTerminate();
        //获取线程池控制状态
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {  //线程池运行状态小于STOP
            if (!completedAbruptly) {  //如果用户任务执行过程中发生了异常,则直接调用addWorker()方法创建线程
                //是否允许核心线程超时
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //允许核心超时并且workQueue阻塞队列不为空,那线程池中至少有一个工作线程
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果工作线程数量workerCount大于等于核心池大小corePoolSize,
                //或者允许核心超时并且workQueue阻塞队列不为空时,线程池中至少有一个工作线程,直接返回
                if (workerCountOf(c) >= min)
                    return;
                //若不满足上述条件,则调用addWorker()方法创建线程
            }
            //创建新的线程取代当前线程
            addWorker(null, false);
        }
    }

processWorkerExit()方法中主要调用了tryTerminate()方法,下面看一下tryTerminate()方法的实现。

tryTerminate()的实现

final void tryTerminate() {
        for (;;) {
            //获取线程池控制状态
            int c = ctl.get();
            if (isRunning(c) ||    //线程池的运行状态为RUNNING
                runStateAtLeast(c, TIDYING) ||    //线程池的运行状态大于等于TIDYING
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  //线程池的运行状态为SHUTDOWN且阻塞队列不为空
                //不能终止,直接返回
                return;

            //只有当线程池的运行状态为STOP,或线程池运行状态为SHUTDOWN且阻塞队列为空时,可以执行到这里
            //如果线程池工作线程的数量不为0
            if (workerCountOf(c) != 0) {
                //仅仅中断一个空闲的worker
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            //只有当线程池工作线程的数量为0时可以执行到这里
            final ReentrantLock mainLock = this.mainLock;
            //获取全局锁
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {  //CAS操作设置线程池运行状态为TIDYING,工作线程数量为0
                    try {
                        //执行terminated()钩子方法
                        terminated();
                    } finally {
                        //设置线程池运行状态为TERMINATED,工作线程数量为0
                        ctl.set(ctlOf(TERMINATED, 0));
                        //唤醒在termination条件上等待的所有线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //若CAS操作失败则重试
        }
    }

tryTerminate()方法的作用是 : 尝试终止线程池,它会在所有可能终止线程池的地方被调用,满足终止线程池的条件有两个:首先,线程池状态为STOP,或者为SHUTDOWN且任务缓存队列为空,其次,工作线程数量为0

  • 满足了上述2个条件之后,tryTerminate()方法获取全局锁设置线程池运行状态为TIDYING,之后执行terminated()钩子方法,最后设置线程池状态为TERMINATED

至此,线程池运行状态变为TERMINATED工作线程数量为0workers已清空,且workQueue也已清空所有线程都执行结束线程池的生命周期到此结束


5.关闭线程池

关闭线程池有两个方法,shutdown()和shutdownNow(),下面分别看一下这两个方法的实现。

1.shutdown()的实现

  • shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //获取全局锁
        mainLock.lock();
        try {
            //检查shutdown权限
            checkShutdownAccess();
            //设置线程池运行状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断所有空闲worker
            interruptIdleWorkers();
            //用onShutdown()钩子方法
            onShutdown();
        } finally {
            //释放锁
            mainLock.unlock();
        }
        //尝试终止线程池
        tryTerminate();
    }

shutdown()方法 首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池

shutdown()方法调用了interruptIdleWorkers()方法中断所有空闲的worker,其实现如下。

	private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    //onlyOne标识是否只中断一个线程
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        //获取全局锁
        mainLock.lock();
        try {
            //遍历workers集合
            for (Worker w : workers) {
                //worker对应的线程
                Thread t = w.thread;
                //线程未被中断且成功获得锁
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //发出中断请求
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        //释放锁
                        w.unlock();
                    }
                }
                //若只中断一个线程,则跳出循环
                if (onlyOne)
                    break;
            }
        } finally {
            //释放锁
            mainLock.unlock();
        }
    }

2.shutdownNow()的实现

  • shutdownNow()方法将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        //获取全局锁
        mainLock.lock();
        try {
            //检查shutdown权限
            checkShutdownAccess();
            //设置线程池运行状态为STOP
            advanceRunState(STOP);
            //中断所有worker
            interruptWorkers();
            //将任务缓存队列中等待执行的任务取出并放到list中
            tasks = drainQueue();
        } finally {
            //释放锁
            mainLock.unlock();
        }
        //尝试终止线程池
        tryTerminate();
        //返回任务缓存队列中等待执行的任务列表
        return tasks;
    }

shutdownNow()方法与shutdown()方法相似,不同之处在于:shutdownNow()设置线程池的运行状态为STOP,之后中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表

shutdownNow()方法调用了 interruptWorkers() 方法中断所有的worker(并非只是空闲的worker),其实现如下。

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        //获取全局锁
        mainLock.lock();
        try {
            //遍历workers集合
            for (Worker w : workers)
                //调用Worker类的interruptIfStarted()方法中断线程
                w.interruptIfStarted();
        } finally {
            //释放锁
            mainLock.unlock();
        }
    }

六.总结

至此,我们已经阅读了线程池框架的核心类ThreadPoolExecutor类的大部分源码,由衷地赞叹这个类很多地方设计的巧妙之处:

  • 线程池用不可重复的HashSet集合保存了当前工作线程worker,工作线程worker被销毁时会从集合中移除该线程对象。
  • 使用一个Integer类型的原子变量,将线程池的运行状态工作线程数量打包在一起,并使用了大量的位运算
  • 使用CAS操作更新线程控制状态ctl,确保对ctl的更新原子操作
  • 通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程的创建和销毁带来的开销。
  • 内部类Worker类继承了AQS,实现了一个自定义的同步器,实现了不可重入锁
  • 使用while循环自旋地从任务缓存队列获取任务并执行,实现了线程复用机制
  • 调用interrupt()方法中断线程,但注意该方法并不能直接中断线程的运行,只是发出了中断信号,配合BlockingQueue的take()poll()方法的使用,打断线程的阻塞状态
  • 通过继承ThreadPoolExecutor,并重写父类的beforeExecutoe方法afterExecutoe方法,在线程执行任务的前后添加别的操作

其实,线程池的本质就是生产者消费者模式线程池的调用者不断向线程池提交任务,线程池里面的工作线程不断获取这些任务并执行(从任务缓存队列获取任务或者直接执行任务)。

七.相关好文


异常捕获

拒绝策略

为什么阿里巴巴要禁用Executors创建线程池?

SpringBoot如何优雅的使用、监控线程池

相关好博客


版权声明:本文为qq877728715原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。