【Java】线程池(五)shutdown shutdownnow

以ExecutorService的实现类ThreadPoolExecutor为例,看下关于线程池管理方法的实原理:

注意shutdown shutdownnow方法均不是阻塞的,仅仅完成状态的设置,不会等待任务执行完毕。

1.shutdown:调用该方法后会拒绝接收新任务。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

调用advanceRunState方法先设置线程池的状态为SHUTDOWN,实现方式为cas自旋,该状态的线程池将不会接收新的任务。为什么不会接收?

Worker是一个Ruannble接口的实现类,其run方法中,有一处调用是不断从任务队列中取任务,每一次poll,都会先检测一遍当前线程池的状态:

            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

如果是SHUTDOWN并且任务队列为空,则直接返回null,表示不再有新的任务了,worker线程会退出。

设置完状态后,接着调用interruptIdleWorkers方法,中断那些空闲worker线程:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

其实就是遍历当前工作线程,并依次调用其interrpt方法。为什么需要这步?

因为Worker的run方法里,会不断从任务队列里poll任务,这是一个阻塞操作,如果不打断阻塞poll,则有的空闲的worker会一直等在那里,导致回收不及时。一旦打断阻塞,会结束当前循环,下一次再开始循环时,就可以检测到线程池状态已经变为了SHUTDOWN,所以就可以直接返回了。下面是getTask源码:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

最后一步是调用tryTerminate方法,尝试将线程池状态转换为TERMINATED,该状态才是线程池真正终止的标志:

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

该方法也是在一个for循环内部,通过不断尝试来terminate线程池。该方法会在多处被调用,这里看shutdown的情况。

如果当前状态是SHUTDOWN并且workQueue不为空,那么第一层if就直接return了,意味着当前时刻还无法terminate,因为workQueue里还有未执行完成的任务。否则如果当前状态是SHUTDOWN并且workQueue为空了,那么可以通过第一层if判断,这意味着线程池的等待队列里已经没有需要被处理的任务了,只需要等待正在被处理的任务结束即可。所以第二个if就是在判断正在执行任务的worker数目是不是0,如果不是0,就尝试执行interruptIdleWorkers方法,因为一旦执行完了,就可以通过该方法来停止工作线程的运行,完成shutdown。一旦worker数目为0了,就会进入到真正的TERMINATED状态的设置了,在terminate方法调用前,先设置为TIDYING状态,terminate方法调用后,再设置为TERMINATED状态。

为什么要有TIDYING状态而不是直接TERMINATED状态?

因为只要进入到TIDYING状态,说明工作线程数为0并且workQueue为0(如果状态是SHUTDOWN,那么有一个判断可以保证workQueue为0,如果状态是STOP,那么会在shutdownnow方法内清空workQueue),这可以定义为一个标志性的状态,即线程池为空,所以TIDYING状态就是为了表征这个case的。

与TERMINATED的区别是,TERMINATED状态在TIDYING基础上增加了terminate方法的调用。该状态是线程池关闭的真正状态。

总结shutdown方法做了什么事情:

1.设置线程池状态为SHUTDOWN,这将导致线程池不会接收新任务;

2.中断idle的worker线程,回收;

3.尝试terminate;

 

2.shutdownnow:调用该方法后会停止接收新的任务,并且停止当前正在执行的任务,清空workQueue。

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

这里会先调用advanceRunState方法设置线程池状态为STOP。然后调用interruptWorkers方法中断所有worker线程,实现为依次调用interrupt方法打断worker线程:

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

然后再调用drainQueue方法,将workQueue里等待执行的任务清空并导入到结果集tasks队列里:

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

最后调用tryTerminate方法,尝试将线程池状态转换为TERMINATED。

对应到shutdownnow的情况,线程池状态会是STOP,所以第一个if可以通过,第二个if如果所有任务都有对中断信号的响应处理,那么理论上来说是可以退出的,workQueue数目应该是0了,如果不是,那么也会return,认为无法中断线程池。最后设置TERMINATED状态和之前的逻辑一致。

最后返回workQueue里的任务;

总结shutdownnow做了哪些事情:

1.设置线程池状态为STOP,拒绝接收新任务;

2.打断所有worker线程,回收;

3.清空并返回workQueue里的任务;

4.尝试terminate;

 

3.waitTermination

最后看一个阻塞方法,当调用shutdown方法后,可以再调用该waitTermination方法来等待线程池的退出:

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

循环检测线程池状态是否是TERMINATED。


版权声明:本文为u010900754原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。