ScheduledThreadPoolExecutor详解
简介
- 继承自ThreadPooExecutor,为任务提供延迟或周期执行.
- 使用专门的ScheduledFutureTask来执行周期任务,也可以接收不需要时间调度的任务.
- 使用DelayedWorkQueue存储任务.(一种无界延迟队列)
- 支持线程池关闭后可执行,可选择线程池关闭后支持继续执行周期或延迟任务.
解析
内部有两个内部类:
- ScheduledFutureTask:可以延迟执行的异步运算任务.
- DelayedWorkQueue:存储周期或延迟任务的延迟队列.
ScheduledFutureTask
一、任务的执行
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
流程:
- 若当前状态下不可执行,则取消任务.
- 若可以执行,则检查是否为周期任务.若为非周期任务,则直接执行.
- 若为周期任务,则执行后设置下一次执行的时间,并将任务加入周期队列中.
二、任务的取消
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
流程:
- 取消任务
- 根据参数决定是否从队列中移除此任务.
核心方法
schedule
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
流程:
- 若任务为空,则抛出异常.
- 检查线程池是否关闭.若关闭,则拒绝任务.
- 若没有关闭,则将任务加入到等待队列.
- 再次检查线程池是否在运行.若线程池在运行或者允许线程池关闭运行,则启动新线程等待执行任务.
- 若线程池已经关闭并且不允许线程池关闭后运行,则从队列中移出指定的任务,再取消任务.
scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
特点: 创建一个周期执行的任务,第一次执行延期时间为initialDelay之后每隔period执行一次,不等待第一次执行完就开始计时.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
特点:创建一个周期执行的任务,第一次执行延期时间为initialDelay,在第一次执行完之后延迟delay后开始下一次执行.
shutdown()
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
解释:
判断是否有线程池关闭后保留的任务.
1、若没有保留的任务,则依次取消任务,并清除队列.
2、若有保留的任务,则对于非周期性任务,取消该任务并将其清除出队列.
业务实现代码块
public void cancel(ApplicationInfo applicationInfo) {
String id = applicationInfo.getId();
log.info("{}用户开始存储定时任务", id);
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(3);
scheduled.schedule(new Runnable() {
@Override
public void run() {
log.info("{}用户开始触发定时任务", id);
ApplicationInfo applicationInfo1 = applicationInfoService.getById(id);
String applicationState = applicationInfo1.getApplicationState();
//当48小时候 状态仍为待支付 自动取消改变数据库状态
if (applicationState.equals("2")) {
//自动取消 修改数据库
applicationInfo1.setApplicationState("9");
applicationInfoService.updateById(applicationInfo1);
log.info("{}用户已超过48小时未支付,自动取消", id);
}
}
}, 2, TimeUnit.DAYS);
}
实现的是一个超时两天未支付自动取消的功能。
版权声明:本文为weixin_43942106原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。