【过往系列文章】
过往的文章也有类似的,但过去的版本已经是4年前2016年当时Spring版本还是3.x,现在都用5.x了,今年2020年,想了想还是更新一下最新代码分享给大家,借着今天1024的程序猿大节,把京缘网络关于单系统内核任务架构源码和主要技术点分享给大家,从Controller到Service到使用JY封装好的工具类调用quartz!
我们操作动态任务是为了程序已经启动的时候可以在线控制,不需要每次通过启动程序和关闭程序来调节配置文件,当然这几年出了像xxl-job这样的分布式系统,我们可以在代码通过api来调用,那如果我们是小项目想在自己主系统写一个呢,而不需要多个系统运行,这是个不错的选择!
然后我们开始
首先我们用版本pom配置
<properties>
<spring.version>5.2.3.RELEASE</spring.version>
<quartz.version>2.2.3</quartz.version> <dependencies>spring相关的依赖就不列举了,自己引入,基本系统都有,实在小白请看看过往系列
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>xxxxx</artifactId>
<version>${spring.version}</version>
</dependency>准备配置之后,入正题
1.在线创建任务
/**
* 创建任务
*
* @param scheduler the scheduler
* @param scheduleJob the schedule job
* @throws Exception
*/
public static void createJob(Scheduler scheduler, Schedule scheduleJob) throws Exception {
//任务执行类
//Class<? extends Job> jobClass =(Class<? extends Job>) Class.forName (scheduleJob.getJobClass());
Class<?> classType =Class.forName (scheduleJob.getJobClass());
if(Job.class.isAssignableFrom(classType)) {
@SuppressWarnings("unchecked")
Class<? extends Job> classJob=(Class<? extends Job>) classType;
createJob(scheduler, scheduleJob.getName(), scheduleJob.getGrouping(),
scheduleJob.getCron(),scheduleJob,classJob);
}
}
/**
* 创建定时任务
* @param scheduler the scheduler
* @param jobName the job name
* @param jobGroup the job group
* @param cronExpression the cron expression
* @param isSync the is sync
* @param param the param
* @throws Exception
*/
public static void createJob(Scheduler scheduler, String jobName, String jobGroup, String cronExpression, Object param,Class<? extends Job> jobClass) throws Exception {
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build();
// 放入参数,运行时的方法可以获取
jobDetail.getJobDataMap().put(JobConst.JOB_PARAM_KEY, param);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
}2.在线创建一次性任务
/**
* 创建一次性任务
*
* @param scheduler the scheduler
* @param scheduleJob the schedule job
* @throws Exception
*/
public static void createOnceJob(Scheduler scheduler, Schedule scheduleJob) throws Exception {
//设置已过去的时间,确保运行
scheduleJob.setCron(JobConst.DEF_ONCE_CRON);
createJob(scheduler,scheduleJob);
}
/**
* 运行一次任务
* @param scheduler
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
public static void runOnce(Scheduler scheduler, String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = getJobKey(jobName, jobGroup);
//List<?> triggers = (List<?>) scheduler.getTriggersOfJob(jobKey);
if(scheduler.checkExists(jobKey)) {
scheduler.interrupt(jobKey);
//scheduler.deleteJob(jobKey);
}
//启动任务
scheduler.triggerJob(jobKey);
}3.在线获取排期队列中的任务
/**
* 获取jobKey
* @param jobName the job name
* @param jobGroup the job group
* @return the job key
*/
public static JobKey getJobKey(String jobName, String jobGroup) {
return JobKey.jobKey(jobName, jobGroup);
}
4.在线更新任务,在这里我们一般是停掉当前的这个任务,如果没执行的,将停掉,还在执行当中的,还是会执行下去
/**
* 更新定时任务
* @param scheduler the scheduler
* @param scheduleJob the schedule job
* @throws SchedulerException
*/
public static void updateJob(Scheduler scheduler, Schedule scheduleJob) throws SchedulerException {
updateJob(scheduler, scheduleJob.getName(), scheduleJob.getGrouping(),
scheduleJob.getCron(), scheduleJob);
}
/**
* 更新定时任务
*
* @param scheduler the scheduler
* @param jobName the job name
* @param jobGroup the job group
* @param cronExpression the cron expression
* @param param the param
* @throws SchedulerException
*/
public static void updateJob(Scheduler scheduler, String jobName, String jobGroup,
String cronExpression,Object param) throws SchedulerException {
TriggerKey triggerKey = ScheduleKit.getTriggerKey(jobName, jobGroup);
//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}5.在线删除任务
/**
* 删除定时任务
*
* @param scheduler
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
public static void deleteJob(Scheduler scheduler, String jobName, String jobGroup) throws SchedulerException {
scheduler.deleteJob(getJobKey(jobName, jobGroup));
}6.在线暂停任务
/**
* 暂停任务
* @param scheduler
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
public static void pauseJob(Scheduler scheduler, String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey =getJobKey(jobName, jobGroup);
scheduler.pauseJob(jobKey);
}7.在线恢复任务
/**
* 恢复任务
* @param scheduler
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
public static void resumeJob(Scheduler scheduler, String jobName, String jobGroup) throws SchedulerException {
JobKey jobKey = getJobKey(jobName, jobGroup);
scheduler.resumeJob(jobKey);
}8.上面用到的公共方法
public static boolean isClassJob(String classPath) {
boolean res=false;
try {
Class<?> classType = Class.forName(classPath);
if(Job.class.isAssignableFrom(classType)) {
res=true;
}
} catch (ClassNotFoundException e) {
log.error("Schedule task isClassJob error",e);
}
return res;
}9.我们整合到SpringMvc框架,以下我把自己项目给大家参考,如何调用上面的工具类ScheduleKit(以上的方法都可以整合到这个工具类中)
建立一个从控制层(Controller)到服务层(Service)的需要用到实体类Schedule
package com.jy.entity.task.job;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.ibatis.type.Alias;
import com.jy.entity.base.BaseEntity;
@Alias("Schedule")
public class Schedule extends BaseEntity{
private static final long serialVersionUID = 1L;
/** 任务id */
private String id;
/** 任务名称 */
private String name;
/** 任务分组 */
private String grouping;
/** 任务别名 */
private String aliasName;
/** 指定执行类 */
private String jobClass;
/** 任务状态 0停用 1启用 2删除 */
private Integer status;
/** 任务运行时间表达式 */
private String cron;
private List<Date> nextTimePoints=new ArrayList<>();
/** 任务描述 */
private String description;
/** 创建时间 */
private Date createTime;
/** 修改时间 */
private Date updateTime;
private String keyWord;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGrouping() {
return grouping;
}
public void setGrouping(String grouping) {
this.grouping = grouping;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getJobClass() {
return jobClass;
}
public void setJobClass(String jobClass) {
this.jobClass = jobClass;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public List<Date> getNextTimePoints() {
return nextTimePoints;
}
public void setNextTimePoints(List<Date> nextTimePoints) {
this.nextTimePoints = nextTimePoints;
}
public String getAliasName() {
return aliasName;
}
public void setAliasName(String aliasName) {
this.aliasName = aliasName;
}
public String getKeyWord() {
return keyWord;
}
public void setKeyWord(String keyWord) {
this.keyWord = keyWord;
}
}
然后我们要一个接口类ScheduleService
package com.jy.service.task.job;
import com.jy.entity.task.job.Schedule;
import com.jy.service.base.BaseService;
/**
* 定时任务service
*/
public interface ScheduleService extends BaseService<Schedule> {
/**
* 初始化定时任务
*/
public void initScheduleJob() throws Exception;
/**
* 新增
*
* @param TaskLog
* @return
*/
public int createJob(Schedule job) throws Exception;
/**
* 直接修改 只能修改运行的时间,参数、同异步等无法修改
*
* @param TaskLog
*/
public int updateJob(Schedule job) throws Exception;
/**
* 删除
*
* @param scheduleJobId
*/
public int deleteJob(Schedule job) throws Exception;
/**
* 运行一次任务
*
* @param scheduleJobId the schedule job id
* @return
*/
public int runOnce(Schedule job) throws Exception;
/**
* 暂停任务
*
* @param scheduleJobId the schedule job id
* @return
*/
public int pauseJob(Schedule job) throws Exception;
/**
* 恢复任务
*
* @param scheduleJobId the schedule job id
* @return
*/
public int resumeJob(Schedule job) throws Exception;
}
具体实现这个接口类的实现类ScheduleServiceImp
package com.jy.service.task.job;
import java.util.Date;
import java.util.List;
import org.quartz.CronTrigger;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.jy.common.mybatis.Page;
import com.jy.common.thread.QuickSleep;
import com.jy.common.utils.ObjectUtil;
import com.jy.common.utils.idgen.SnowflakeGen;
import com.jy.common.var.Const;
import com.jy.entity.task.job.Schedule;
import com.jy.repository.task.job.ScheduleDao;
import com.jy.service.base.BaseServiceImp;
import com.jy.task.common.ScheduleKit;
@Service("scheduleService")
public class ScheduleServiceImp extends BaseServiceImp<Schedule> implements ScheduleService {
/** 调度工厂Bean */
@Autowired
private Scheduler scheduler;
@Autowired
private ScheduleDao dao;
@Override
public void initScheduleJob() throws Exception {
//查找启用的任务
List<Schedule> scheduleJobList = dao.getRunList();
if (ObjectUtil.isNotEmpty(scheduleJobList)) {
for (Schedule scheduleJob : scheduleJobList) {
CronTrigger cronTrigger = ScheduleKit.getCronTrigger(scheduler, scheduleJob.getName(),scheduleJob.getGrouping());
if (cronTrigger == null) {
// 不存在,创建一个
ScheduleKit.createJob(scheduler, scheduleJob);
} else {
// 已存在,那么更新相应的定时设置
ScheduleKit.updateJob(scheduler, scheduleJob);
}
}
}
}
@Override
public Page<Schedule> findByPage(Schedule schedule, Page<Schedule> page) {
List<String> idList=dao.findIdsByPage(schedule, page);
if(ObjectUtil.isNotEmpty(idList)) {
List<Schedule> results=dao.getList(idList);
Date now = new Date();
for(Schedule dbSchedule:results) {
List<Date> nextTimePoints=ScheduleKit.calNextPoint(dbSchedule.getCron(),now,3);
dbSchedule.setNextTimePoints(nextTimePoints);
}
page.setResults(results);
}
return page;
}
@Override
@Transactional
public int createJob(Schedule job) throws Exception {
int res=0;
String cron=job.getCron();
if(CronSequenceGenerator.isValidExpression(cron)) {
if(ScheduleKit.isClassJob(job.getJobClass())) {
job.setId(SnowflakeGen.generator());
job.setCreateTime(new Date());
// 更新数据库
int actionCount=dao.insert(job);
if(actionCount==1) {
// 当状态为启用时
if (Const.Task.Status.RUN.code()==job.getStatus()) {
ScheduleKit.createJob(scheduler, job);
}
res = 1;
}
}else {
res = 3;
}
}else {
res = 2;
}
return res;
}
@Override
@Transactional
public int updateJob(Schedule job) throws Exception {
int res=0;
String cron=job.getCron();
if(CronSequenceGenerator.isValidExpression(cron)) {
if(ScheduleKit.isClassJob(job.getJobClass())) {
// 从数据库查找原信息
Schedule dbJob = dao.getById(job.getId());
// 更新数据库
job.setUpdateTime(new Date());
int actionCount=dao.update(job);
if(actionCount==1) {
// 先删除
ScheduleKit.deleteJob(scheduler, dbJob.getName(), dbJob.getGrouping());
// 当状态为启用时
if (Const.Task.Status.RUN.code()==dbJob.getStatus()) {
ScheduleKit.createJob(scheduler, job);
}
res = 1;
}
}else {
res = 3;
}
}else {
res = 2;
}
return res;
}
@Override
@Transactional
public int deleteJob(Schedule job) throws SchedulerException {
int res=0;
// 从数据库查找原信息
Schedule dbJob = dao.getById(job.getId());
// 先删除
ScheduleKit.deleteJob(scheduler, dbJob.getName(), dbJob.getGrouping());
// 更新数据库
int actionCount=dao.delete(dbJob);
if(actionCount==1) {
res = 1;
}
return res;
}
@Override
@Transactional
public int runOnce(Schedule job) throws Exception {
int res=0;
// 从数据库查找原信息
Schedule dbJob = dao.getById(job.getId());
if (Const.Task.Status.RUN.code()==dbJob.getStatus()) {
// 运行一次任务
res = 2;
} else {
// 当任务没启动,必须先创建
ScheduleKit.createOnceJob(scheduler, dbJob);
// 然后立刻运行一次任务
ScheduleKit.runOnce(scheduler, dbJob.getName(), dbJob.getGrouping());
QuickSleep.sleepTimeUnit(1);
// 再删除任务
ScheduleKit.deleteJob(scheduler, dbJob.getName(), dbJob.getGrouping());
res = 1;
}
return res;
}
@Override
@Transactional
public int pauseJob(Schedule job) throws Exception {
int res=0;
// 从数据库查找原信息
Schedule dbJob = dao.getById(job.getId());
if (Const.Task.Status.RUN.code()==dbJob.getStatus()) {
// 判断jobKey为不为空,如为空,任务已停止
// 先暂停任务
// ScheduleKit.pauseJob(scheduler, scheduleJob.getJobName(),
// scheduleJob.getJobGroup());
ScheduleKit.deleteJob(scheduler, dbJob.getName(), dbJob.getGrouping());
// 更新数据库
dbJob.setStatus(Const.Task.Status.STOP.code());
dbJob.setUpdateTime(new Date());
dao.update(dbJob);
res = 1;
} else {
// 任务没启动,谈何暂停...
res = 2;
}
return res;
}
@Override
@Transactional
public int resumeJob(Schedule job) throws Exception {
int res=0;
// 从数据库查找原信息
Schedule dbJob = dao.getById(job.getId());
if (Const.Task.Status.STOP.code()==dbJob.getStatus()) {
ScheduleKit.createJob(scheduler, dbJob);
// 更新数据库
dbJob.setStatus(Const.Task.Status.RUN.code());
dbJob.setUpdateTime(new Date());
dao.update(dbJob);
res = 1;
} else {
res = 2;
}
return res;
}
}
然后我们就是我们的Controller
建立一个ScheduleCtrl类
package com.jy.controller.task.job;
import java.util.Date;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.jy.common.ajax.AjaxRes;
import com.jy.common.mybatis.Page;
import com.jy.common.utils.SystemClock;
import com.jy.common.var.Const;
import com.jy.common.var.ResCode;
import com.jy.controller.base.BaseCtrl;
import com.jy.entity.base.BaseList;
import com.jy.entity.task.job.Schedule;
import com.jy.interceptor.auth.JYPermission;
import com.jy.service.task.job.ScheduleService;
import com.jy.task.common.ScheduleKit;
/**
* 任务管理
*/
@Controller
@RequestMapping("/backstage/task/schedule/")
public class ScheduleCtrl extends BaseCtrl<Schedule>{
private static final String SECURITY_URL="/backstage/task/schedule/index";
private static final String SECURITY_ADD_URL="/backstage/task/schedule/add";
private static final String SECURITY_EDIT_URL="/backstage/task/schedule/edit";
@Autowired
private ScheduleService service;
/**
* 任务首页
*/
@JYPermission
@GetMapping("index")
public String index(Model model) {
model.addAttribute("permitBtn", getPermitBtn(Const.RESOURCES_TYPE_FUNCTION));
return "/task/schedule/list";
}
@JYPermission(SECURITY_URL)
@PostMapping(value = "findByPage")
@ResponseBody
public AjaxRes findByPage(Page<Schedule> page, Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
Page<Schedule> result = service.findByPage(schedule, page);
BaseList baseList=new BaseList();
baseList.list(result);
ar.setSucceed(baseList);
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.DATA_FAIL);
}
return ar;
}
@JYPermission(SECURITY_URL)
@PostMapping(value = "calNextPoint")
@ResponseBody
public AjaxRes calNextPoint(Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
List<Date> nextTimePoints=ScheduleKit.calNextPoint(schedule.getCron(),
SystemClock.nowDate(),5);
ar.setSucceed(nextTimePoints);
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.DATA_FAIL);
}
return ar;
}
/**
* 任务添加页
*/
@JYPermission
@GetMapping("add")
public String add(Model model) {
return "/task/schedule/add";
}
@JYPermission(SECURITY_ADD_URL)
@PostMapping(value = "insert")
@ResponseBody
public AjaxRes insert(@RequestBody Schedule job) {
AjaxRes ar = getAjaxRes();
try {
int res = service.createJob(job);
if (res == 1) {
ar.resCode(ResCode.SAVE_SUCCEED);
}else if(res == 2) {
ar.resCode(ResCode.TASK_CRON_ERROR);
}else if(res == 3) {
ar.resCode(ResCode.TASK_CLASS_ERROR);
}else {
ar.resCode(ResCode.SAVE_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.SAVE_FAIL);
}
return ar;
}
@JYPermission(SECURITY_EDIT_URL)
@PostMapping(value = "find")
@ResponseBody
public AjaxRes find(Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
Schedule dbSchedule = service.get(schedule);
if(dbSchedule!=null) {
ar.setSucceed(dbSchedule);
}else {
ar.resCode(ResCode.DATA_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.DATA_FAIL);
}
return ar;
}
/**
* 任务添加页
*/
@JYPermission
@GetMapping("edit")
public String edit(Model model,String id) {
model.addAttribute("jobId", id);
return "/task/schedule/edit";
}
@JYPermission(SECURITY_EDIT_URL)
@PostMapping(value = "update")
@ResponseBody
public AjaxRes update(@RequestBody Schedule job) {
AjaxRes ar = getAjaxRes();
try {
int res = service.updateJob(job);
if (res == 1) {
ar.resCode(ResCode.UPDATE_SUCCEED);
}else if(res == 2) {
ar.resCode(ResCode.TASK_CRON_ERROR);
}else if(res == 3) {
ar.resCode(ResCode.TASK_CLASS_ERROR);
}else {
ar.resCode(ResCode.UPDATE_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.UPDATE_FAIL);
}
return ar;
}
@JYPermission
@PostMapping(value = "del")
@ResponseBody
public AjaxRes del(Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
int res = service.deleteJob(schedule);
if (res == 1) {
ar.resCode(ResCode.DEL_SUCCEED);
}else {
ar.resCode(ResCode.DEL_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.DEL_FAIL);
}
return ar;
}
@JYPermission
@PostMapping(value = "runOnce")
@ResponseBody
public AjaxRes runOnce(Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
int res = service.runOnce(schedule);
if (res == 1) {
ar.resCode(ResCode.RUN_SUCCEED);
}else if (res == 2) {
ar.resCode(ResCode.TASK_NEED_STOP);
}else {
ar.resCode(ResCode.RUN_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.RUN_FAIL);
}
return ar;
}
@JYPermission
@PostMapping(value = "resumeJob")
@ResponseBody
public AjaxRes resumeJob(Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
int res = service.resumeJob(schedule);
if (res == 1) {
ar.resCode(ResCode.START_SUCCEED);
}else if (res == 2) {
ar.resCode(ResCode.TASK_HAS_START);
}else {
ar.resCode(ResCode.SAVE_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.SAVE_FAIL);
}
return ar;
}
@JYPermission
@PostMapping(value = "pauseJob")
@ResponseBody
public AjaxRes pauseJob(Schedule schedule) {
AjaxRes ar = getAjaxRes();
try {
int res = service.pauseJob(schedule);
if (res == 1) {
ar.resCode(ResCode.PAUSE_SUCCEED);
}else if (res == 2) {
ar.resCode(ResCode.TASK_NOT_START_NOT_PAUSE);
}else {
ar.resCode(ResCode.PAUSE_FAIL);
}
} catch (Exception e) {
logger.error(e.toString(), e);
ar.resCode(ResCode.PAUSE_FAIL);
}
return ar;
}
}
以上就是大概的整合用到的类,如果有什么不懂的可以在下面留言,获取看看过往的文章【过往系列文章】谢谢大家