ScheduledThreadPoolExecutor(定时任务线程池)简单理解

本文主要介绍了可定时线程池的核心原理,从宏观角度大概分析了线程池工作方式,如有不足,请指出,谢谢。

1.什么是线程池

线程池顾名思义是一个线程缓存的‘池子’。线程是稀缺资源,线程如果创建的太多,会消耗系统的资源,还会降低系统的稳定性,所以java中通过线程池来统一管理分配线程这个稀缺的资源,达到资源重复利用。

2.线程池的出现

在web系统中,服务器需要接受大量的并发请求,一个请求就会对应一个线程,如果并发的请求很多,但每个线程执行的时间很短,这样系统就会频繁的创建和销毁线程,系统的性能就会受到影响。那么是否存在一种方式,线程执行任务后并不销毁,而是重复利用?这就是线程池出现的目的。

3.ScheduledThreadPoolExecutor

  • 之前的一篇文章分析了线程池ThreadPoolExecutor的工作原理传送门,这篇文章主要介绍可定时任务的线程ScheduledThreadPoolExecutor,它也是线程池的一种,从下面类图可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ScheduledThreadPoolExecutor的大部分逻辑在ThreadPoolExecutor线程池中已经实现,如果不太清楚ThreadPoolExecutor,可以参考上一篇文章的讲解了,详细信息可查看。

在这里插入图片描述

4.主要提交任务的方法

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以具备线程池提交任务的一般方法[submit()与execute()],同时还具备提交定时任务的几个方法,如下所示:

//延后delay时长执行Runnable任务
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

//延后delay时长执行Callable任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

 //延后执行Runnable 任务,以后每隔period的时长再次执行该任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

//延后执行Runnable 任务,以后任务执行完成后等待delay时长,再次执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  • 从上面几个方法可以看出,都执行了delayedExecute(t)方法,先分析下该方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池关闭就拒绝任务
    if (isShutdown())
        reject(task);
    else {
    // 添加任务到延迟队列中,而ThreadPoolExecute线程池是线程
       // 数大于核心线程时才添加任务到阻塞队列,这里是第一次添加
       // 任务,因为是定时线程池,在延迟队列中的任务执行run方法时
       // 会将任务继续添加进延迟队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
       // 这里是增加一个worker线程,避免提交的任务没有worker去执行
            ensurePrestart();
    }
}
// 该方法是ThreadPoolExecute线程池里面的方法
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        //这里就和ThreadPoolExecute线程池里的方法一样,创建一个没有任务的线程
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

5.ScheduledFutureTask

  • ScheduledThreadPoolExecutor线程池中提交的任务都会被封装成ScheduledFutureTask类型的任务,ScheduledFutureTask是ScheduledThreadPoolExecutor中的一个内部类,实现了Runnable接口,也就是说,ScheduledFutureTask是一个线程任务,接下来我们简单分析下ScheduledFutureTask。
  • 构造方法如下
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;//任务开始的时间
    this.period = 0;//任务执行的时间间隔
    this.sequenceNumber = sequencer.getAndIncrement();//任务的序号
}
  • 线程池中,需要用阻塞队列保存等待被执行的任务,ScheduledThreadPoolExecutor线程池中通过DelayQueue阻塞队列来存储等待的任务。DelayQueue接收SchduledFutureTask类型的任务。DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体算法如下:
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
  1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若
    time相同则根据sequenceNumber排序;
  2. DelayQueue也是一个无界队列,底层数据结构为最小堆,用数组存储,初始容量16;
  • 由上面分析可知ScheduledFutureTask任务是线程池执行的具体任务,我们来看一下它的run方法:
public void run() {
    // 是否周期性执行
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
     //如果不需要周期性执行,则直接执行run方法然后结束
        ScheduledFutureTask.super.run();
    //ScheduledFutureTask.super.runAndReset()就是执行自己提交的任务的run方法
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 计算下次执行任务的时间
        setNextRunTime();
     // 重复执行任务
        reExecutePeriodic(outerTask);
    }
}

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // 这里设置执行自己任务的方法
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}


void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        // 这里是添加任务到队列
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else     
         // 该方法是ThreadPoolExecute线程池里面的方法
            ensurePrestart();
    }
}

版权声明:本文为qq_37803406原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。