Java并发编程系列学习_Timer定时器&ScheduledThreadPoolExecutor任务调度器

Timer定时器  

       在并发编程中,有需要去定时执行某任务、延迟执行某任务或者周期性地执行某任务。最基本的手段是使用JDK中的Timer类,负责计划任务的功能,也就是在指定的时间开始执行某一个任务。但是Timer现在也不建议使用,我们在本文中主要去学习下它如何使用,实际在项目开发中,还是建议使用ScheduledThreadPoolExecutor任务调度器,在编码测试时,也发现了IDEA对于这方面的告警提示,多线程并行地处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其他任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

                                     

先看一段代码,看下基本使用,下面这段程序实现的就是在后面的某一个时刻,执行一个任务,去打印当前时间,然后以1s为间隔周期进行周期性大打印:

/**
 * @Author: jiaqing.xu@hand-china.com
 * @Date: 2019-03-27 17:23
 * @Description
 */
public class Run1 {

    /**
     * 时间
     * 构造参数:true 代表后台进程
     */
    private static Timer timer = new Timer();

    /**
     * TimerTask子类
     */
    static public class MyTask extends TimerTask {
        @Override
        public void run() {
            System.out.println("时间为:" + new Date());
        }
    }

    public static void main(String[] args) {
        System.out.println("开始等到执行...");
        //task子类
        MyTask myTask = new MyTask();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date executeDate = null;
        try {
            executeDate = sdf.parse("2019-03-28 10:09:00");
        } catch (ParseException e) {
            e.printStackTrace();
        }
        //交给timer进行调度
        //第三个参数代表 周期调度
        timer.schedule(myTask,executeDate,1000);
    }
}

                                              

如何取消执行的任务?

TimerTask类的cancel方法:是将自身从任务队列中移除,但是其他的任务不受影响,下面这段程序演示了当运行次数达到了3次,则取消该任务的执行;

Timer类的cancel方法:是将任务队列中的全部的任务清空;

/**
 * @Author: jiaqing.xu@hand-china.com
 * @Date: 2019-03-27 18:11
 * @Description
 */
public class Run1 {
    private static Timer timer = new Timer();
    private static int runCount = 0;
    static public class MyTask extends TimerTask{
        @Override
        public void run() {
            System.out.println("1 begin 运行了! 时间为:"+new Date());
            //线程休息1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1 end 结束了! 时间为:"+new Date());
            runCount++;
            if(runCount==3){
                timer.cancel();
            }
        }
    }
    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date executeDate = null;
        try {
            executeDate = sdf.parse("2019-03-27 18:22:00");
        } catch (ParseException e) {
            e.printStackTrace();
        }
        //交给timer进行调度
        //第三个参数代表 周期调度
        timer.schedule(myTask,executeDate,1000);
    }
}

ScheduledThreadPoolExecutor任务调度器

ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务,相对于任务调度的Timer来说,其功能更加强大,Timer只能使用一个后台线程执行任务,而ScheduledThreadPoolExecutor则可以通过构造函数来指定后台线程的个数。

ScheduledThreadPoolExecutor类的UML图如下:

                                               

  1. 从UML图可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,也就是说ScheduledThreadPoolExecutor拥有execute()和submit()提交异步任务的基础功能,但是,ScheduledThreadPoolExecutor类实现了ScheduledExecutorService,该接口定义了ScheduledThreadPoolExecutor能够延时执行任务和周期执行任务的功能;

  2. ScheduledThreadPoolExecutor也两个重要的内部类:DelayedWorkQueueScheduledFutureTask。可以看出DelayedWorkQueue实现了BlockingQueue接口,也就是一个阻塞队列,ScheduledFutureTask则是继承了FutureTask类,也表示该类用于返回异步任务的结果。

构造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
};

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
};

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

可以看出由于ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,它的构造方法实际上是调用了ThreadPoolExecutor,理解ThreadPoolExecutor构造方法的几个参数的意义后,理解这就很容易了。可以看出,ScheduledThreadPoolExecutor的核心线程池的线程个数为指定的corePoolSize,当核心线程池的线程个数达到corePoolSize后,就会将任务提交给有界阻塞队列DelayedWorkQueue,对DelayedWorkQueue在下面进行详细介绍,线程池允许最大的线程个数为Integer.MAX_VALUE,也就是说理论上这是一个大小无界的线程池。

特有方法

ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,该接口定义了可延时执行异步任务和可周期执行异步任务的特有功能,相应的方法分别为:

//达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,
//因此通过ScheduledFuture.get()获取结果为null
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,
//因此,返回的是任务的最终计算结果
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

