前些天看了Elastic-Job的实现原理,它是基于Quartz实现的,Quartz提供了定时任务的功能,现在我们就来看看Quartz底册是怎么实现的。初始化部分我们就不关注了,直接进入核心部分:
scheduler.start();
启动定时器
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
首先检查了定时器的状态,如果当前是shuttingDown或者closed状态,那么就抛出异常,然后唤醒定时器里面的监听器执行他们的定时器正在启动的监听方法,如果是第一次启动,那么初始化initialStart和执行schedulerStarted()方法,这个方法是一个空方法什么也没做,接着启动插件,如果不是第一次启动,那么执行schedulerResumed()方法,这个方法也是一个空方法。下面给schedThread的一个字段设置为false,最后执行监听器的启动完毕的方法。
我们发现在start方法中有一个线程schedThread,那么我们就先来看一下它是如何被创建的
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
this.qs = qs;
this.qsRsrcs = qsRsrcs;
this.setDaemon(setDaemon);
if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
this.setPriority(threadPrio);
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
}
首先保存了定时器的一些信息,设置是否是守护线程,设置优先级,最后初始化了两个标志,都是停止的意思,并且,我们在刚才的start方法里面,发现出现了一次给paused的赋值,这个线程是在声明定时器的时候被启动的,那么,我们就来看一下它的run方法做了什么
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
...
}
}
从宏观上看,他的run方法在执行一个死循环,只要halted为false就会一直执行下去,接下来,我们深入到内部去看一下
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
是不是发现了什么不可思议的事,在这段代码里面出现了paused标志,我们可以发现,如果paused如果为false,这个线程又会陷入这个死循环,所以在start方法里面,将paused字段设置为false的时候,就让线程跳出了这个循环向下执行,也算是真正的启动了这个线程
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
当线程获取的可以执行的定时任务的时候,会将任务放到一个线程池中去执行,在这一步会获取当前线程池空闲的线程的数目,并作为下面获取定时任务数目的参考,先来看一下是如何获取空闲线程的数目的
public int blockForAvailableThreads() {
synchronized(nextRunnableLock) {
while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
return availWorkers.size();
}
}
可以看到,这是一个阻塞的方法,如果当前线程池里面没有空闲的线程,那么阻塞到空闲线程的数目大于等于1或者线程池关闭,否则返回空闲线程的数目
if(availThreadCount > 0) {
...
}
else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
获取完空闲线程的数目后,根据数目的大小进行不同的操作,如果为0,那么重新循环,我们重点关注的是由空闲线程的那部分
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
首先声明了一个集合用来保存获取到的定时任务,下一行是获取当前时间,最后一个方法用来清除标志,这个标志代表:当从定时任务的集合中获取定时任务或者获取完定时任务后有新插入的定时任务,就需要设置一些标志来告诉当前线程有新的定时任务插入进来了,需要根据新插入的定时任务的下次触发时间进行相应的操作
public void clearSignaledSchedulingChange() {
synchronized(sigLock) {
signaled = false;
signaledNextFireTime = 0;
}
}
signaled 用来标志有新插入的定时任务,signaledNextFireTime用来记录新插入的定时任务的下次触发时间
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
这段代码的第一步操作就是获取可以触发的定时任务,如果获取成功那么就将lastAcquireFailed标记为false,否则标记为true。在获取定时任务的时候传入了几个参数,现在还不知道什么意思,我们进入这个方法里面看一下
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
synchronized (lock) {
List<OperableTrigger> result = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();
long firstAcquiredTriggerFireTime = 0;
// return empty list if store has no triggers.
if (timeTriggers.size() == 0)
return result;
while (true) {
TriggerWrapper tw;
try {
tw = timeTriggers.first();
if (tw == null)
break;
timeTriggers.remove(tw);
} catch (java.util.NoSuchElementException nsee) {
break;
}
if (tw.trigger.getNextFireTime() == null) {
continue;
}
if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
continue;
}
if (tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow) {
timeTriggers.add(tw);
break;
}
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = tw.trigger.getJobKey();
JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
excludedTriggers.add(tw);
continue; // go to next trigger in store.
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
tw.state = TriggerWrapper.STATE_ACQUIRED;
tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
OperableTrigger trig = (OperableTrigger) tw.trigger.clone();
result.add(trig);
if(firstAcquiredTriggerFireTime == 0)
firstAcquiredTriggerFireTime = tw.trigger.getNextFireTime().getTime();
if (result.size() == maxCount)
break;
}
// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.
if (excludedTriggers.size() > 0)
timeTriggers.addAll(excludedTriggers);
return result;
}
}
首先对下面的操作进行加锁,接着初始化了三个集合他们的作用分别是:
result:保存获取到的定时任务
acquiredJobKeysForNoConcurrentExec:如果当前任务不允许并发执行,获取到当前任务后,添加到集合中
excludedTriggers:如果当前任务不允许并发执行并且acquiredJobKeysForNoConcurrentExec集合中已经有当前任务了,那么就添加到excludedTriggers集合中
如果没有保存的定时任务那么就直接返回,timeTriggers是一个TreeSet的集合,根据定时任务的下次触发时间进行排序,所起每次从timeTriggers获取到的定时任务都是最早被触发的
下面进入一个死循环中,每次从timeTriggers获取一个定时任务,并且从timeTriggers中移除这个定时任务,如果这个定时任务的下次触发时间为空,那么这个定时任务就不需要执行和保存了直接丢弃。下面这个操作是比较重要的,因为一个定时任务的下次触发时间是在添加到集合之前就被确定的,所以会存在当从集合中获取到这个定时任务的时候,已经过了定时任务的触发时间,我们来看一下它是怎么处理的
protected boolean applyMisfire(TriggerWrapper tw) {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
Date tnft = tw.trigger.getNextFireTime();
if (tnft == null || tnft.getTime() > misfireTime
|| tw.trigger.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {
return false;
}
Calendar cal = null;
if (tw.trigger.getCalendarName() != null) {
cal = retrieveCalendar(tw.trigger.getCalendarName());
}
signaler.notifyTriggerListenersMisfired((OperableTrigger)tw.trigger.clone());
tw.trigger.updateAfterMisfire(cal);
if (tw.trigger.getNextFireTime() == null) {
tw.state = TriggerWrapper.STATE_COMPLETE;
signaler.notifySchedulerListenersFinalized(tw.trigger);
synchronized (lock) {
timeTriggers.remove(tw);
}
} else if (tnft.equals(tw.trigger.getNextFireTime())) {
return false;
}
return true;
}
首先获取当前系统时间,减去可以容忍的延误的时间和定时任务的下次触发时间进行比较,如果定时任务的下次触发时间为空或者下次触发时间大于可以最早执行的时间或者可以无条件容忍延误,就返回false。否则,继续向下执行,触发定时任务过期的监听器的监听方法,更新下次触发时间,如果当前任务的下次过期时间为空或者当前定时任务的状态已经完成,那么就直接从timeTriggers中移除,否则比较更新后的下次触发时间是否和之前的相等,如果相等也返回false。最后返回true
回到上一个方法
if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
continue;
}
当applyMisfire(tw)返回true的时候才进入这个分支,如果定时任务的下次触发时间不为空,那么重新将它放入timeTriggers集合中
if (tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow) {
timeTriggers.add(tw);
break;
}
如果定时任务的下次出发时间大于noLaterThan + timeWindow就跳出循环,现在我们可以知道传入的其中两个参数的作用了
noLaterThan:用来记录本次获取定时任务的起始时间
timeWindow:用来记录定时任务最大的时间间隔
两个结合起来就限制了定时任务的触发时间不能超过他们的和,如果超过了,后面获取的定时任务也肯定超过了,没必要在遍历了直接返回。
JobKey jobKey = tw.trigger.getJobKey();
JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
excludedTriggers.add(tw);
continue; // go to next trigger in store.
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
这部分代码就是刚才在介绍集合的时候说到的意思,就不在赘述了
tw.state = TriggerWrapper.STATE_ACQUIRED;
tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
OperableTrigger trig = (OperableTrigger) tw.trigger.clone();
result.add(trig);
if(firstAcquiredTriggerFireTime == 0)
firstAcquiredTriggerFireTime = tw.trigger.getNextFireTime().getTime();
if (result.size() == maxCount)
break;
循环的最后这部分,先将定时任务的状态修改为获取状态,然后克隆一份放到result集合中,保存获取到的最后一个定时任务的下次触发时间,判断本次获取到的定时任务的个数有没有超过限制,超过了也要跳出循环
if (excludedTriggers.size() > 0)
timeTriggers.addAll(excludedTriggers);
return result;
方法的最后看excludedTriggers集合中有没有元素,有的话再将他们放到timeTriggers中。
再回到线程的run方法中
if (triggers != null && !triggers.isEmpty()) {
...
}
如果获取到了定时任务才进入这个分支
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
先获取当前时间,获取第一个定时任务的出发时间,作差得到距离第一个定时任务触发的时间,如果差值大于2ms就进入循环。
首先判断定时器是否停止了,然后看一下有没有新的比第一个定时任务的下次触发时间早的定时任务添加进来
private boolean isCandidateNewTimeEarlierWithinReason(long oldTime, boolean clearSignal) {
synchronized(sigLock) {
if (!isScheduleChanged())
return false;
boolean earlier = false;
if(getSignaledNextFireTime() == 0)
earlier = true;
else if(getSignaledNextFireTime() < oldTime )
earlier = true;
if(earlier) {
// so the new time is considered earlier, but is it enough earlier?
long diff = oldTime - System.currentTimeMillis();
if(diff < (qsRsrcs.getJobStore().supportsPersistence() ? 70L : 7L))
earlier = false;
}
if(clearSignal) {
clearSignaledSchedulingChange();
}
return earlier;
}
}
首先检查是否有新的任务添加进来,主要就是检查signaled标志,如果没有直接返回,否则继续向下执行。用earlier标志是否新添加的定时任务触发时间早,获取新添加的定时任务的下次触发时间,如果为0,earlier设为true,如果小于之前获取到的定时任务的下次触发时间,也将earlier设置为true。然后判断是否足够的早,如果不是修改earlier为false,最后判断是否需要清空这次新添加任务的标志,返回earlier。回到之前的代码
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
如果earlier为false,说明对当前的定时任务集合不需要任何操作,进入分支结构,阻塞时长为时间差
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
循环里面的最后一个分支就是来对新添加的定时任务影响到了我们获取到的定时任务集合来进行处理的
private boolean releaseIfScheduleChangedSignificantly(
List<OperableTrigger> triggers, long triggerTime) {
if (isCandidateNewTimeEarlierWithinReason(triggerTime, true)) {
for (OperableTrigger trigger : triggers) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger);
}
triggers.clear();
return true;
}
return false;
}
还是调用了isCandidateNewTimeEarlierWithinReason方法,只不过这次清空标志为true,最后将我们获取到的定时任务重新放到timeTriggers集合中,如果没有影响到,run方法继续执行
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
bndles集合用来保存要放入线程池的任务,接着判断定时器是否中止,没有中止继续执行,接下来这一步非常关键,我当时再看源码的时候,就因为少看了这一行代码换货了两个多小时,triggersFired方法
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> firedTriggers) {
synchronized (lock) {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
for (OperableTrigger trigger : firedTriggers) {
TriggerWrapper tw = triggersByKey.get(trigger.getKey());
// was the trigger deleted since being acquired?
if (tw == null || tw.trigger == null) {
continue;
}
// was the trigger completed, paused, blocked, etc. since being acquired?
if (tw.state != TriggerWrapper.STATE_ACQUIRED) {
continue;
}
Calendar cal = null;
if (tw.trigger.getCalendarName() != null) {
cal = retrieveCalendar(tw.trigger.getCalendarName());
if(cal == null)
continue;
}
Date prevFireTime = trigger.getPreviousFireTime();
// in case trigger was replaced between acquiring and firing
timeTriggers.remove(tw);
// call triggered on our copy, and the scheduler's copy
tw.trigger.triggered(cal);
trigger.triggered(cal);
//tw.state = TriggerWrapper.STATE_EXECUTING;
tw.state = TriggerWrapper.STATE_WAITING;
TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(
tw.jobKey), trigger, cal,
false, new Date(), trigger.getPreviousFireTime(), prevFireTime,
trigger.getNextFireTime());
JobDetail job = bndle.getJobDetail();
if (job.isConcurrentExectionDisallowed()) {
ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(job.getKey());
for (TriggerWrapper ttw : trigs) {
if (ttw.state == TriggerWrapper.STATE_WAITING) {
ttw.state = TriggerWrapper.STATE_BLOCKED;
}
if (ttw.state == TriggerWrapper.STATE_PAUSED) {
ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
}
timeTriggers.remove(ttw);
}
blockedJobs.add(job.getKey());
} else if (tw.trigger.getNextFireTime() != null) {
synchronized (lock) {
timeTriggers.add(tw);
}
}
results.add(new TriggerFiredResult(bndle));
}
return results;
}
}
简单来说,就是遍历我们传入的定时任务的集合,修改每个定时任务的上次触发时间为下次触发时间,计算下次触发时间并修改,修改任务的状态,将定时任务封装成为TriggerFiredBundle。最后判断,当前定时任务是否支持并发执行,如果不支持,首先将定时任务的状态修改为阻塞状态,从timeTriggers结合中移除当前定时任务,并添加到blockedJobs集合中,否则添加到timeTriggers集合中,最后将封装好的TriggerFiredBundle再封装放到results集合中,最后遍历完之后,返回results集合。
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
最后这部分就比较简单了,就是将我们的定时任务包装成JobRunShell放入线程池中去执行,我们现在要关注的是当定时任务执行完之后的操作,也就是JobRunShell里面的run方法的最后一部分:
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
在定时任务执行完的时候会触发定时任务完成这个事件
public void triggeredJobComplete(OperableTrigger trigger,
JobDetail jobDetail, CompletedExecutionInstruction triggerInstCode) {
synchronized (lock) {
JobWrapper jw = jobsByKey.get(jobDetail.getKey());
TriggerWrapper tw = triggersByKey.get(trigger.getKey());
// It's possible that the job is null if:
// 1- it was deleted during execution
// 2- RAMJobStore is being used only for volatile jobs / triggers
// from the JDBC job store
if (jw != null) {
JobDetail jd = jw.jobDetail;
if (jd.isPersistJobDataAfterExecution()) {
JobDataMap newData = jobDetail.getJobDataMap();
if (newData != null) {
newData = (JobDataMap)newData.clone();
newData.clearDirtyFlag();
}
jd = jd.getJobBuilder().setJobData(newData).build();
jw.jobDetail = jd;
}
if (jd.isConcurrentExectionDisallowed()) {
blockedJobs.remove(jd.getKey());
ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(jd.getKey());
for(TriggerWrapper ttw : trigs) {
if (ttw.state == TriggerWrapper.STATE_BLOCKED) {
ttw.state = TriggerWrapper.STATE_WAITING;
timeTriggers.add(ttw);
}
if (ttw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) {
ttw.state = TriggerWrapper.STATE_PAUSED;
}
}
signaler.signalSchedulingChange(0L);
}
} else { // even if it was deleted, there may be cleanup to do
blockedJobs.remove(jobDetail.getKey());
}
// check for trigger deleted during execution...
if (tw != null) {
if (triggerInstCode == CompletedExecutionInstruction.DELETE_TRIGGER) {
if(trigger.getNextFireTime() == null) {
// double check for possible reschedule within job
// execution, which would cancel the need to delete...
if(tw.getTrigger().getNextFireTime() == null) {
removeTrigger(trigger.getKey());
}
} else {
removeTrigger(trigger.getKey());
signaler.signalSchedulingChange(0L);
}
} else if (triggerInstCode == CompletedExecutionInstruction.SET_TRIGGER_COMPLETE) {
tw.state = TriggerWrapper.STATE_COMPLETE;
timeTriggers.remove(tw);
signaler.signalSchedulingChange(0L);
} else if(triggerInstCode == CompletedExecutionInstruction.SET_TRIGGER_ERROR) {
getLog().info("Trigger " + trigger.getKey() + " set to ERROR state.");
tw.state = TriggerWrapper.STATE_ERROR;
signaler.signalSchedulingChange(0L);
} else if (triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR) {
getLog().info("All triggers of Job "
+ trigger.getJobKey() + " set to ERROR state.");
setAllTriggersOfJobToState(trigger.getJobKey(), TriggerWrapper.STATE_ERROR);
signaler.signalSchedulingChange(0L);
} else if (triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_COMPLETE) {
setAllTriggersOfJobToState(trigger.getJobKey(), TriggerWrapper.STATE_COMPLETE);
signaler.signalSchedulingChange(0L);
}
}
}
}
代码虽然长,但是逻辑还是比较简单的,所以就不分部分来说了
首先获取到定时任务的原始包装类,判断是否支持并发执行,如果支持就只需要从blockedJobs里面移除就好了,因为之前已经向timeTriggers重新添加过了,否则先从blockedJobs里面移除,修改定时任务的状态,然后添加到timeTriggers中,最后就是根据定时任务的不同状态进行处理了。
都完成之后,这个定时任务就等待下一次被调度。