配置多个调度器及其线程池
import com.wipinfo.central.engine.constant.JobConstant;
import com.wipinfo.central.engine.job.MyJobFactory;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Quartz configuration that declares four independent schedulers (clean, timing,
 * upload, calculate), each created by its own {@link SchedulerFactoryBean} and
 * backed by its own thread pool.
 *
 * @author echoo华地
 * @date 2022/5/31 9:06
 */
@Configuration
@Slf4j
public class QuartzConfig {

    /**
     * Scheduler for data-cleaning jobs.
     *
     * @param cleanSchedulerFactoryBean factory bean qualified as {@code CleanSchedulerFactoryBean}
     * @return the scheduler exposed by that factory bean
     */
    @Bean(JobConstant.JOB_EXECUTOR_CLEAN)
    public Scheduler cleanScheduler(@Qualifier("CleanSchedulerFactoryBean") SchedulerFactoryBean cleanSchedulerFactoryBean) {
        return cleanSchedulerFactoryBean.getScheduler();
    }

    /**
     * Scheduler for the virtual-machine timed power on/off jobs.
     *
     * @param schedulerFactoryBean factory bean qualified as {@code TimingSchedulerFactoryBean}
     * @return the scheduler exposed by that factory bean
     */
    @Bean(JobConstant.JOB_EXECUTOR_TIMING)
    public Scheduler timingScheduler(@Qualifier("TimingSchedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }

    /**
     * Scheduler for data-upload jobs.
     *
     * @param schedulerFactoryBean factory bean qualified as {@code UploadSchedulerFactoryBean}
     * @return the scheduler exposed by that factory bean
     */
    @Bean(JobConstant.JOB_EXECUTOR_UPLOAD)
    public Scheduler uploadScheduler(@Qualifier("UploadSchedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }

    /**
     * Scheduler for data-statistics jobs.
     *
     * @param schedulerFactoryBean factory bean qualified as {@code CalSchedulerFactoryBean}
     * @return the scheduler exposed by that factory bean
     */
    @Bean(JobConstant.JOB_EXECUTOR_CALCULATE)
    public Scheduler calScheduler(@Qualifier("CalSchedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }

    /**
     * Scheduler factory for the virtual-machine timed power on/off jobs.
     */
    @Bean("TimingSchedulerFactoryBean")
    public SchedulerFactoryBean timingSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_TIMING, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * Scheduler factory for data-upload jobs.
     */
    @Bean("UploadSchedulerFactoryBean")
    public SchedulerFactoryBean uploadSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_UPLOAD, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * Scheduler factory for data-statistics jobs.
     */
    @Bean("CalSchedulerFactoryBean")
    public SchedulerFactoryBean calSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_CALCULATE, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * Scheduler factory for data-cleaning jobs.
     */
    @Bean("CleanSchedulerFactoryBean")
    public SchedulerFactoryBean cleanSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_CLEAN, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * Builds a scheduler factory bean with the given name, startup delay, shutdown
     * behaviour, task executor and job factory.
     *
     * @param name               scheduler name
     * @param delay              startup delay handed to {@code setStartupDelay}
     * @param waitForJob         whether to wait for running jobs to complete on shutdown
     * @param threadPoolExecutor thread pool the scheduler runs its jobs on
     * @param jobFactory         job factory used to instantiate job classes
     * @return the configured {@link SchedulerFactoryBean}
     * @author echoo华地
     * @date 2022/5/31 9:04
     */
    private SchedulerFactoryBean createSchedulerFactoryBean(String name, int delay, boolean waitForJob, ThreadPoolTaskExecutor threadPoolExecutor, MyJobFactory jobFactory) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        // Diagnostic output goes through the class logger rather than stdout.
        log.debug("当前 = {}", schedulerFactoryBean);
        schedulerFactoryBean.setSchedulerName(name);
        schedulerFactoryBean.setStartupDelay(delay);
        schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(waitForJob);
        // No DataSource is configured here; the setDataSource call was left disabled
        // in the original code.
        schedulerFactoryBean.setJobFactory(jobFactory);
        schedulerFactoryBean.setTaskExecutor(threadPoolExecutor);
        return schedulerFactoryBean;
    }

    /**
     * Builds and initializes a thread pool for a scheduler.
     *
     * <p>NOTE(review): the returned executor is not registered as a bean, so nothing
     * in this class shuts it down when the application context closes — confirm
     * whether that is acceptable.
     *
     * @param corePoolSize     core pool size
     * @param maxPoolSize      maximum pool size
     * @param queueCapacity    capacity of the work queue
     * @param keepAliveSeconds idle keep-alive time in seconds
     * @return an initialized {@link ThreadPoolTaskExecutor}
     * @author echoo华地
     * @date 2022/5/31 8:54
     */
    private static ThreadPoolTaskExecutor createThreadPoolExecutor(int corePoolSize, int maxPoolSize, int queueCapacity, int keepAliveSeconds) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        // AbortPolicy throws when a task is rejected; DiscardPolicy would drop it silently.
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Created outside the container lifecycle, so it has to be initialized by hand.
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    /**
     * Job factory shared by all scheduler factory beans declared in this class.
     */
    @Bean
    public MyJobFactory jobFactory() {
        return new MyJobFactory();
    }
}
调度管理器,定义自己需要的功能
import com.wipinfo.central.engine.constant.JobConstant;
import com.wipinfo.central.engine.entity.QuartzTask;
import com.wipinfo.central.engine.enums.JobStatusEnums;
import com.wipinfo.central.engine.enums.JobTypeEnums;
import com.wipinfo.central.engine.service.IQuartzService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
/**
* @author echoo华地
* @description 调度管理器
* @date 2022/5/30 13:47
*/
@Slf4j
@Component
public class SchedulerManager {
@Resource(name = JobConstant.JOB_EXECUTOR_TIMING)
private Scheduler timingScheduler;
@Resource(name = JobConstant.JOB_EXECUTOR_UPLOAD)
private Scheduler uploadScheduler;
@Resource(name = JobConstant.JOB_EXECUTOR_CALCULATE)
private Scheduler calculateScheduler;
@Autowired
@Qualifier(JobConstant.JOB_EXECUTOR_CLEAN)
private Scheduler cleanScheduler;
@Autowired
private IQuartzService quartzService;
/**
* 调度所有任务
*/
public void schedulerAllJobs() {
scheduleTimingJobs();
scheduleCalculateJobs();
scheduleUploadJobs();
// scheduleCleanJobs();
}
/**
* 调度数据上传任务
*/
public void scheduleUploadJobs() {
log.info("启动数据上传任务调度器");
scheduleUploadJobFromDb(uploadScheduler);
}
/**
* 调度数据统计任务
*/
public void scheduleCalculateJobs() {
log.info("启动数据统计任务调度器");
scheduleCalculateJobFromDb(calculateScheduler);
}
/**
* 调度数据清理任务
*/
public void scheduleCleanJobs() {
log.info("启动数据清理任务调度器");
scheduleCleanJobFromDb(cleanScheduler);
}
/**
* 调度定时开关机任务
*/
public void scheduleTimingJobs() {
log.info("启动开关机定时任务调度器");
scheduleTimingJobFromDb(timingScheduler);
}
/**
* 调度数据库数据上传任务
*/
private void scheduleUploadJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_UPLOAD);
log.info("当前数据上传任务数:{}", tasks.size());
tasks.stream().filter(task ->
task.getStatus() == 0
).forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
log.error("数据上传任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
}
});
}
/**
* 调度数据库数据统计任务
*/
private void scheduleCalculateJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_CALCULATE);
log.info("当前数据统计任务数:{}", tasks.size());
tasks.stream().filter(task ->
task.getStatus() == 0
).forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
log.error("数据统计任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
}
});
}
/**
* 调度数据库数据清理任务
*/
private void scheduleCleanJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_CLEAN);
log.info("当前数据清理任务数:{}", tasks.size());
tasks.stream().filter(task ->
task.getStatus() == 0
).forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
log.error("数据清理任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
}
});
}
/**
* 调度数据库定时开关机任务
*/
private void scheduleTimingJobFromDb(Scheduler scheduler) {
List<QuartzTask> tasks = Collections.emptyList();
// 查询 定时开关机组 任务
tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_TIMING);
log.info("当前定时开关机任务数: {}", tasks.size());
// 把所有定时任务注入调度器中
tasks.forEach(task -> {
try {
schedulerJob(task, scheduler);
} catch (Exception e) {
if (e.getMessage().contains("will never fire")) {
// 修改任务状态为已过期
quartzService.updateStatus(task.getId(), JobStatusEnums.ALREADY_EXECUTE.getStatus());
log.warn("定时任务[{}]已过期失效", task.getJobName());
} else {
log.error("定时任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
}
}
});
}
/**
* @param schedulerName 需要重载的调度器名称
* @description 重载指定调度器任务
* @author echoo华地
* @date 2022/5/31 10:00
*/
public void rescheduleJobs(String schedulerName) {
log.info("正在重载[{}]调度器任务", schedulerName);
try {
switch (schedulerName) {
case JobConstant.JOB_EXECUTOR_TIMING:
if (clearSchedulerJobs(timingScheduler))
scheduleTimingJobFromDb(timingScheduler);
break;
case JobConstant.JOB_EXECUTOR_UPLOAD:
if (clearSchedulerJobs(uploadScheduler))
scheduleUploadJobFromDb(uploadScheduler);
break;
case JobConstant.JOB_EXECUTOR_CALCULATE:
if (clearSchedulerJobs(calculateScheduler))
scheduleCalculateJobFromDb(calculateScheduler);
break;
case JobConstant.JOB_EXECUTOR_CLEAN:
if (clearSchedulerJobs(calculateScheduler))
scheduleCleanJobFromDb(calculateScheduler);
break;
default:
log.error("重载调度器任务失败;错误[{}调度器不存在]", schedulerName);
}
} catch (Exception e) {
log.error("重载定时任务失败;错误:[{}]", e.getMessage());
}
}
/**
* 清空定时开关机任务
*/
private boolean clearSchedulerJobs(Scheduler scheduler) {
try {
log.info("清空[{}]调度器所有任务", scheduler.getSchedulerName());
scheduler.clear();
return true;
} catch (SchedulerException e) {
log.error("清空调度器任务异常;错误[{}]", e.getMessage());
return false;
}
}
/**
* @param task 单个任务
* @param scheduler 调度器
* @description 调度器添加调度任务
* @author echoo华地
* @date 2022/5/30 17:12
*/
private void schedulerJob(QuartzTask task, Scheduler scheduler) throws SchedulerException {
JobKey jobKey = new JobKey(task.getJobName(), task.getJobGroup()); // 任务唯一标识
Integer type = task.getType();
Class<? extends Job> jobClass = getJobClass(type);
if ((jobClass == null) || (!Job.class.isAssignableFrom(jobClass))) {
log.error("调度失败,任务类型错误!");
}
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobKey) // 任务类型,任务数据
.usingJobData(JobConstant.JOB_DATA_JOB_ID, task.getId())
.usingJobData(JobConstant.JOB_DATA_JOB_TYPE, task.getType())
.usingJobData(JobConstant.JOB_DATA_JOB_NAME, task.getJobName()).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
// 以任务名为名称,并根据cron表达式创建触发器
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(task.getJobName(),
task.getJobGroup()).withSchedule(scheduleBuilder).build();
if (scheduler.checkExists(jobKey)) {
// 任务已在调度器内,先删除
scheduler.deleteJob(jobKey);
}
// 加入调度
scheduler.scheduleJob(jobDetail, cronTrigger);
}
private Class<? extends Job> getJobClass(Integer jobType) {
Class<?> jobClass = JobTypeEnums.getJobClassByType(jobType);
if (Job.class.isAssignableFrom(jobClass)) {
return (Class<Job>) jobClass;
} else {
return null;
}
}
}
版权声明:本文为qq_31856061原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。