本文主要介绍了可定时线程池的核心原理,从宏观角度大概分析了线程池工作方式,如有不足,请指出,谢谢。
1.什么是线程池
线程池顾名思义是一个线程缓存的‘池子’。线程是稀缺资源,线程如果创建的太多,会消耗系统的资源,还会降低系统的稳定性,所以java中通过线程池来统一管理分配线程这个稀缺的资源,达到资源重复利用。
2.线程池的出现
在web系统中,服务器需要接受大量的并发请求,一个请求就会对应一个线程,如果并发的请求很多,但每个线程执行的时间很短,这样系统就会频繁的创建和销毁线程,系统的性能就会受到影响。那么是否存在一种方式,线程执行任务后并不销毁,而是重复利用?这就是线程池出现的目的。
3.ScheduledThreadPoolExecutor
- 之前的一篇文章分析了线程池ThreadPoolExecutor的工作原理传送门,这篇文章主要介绍可定时任务的线程ScheduledThreadPoolExecutor,它也是线程池的一种,从下面类图可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ScheduledThreadPoolExecutor的大部分逻辑在ThreadPoolExecutor线程池中已经实现,如果不太清楚ThreadPoolExecutor,可以参考上一篇文章的讲解了,详细信息可查看。
4.主要提交任务的方法
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以具备线程池提交任务的一般方法[submit()与execute()],同时还具备提交定时任务的几个方法,如下所示:
//延后delay时长执行Runnable任务
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
//延后delay时长执行Callable任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
//延后执行Runnable 任务,以后每隔period的时长再次执行该任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
//延后执行Runnable 任务,以后任务执行完成后等待delay时长,再次执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
- 从上面几个方法可以看出,都执行了delayedExecute(t)方法,先分析下该方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池关闭就拒绝任务
if (isShutdown())
reject(task);
else {
// 添加任务到延迟队列中,而ThreadPoolExecute线程池是线程
// 数大于核心线程时才添加任务到阻塞队列,这里是第一次添加
// 任务,因为是定时线程池,在延迟队列中的任务执行run方法时
// 会将任务继续添加进延迟队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 这里是增加一个worker线程,避免提交的任务没有worker去执行
ensurePrestart();
}
}
// 该方法是ThreadPoolExecute线程池里面的方法
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//这里就和ThreadPoolExecute线程池里的方法一样,创建一个没有任务的线程
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
5.ScheduledFutureTask
- ScheduledThreadPoolExecutor线程池中提交的任务都会被封装成ScheduledFutureTask类型的任务,ScheduledFutureTask是ScheduledThreadPoolExecutor中的一个内部类,实现了Runnable接口,也就是说,ScheduledFutureTask是一个线程任务,接下来我们简单分析下ScheduledFutureTask。
- 构造方法如下
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;//任务开始的时间
this.period = 0;//任务执行的时间间隔
this.sequenceNumber = sequencer.getAndIncrement();//任务的序号
}
- 线程池中,需要用阻塞队列保存等待被执行的任务,ScheduledThreadPoolExecutor线程池中通过DelayQueue阻塞队列来存储等待的任务。DelayQueue接收SchduledFutureTask类型的任务。DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体算法如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
- DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若
time相同则根据sequenceNumber排序;- DelayQueue也是一个无界队列,底层数据结构为最小堆,用数组存储,初始容量16;
- 由上面分析可知ScheduledFutureTask任务是线程池执行的具体任务,我们来看一下它的run方法:
public void run() {
// 是否周期性执行
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//如果不需要周期性执行,则直接执行run方法然后结束
ScheduledFutureTask.super.run();
//ScheduledFutureTask.super.runAndReset()就是执行自己提交的任务的run方法
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下次执行任务的时间
setNextRunTime();
// 重复执行任务
reExecutePeriodic(outerTask);
}
}
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // 这里设置执行自己任务的方法
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 这里是添加任务到队列
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 该方法是ThreadPoolExecute线程池里面的方法
ensurePrestart();
}
}
版权声明:本文为qq_37803406原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。