java8之中ThreadPoolExecutor详解

本文写一下java创建线程的一个核心类ThreadPoolExecutor的基本信息

在java的几种默认线程池之中,除了java8新增的任务窃取线程池newWorkStealingPool的实现不一样外,其他的几种无非都是对于ThreadPoolExecutor进行不同的初始化来实现的。

先看下ThreadPoolExecutor之中的构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
         //此处方法的具体内容省略  
    }

可以看到总共有七个参数,其中意思分别为:

corePoolSize : 核心线程数

maximumPoolSize:最大线程数

keepAliveTime:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间

unit:存活时间keepAliveTime的单位

workQueue:用于在任务执行前存放任务的队列

threadFactory:创建新线程时要使用的工厂

handler:线程数和任务队列都满了以后,执行的拒绝策略

以上就是该类构造方法的七个参数。

对于任务队列workQueue:常见的有三种

1、SynchronousQueue:Direct handoffs(直接交换) 不以任何方式持有任务,直接提交给线程。该种队列通常需要需要无限的MaximumPoolSize,以避免拒绝新提交的任务。

比如已经实现了的newCachedThreadPool()线程池。

2、LinkedBlockingQueue:Unbounded queues(基于链表结构的无界队列)使用该队列的时候,当线程的核心线程数corePoolSize一直在繁忙的时候,可能导致新任务一直在任务队列之中进行等待。因此,就不会有大于corePoolSize的线程个数被创建,也就是说最大线程数maximumPoolSize的设置就是无效的。

比如已经实现的固定长度线程池newFixedThreadPool就是此种实现方式。

3、ArrayBlockingQueue:Bounded queues(基于数组结构的有界队列)与固定的最大线程数maximumPoolSizes一起使用时有助于防止资源耗尽,但可能更难优化和控制。队列的大小和最大线程数的大小可能会相互制衡。使用大的队列和小的核心线程数的时候,可以最大程度的减少cpu的使用、操作系统的资源和上下文切换的资源开销,但是也可能会人为的导致吞吐量的降低。使用小队列的话就需要更大的池。这会使得cpu更加的繁忙。可能会导致不可接受的调度开销,也会降低吞吐量。

对于拒绝策略handler,默认的有四种:

1、AbortPolicy:默认的策略,直接抛出RejectedExecutionException异常。源码如下所示:

public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2、CallerRunsPolicy:调用自身线程来运行该任务。提供了一种简单的反馈控制机制,可以降低新任务的提交频率。源码如下所示:

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

3、DiscardPolicy:直接丢弃。源码如下:

public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

4、DiscardOldestPolicy:丢弃队列之中最老的一个,就是即将被执行的任务。然后重试提交执行当前任务。(可能会再次失败导致重复执行)。源码如下:

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

备注: 也可以自己实现RejectedExecutionHandler接口,重写其中的rejectedExecution方法来自定义响应的拒绝策略。

大致的执行流程为当有新的任务进来,线程数还未达到核心线程数corePoolSize 的时候,直接创建新的线程来执行任务。当达到核心线程数以后,首先会往任务队列workQueue里面存放任务。当任务队列满了以后,在还未达到最大线程数maximumPoolSize的时候,就会继续创建新的线程来执行任务。在达到最大线程数以后,如果还继续有新的任务进来,就会执行对应的拒绝策略handler。

先看一下基本的使用方式:

public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                5,
                60,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        // 线程数
        int threads = 10;
        // 用于计数线程是否执行完成
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        System.out.println("---- start ----");
        for (int i = 0; i < threads; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName());
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("---- end ----");
    }

执行结果为:

然后我们深入源码看一下具体的execute执行过程

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 如果运行的线程少于corePoolSize,则尝试启动一个新线程并作为其第一个任务。
         * 它调用addWorker的原子方式检查runState 和workerCount,
         * 通过返回false,从而防止在不应该的情况下可能添加的错误警报,
         * 
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 如果一个任务被添加到队列之中,我们仍然需要双重验证是否需要添加一个线程。
         * (因为自从上次检查以后,是否有线程已经死亡)或者自从进入此方法以后线程池是否关闭
         * 所以需要重新检查,必要的时候在停止时回滚排队,或者没有工作线程的时候启动新线程,
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 若我们不能添加任务,那我们将尝试添加一个新线程。
         * 如果失败了,我们知道我们已经线程池被关闭或饱和,因此拒绝该任务
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看到上述方法之中的核心方法是addWorker。

下面看一下addWorker的源码:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 检查线程是否是>=SHUTDOWN 的状态,也就是RUNNING的状态的时候,就直接跳过,
            // 线程为大于SHUTDOWN的状态的时候,直接return false
            // 线程是SHUTDOWN的状态,并且firstTask不等于null的时候直接拒绝。
            // 检查队列是否为空,队列为空的时候,不需要增加线程,返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 判断线程池是否已满。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建新的线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //开始执行
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

线程创建完成以后,就是执行Worker的runWorker方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果线程状态是大于stop状态,则中断次线程
                // 主要是确保当前线程不是stop状态,和当前线程没有被中断,
                // 如果是的话,则中断次线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //前置钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                       //开始执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       //后置钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

本品文章先写到这里,如有问题,请各位大佬多多指教。


版权声明:本文为duanNFF原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。