//是以上一个任务开始的时间计时,period时间过去后,
//检测上一个任务是否执行完毕,如果上一个任务执行完毕,
//则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次
//任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

可周期性地执行任务ScheduledFutureTask

ScheduledThreadPoolExecutor最大的特色是能够周期性执行异步任务,当调用schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法时,实际上是将提交的任务转换成的ScheduledFutureTask类,从源码就可以看出。以schedule方法为例:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

可以看出,通过decorateTask会将传入的Runnable转换成ScheduledFutureTask类。线程池最大作用是将任务和线程进行解耦,线程主要是任务的执行者,而任务也就是现在所说的ScheduledFutureTask。紧接着,会想到任何线程执行任务,总会调用run()方法。为了保证ScheduledThreadPoolExecutor能够延时执行任务以及能够周期性执行任务,ScheduledFutureTask重写了run方法:

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
		//如果不是周期性执行任务,则直接调用run方法
        ScheduledFutureTask.super.run();
		//如果是周期性执行任务的话,需要重设下一次执行任务的时间
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

从源码可以很明显的看出,在重写的run方法中会先if (!periodic)判断当前任务是否是周期性任务,如果不是的话就直接调用run()方法;否则的话执行setNextRunTime()方法重设下一次任务执行的时间,并通过reExecutePeriodic(outerTask)方法将下一次待执行的任务放置到DelayedWorkQueue中。

因此,可以得出结论:ScheduledFutureTask最主要的功能是根据当前任务是否具有周期性,对异步任务进行进一步封装。如果不是周期性任务(调用schedule方法)则直接通过run()执行,若是周期性任务,则需要在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。

DelayedWorkQueue

在ScheduledThreadPoolExecutor中还有另外的一个重要的类就是DelayedWorkQueue。为了实现其ScheduledThreadPoolExecutor能够延时执行异步任务以及能够周期执行任务,DelayedWorkQueue进行相应的封装。DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

1、为什么要使用DelayedWorkQueue呢?

定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。

DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。

2、DelayedWorkQueue的数据结构

//初始大小
private static final int INITIAL_CAPACITY = 16;
//DelayedWorkQueue是由一个大小为16的数组组成,数组元素为实现RunnableScheduleFuture接口的类
//实际上为ScheduledFutureTask
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
关于DelayedWorkQueue我们可以得出这样的结论:DelayedWorkQueue是基于堆的数据结构,按照时间顺序将每个任务进行排序,将待执行时间越近的任务放在在队列的队头位置,以便于最先进行执行

总结

  • ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,因此,整体上功能一致,线程池主要负责创建线程(Worker类),线程从阻塞队列中不断获取新的异步任务,直到阻塞队列中已经没有了异步任务为止。但是相较于ThreadPoolExecutor来说,ScheduledThreadPoolExecutor具有延时执行任务和可周期性执行任务的特性,ScheduledThreadPoolExecutor重新设计了任务类ScheduleFutureTask,ScheduleFutureTask重写了run方法使其具有可延时执行和可周期性执行任务的特性。另外,阻塞队列DelayedWorkQueue是可根据优先级排序的队列,采用了堆的底层数据结构,使得与当前时间相比,待执行时间越靠近的任务放置队头,以便线程能够获取到任务进行执行;
  • 线程池无论是ThreadPoolExecutor还是ScheduledThreadPoolExecutor,在设计时的三个关键要素是:任务,执行者以及任务结果。它们的设计思想也是完全将这三个关键要素进行了解耦。

执行者

  • 任务的执行机制,完全交由Worker类,也就是进一步了封装了Thread。向线程池提交任务,无论为ThreadPoolExecutor的execute方法和submit方法,还是ScheduledThreadPoolExecutor的schedule方法,都是先将任务移入到阻塞队列中,然后通过addWork方法新建了Work类,并通过runWorker方法启动线程,并不断的从阻塞对列中获取异步任务执行交给Worker执行,直至阻塞队列中无法取到任务为止。

任务

  • 在ThreadPoolExecutor和ScheduledThreadPoolExecutor中任务是指实现了Runnable接口和Callable接口的实现类。ThreadPoolExecutor中会将任务转换成FutureTask类,而在ScheduledThreadPoolExecutor中为了实现可延时执行任务和周期性执行任务的特性,任务会被转换成ScheduledFutureTask类,该类继承了FutureTask,并重写了run方法。

任务结果

  • 在ThreadPoolExecutor中提交任务后,获取任务结果可以通过Future接口的类,在ThreadPoolExecutor中实际上为FutureTask类,而在ScheduledThreadPoolExecutor中则是ScheduledFutureTask

版权声明:本文为jiaqingShareing原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。