Quartz多个调度器+线程池模式分别调度任务

配置多个调度器及其线程池

import com.wipinfo.central.engine.constant.JobConstant;
import com.wipinfo.central.engine.job.MyJobFactory;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author echoo华地
 * @description 定时任务配置
 * @date 2022/5/31 9:06
 */
@Configuration
@Slf4j
public class QuartzConfig {

    /**
     * 数据清理任务调度器
     */
    @Bean(JobConstant.JOB_EXECUTOR_CLEAN)
    public Scheduler cleanScheduler(@Qualifier("CleanSchedulerFactoryBean") SchedulerFactoryBean timingSchedulerFactoryBean) {
        return timingSchedulerFactoryBean.getScheduler();
    }


    /**
     * 虚拟机定时开关机任务调度器
     */
    @Bean(JobConstant.JOB_EXECUTOR_TIMING)
    public Scheduler timingScheduler(@Qualifier("TimingSchedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }

    /**
     * 数据上传任务调度器
     */
    @Bean(JobConstant.JOB_EXECUTOR_UPLOAD)
    public Scheduler uploadScheduler(@Qualifier("UploadSchedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }

    /**
     * 数据统计任务调度器
     */
    @Bean(JobConstant.JOB_EXECUTOR_CALCULATE)
    public Scheduler calScheduler(@Qualifier("CalSchedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }


    /**
     * 虚拟机定时开关机调度器工厂类
     */
    @Bean("TimingSchedulerFactoryBean")
    public SchedulerFactoryBean timingSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_TIMING, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * 数据上传任务调度器工厂类
     */
    @Bean("UploadSchedulerFactoryBean")
    public SchedulerFactoryBean uploadSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_UPLOAD, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * 统计任务调度器工厂类
     */
    @Bean("CalSchedulerFactoryBean")
    public SchedulerFactoryBean calSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_CALCULATE, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * 数据清理任务调度器工厂类
     */
    @Bean("CleanSchedulerFactoryBean")
    public SchedulerFactoryBean cleanSchedulerFactoryBean() {
        ThreadPoolTaskExecutor threadPoolExecutor = createThreadPoolExecutor(1, 2, 4, 60);
        return createSchedulerFactoryBean(JobConstant.JOB_EXECUTOR_CLEAN, 5, false, threadPoolExecutor, jobFactory());
    }

    /**
     * @param name               调度器名称
     * @param delay              启动延时
     * @param waitForJob         等待任务执行完毕再关闭程序
     * @param threadPoolExecutor 调度器线程池
     * @return org.springframework.scheduling.quartz.SchedulerFactoryBean
     * @description 生成调度器工厂
     * @author echoo华地
     * @date 2022/5/31 9:04
     */
    private SchedulerFactoryBean createSchedulerFactoryBean(String name, Integer delay, boolean waitForJob, ThreadPoolTaskExecutor threadPoolExecutor, MyJobFactory jobFactory) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        System.out.println("当前 = " + schedulerFactoryBean);
        schedulerFactoryBean.setSchedulerName(name); //
        schedulerFactoryBean.setStartupDelay(delay); // 延时启动
        schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(waitForJob); // 等待任务执行完毕再关闭程序
        // schedulerFactoryBean.setDataSource(); // 因为 Quartz 默认数据库为 Mysql,因此做自定义磁盘化
        schedulerFactoryBean.setJobFactory(jobFactory);
        schedulerFactoryBean.setTaskExecutor(threadPoolExecutor);
        return schedulerFactoryBean;
    }


    /**
     * @param corePoolSize     核心线程
     * @param maxPoolSize      最大线程
     * @param queueCapacity    队列容量
     * @param keepAliveSeconds 闲置时间
     * @return org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
     * @description 生成线程池
     * @author echoo华地
     * @date 2022/5/31 8:54
     */
    private static ThreadPoolTaskExecutor createThreadPoolExecutor(Integer corePoolSize, Integer maxPoolSize, Integer queueCapacity, Integer keepAliveSeconds) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds); // Default is 60
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // AbortPolicy : 默认拒绝策略,抛出异常 DiscardPolicy:不会抛出异常,会丢掉任务
        threadPoolTaskExecutor.initialize(); // 手动初始化
        return threadPoolTaskExecutor;
    }

    @Bean
    public MyJobFactory jobFactory() {
        return new MyJobFactory();
    }
}

调度管理器,定义自己需要的功能

import com.wipinfo.central.engine.constant.JobConstant;
import com.wipinfo.central.engine.entity.QuartzTask;
import com.wipinfo.central.engine.enums.JobStatusEnums;
import com.wipinfo.central.engine.enums.JobTypeEnums;
import com.wipinfo.central.engine.service.IQuartzService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;

/**
 * @author echoo华地
 * @description 调度管理器
 * @date 2022/5/30 13:47
 */
@Slf4j
@Component
public class SchedulerManager {

    @Resource(name = JobConstant.JOB_EXECUTOR_TIMING)
    private Scheduler timingScheduler;
    @Resource(name = JobConstant.JOB_EXECUTOR_UPLOAD)
    private Scheduler uploadScheduler;
    @Resource(name = JobConstant.JOB_EXECUTOR_CALCULATE)
    private Scheduler calculateScheduler;
    @Autowired
    @Qualifier(JobConstant.JOB_EXECUTOR_CLEAN)
    private Scheduler cleanScheduler;
    @Autowired
    private IQuartzService quartzService;

    /**
     * 调度所有任务
     */
    public void schedulerAllJobs() {
        scheduleTimingJobs();
        scheduleCalculateJobs();
        scheduleUploadJobs();
//        scheduleCleanJobs();
    }

    /**
     * 调度数据上传任务
     */
    public void scheduleUploadJobs() {
        log.info("启动数据上传任务调度器");
        scheduleUploadJobFromDb(uploadScheduler);
    }

    /**
     * 调度数据统计任务
     */
    public void scheduleCalculateJobs() {
        log.info("启动数据统计任务调度器");
        scheduleCalculateJobFromDb(calculateScheduler);
    }

    /**
     * 调度数据清理任务
     */
    public void scheduleCleanJobs() {
        log.info("启动数据清理任务调度器");
        scheduleCleanJobFromDb(cleanScheduler);
    }

    /**
     * 调度定时开关机任务
     */
    public void scheduleTimingJobs() {
        log.info("启动开关机定时任务调度器");
        scheduleTimingJobFromDb(timingScheduler);
    }

    /**
     * 调度数据库数据上传任务
     */
    private void scheduleUploadJobFromDb(Scheduler scheduler) {
        List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_UPLOAD);
        log.info("当前数据上传任务数:{}", tasks.size());
        tasks.stream().filter(task ->
                task.getStatus() == 0
        ).forEach(task -> {
            try {
                schedulerJob(task, scheduler);
            } catch (Exception e) {
                log.error("数据上传任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
            }
        });
    }

    /**
     * 调度数据库数据统计任务
     */
    private void scheduleCalculateJobFromDb(Scheduler scheduler) {
        List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_CALCULATE);
        log.info("当前数据统计任务数:{}", tasks.size());
        tasks.stream().filter(task ->
                task.getStatus() == 0
        ).forEach(task -> {
            try {
                schedulerJob(task, scheduler);
            } catch (Exception e) {
                log.error("数据统计任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
            }
        });
    }

    /**
     * 调度数据库数据清理任务
     */
    private void scheduleCleanJobFromDb(Scheduler scheduler) {
        List<QuartzTask> tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_CLEAN);
        log.info("当前数据清理任务数:{}", tasks.size());
        tasks.stream().filter(task ->
                task.getStatus() == 0
        ).forEach(task -> {
            try {
                schedulerJob(task, scheduler);
            } catch (Exception e) {
                log.error("数据清理任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
            }
        });
    }

    /**
     * 调度数据库定时开关机任务
     */
    private void scheduleTimingJobFromDb(Scheduler scheduler) {
        List<QuartzTask> tasks = Collections.emptyList();
        // 查询 定时开关机组 任务
        tasks = quartzService.selectJobsByGroup(JobConstant.JOB_GROUP_TIMING);
        log.info("当前定时开关机任务数: {}", tasks.size());
        // 把所有定时任务注入调度器中
        tasks.forEach(task -> {
            try {
                schedulerJob(task, scheduler);
            } catch (Exception e) {
                if (e.getMessage().contains("will never fire")) {
                    // 修改任务状态为已过期
                    quartzService.updateStatus(task.getId(), JobStatusEnums.ALREADY_EXECUTE.getStatus());
                    log.warn("定时任务[{}]已过期失效", task.getJobName());
                } else {
                    log.error("定时任务[{}]调度异常;错误:[{}]", task.getJobName(), e.getMessage());
                }
            }
        });
    }

    /**
     * @param schedulerName 需要重载的调度器名称
     * @description 重载指定调度器任务
     * @author echoo华地
     * @date 2022/5/31 10:00
     */
    public void rescheduleJobs(String schedulerName) {
        log.info("正在重载[{}]调度器任务", schedulerName);
        try {
            switch (schedulerName) {
                case JobConstant.JOB_EXECUTOR_TIMING:
                    if (clearSchedulerJobs(timingScheduler))
                        scheduleTimingJobFromDb(timingScheduler);
                    break;
                case JobConstant.JOB_EXECUTOR_UPLOAD:
                    if (clearSchedulerJobs(uploadScheduler))
                        scheduleUploadJobFromDb(uploadScheduler);
                    break;
                case JobConstant.JOB_EXECUTOR_CALCULATE:
                    if (clearSchedulerJobs(calculateScheduler))
                        scheduleCalculateJobFromDb(calculateScheduler);
                    break;
                case JobConstant.JOB_EXECUTOR_CLEAN:
                    if (clearSchedulerJobs(calculateScheduler))
                        scheduleCleanJobFromDb(calculateScheduler);
                    break;
                default:
                    log.error("重载调度器任务失败;错误[{}调度器不存在]", schedulerName);
            }
        } catch (Exception e) {
            log.error("重载定时任务失败;错误:[{}]", e.getMessage());
        }
    }


    /**
     * 清空定时开关机任务
     */
    private boolean clearSchedulerJobs(Scheduler scheduler) {
        try {
            log.info("清空[{}]调度器所有任务", scheduler.getSchedulerName());
            scheduler.clear();
            return true;
        } catch (SchedulerException e) {
            log.error("清空调度器任务异常;错误[{}]", e.getMessage());
            return false;
        }
    }

    /**
     * @param task      单个任务
     * @param scheduler 调度器
     * @description 调度器添加调度任务
     * @author echoo华地
     * @date 2022/5/30 17:12
     */
    private void schedulerJob(QuartzTask task, Scheduler scheduler) throws SchedulerException {
        JobKey jobKey = new JobKey(task.getJobName(), task.getJobGroup()); // 任务唯一标识
        Integer type = task.getType();
        Class<? extends Job> jobClass = getJobClass(type);
        if ((jobClass == null) || (!Job.class.isAssignableFrom(jobClass))) {
            log.error("调度失败,任务类型错误!");
        }
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobKey) // 任务类型,任务数据
                .usingJobData(JobConstant.JOB_DATA_JOB_ID, task.getId())
                .usingJobData(JobConstant.JOB_DATA_JOB_TYPE, task.getType())
                .usingJobData(JobConstant.JOB_DATA_JOB_NAME, task.getJobName()).build();
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
        // 以任务名为名称,并根据cron表达式创建触发器
        CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(task.getJobName(),
                task.getJobGroup()).withSchedule(scheduleBuilder).build();
        if (scheduler.checkExists(jobKey)) {
            // 任务已在调度器内,先删除
            scheduler.deleteJob(jobKey);
        }
        // 加入调度
        scheduler.scheduleJob(jobDetail, cronTrigger);
    }

    private Class<? extends Job> getJobClass(Integer jobType) {
        Class<?> jobClass = JobTypeEnums.getJobClassByType(jobType);
        if (Job.class.isAssignableFrom(jobClass)) {
            return (Class<Job>) jobClass;
        } else {
            return null;
        }
    }
}

版权声明:本文为qq_31856061原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。