【Scheduled定时任务】spring boot动态生成定时任务

记录一下动态生成定时任务和更新定时任务配置的开发经验

目录

前言

一、自定义定时任务配置类

二、实现SchedulingConfigurer接口

1.实现定时任务配置接口,在此自定义定时任务创建策略,动态管理定时任务

2.测试类

3.运行截图

总结


前言

工作中有需要应用到定时任务的场景,如一天一次,一周一次,一月一次,一年一次,做日报,周报,月报,年报的统计,以及信息提醒等。Spring Boot 提供了两种方式实现定时任务:第一种是静态创建——基于注解;第二种是自定义配置——基于接口。我的业务场景是通过配置监控时间段和监控频率来对服务进行监控,因此需要动态创建定时任务。


一、自定义定时任务配置类

        1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor,而是corePoolSize=5的线程池

2、自定义线程池工厂类

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Scheduling infrastructure configuration.
 *
 * <p>Enables scheduling and publishes a five-thread {@link ScheduledExecutorService}
 * under the bean name given by
 * {@link ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME}.
 * Worker threads are produced by the nested {@link DefaultThreadFactory}.
 */
@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    /**
     * Builds the scheduled executor backing the scheduling infrastructure.
     *
     * @return a scheduled thread pool with a core size of 5 and custom-named threads
     */
    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        ThreadFactory workerFactory = new DefaultThreadFactory();
        return Executors.newScheduledThreadPool(5, workerFactory);
    }

    /**
     * Thread factory that names its threads {@code pool-<poolNo>-schedule-<threadNo>}
     * and hands out non-daemon threads at normal priority.
     */
    private static class DefaultThreadFactory implements ThreadFactory {
        /** Counter shared by all factories; yields the pool number of each new factory. */
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        /** Thread group every thread of this factory is created in. */
        private final ThreadGroup group;
        /** Per-factory counter used as the thread-name suffix. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        /** Constant leading part of every thread name produced by this factory. */
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager manager = System.getSecurityManager();
            // Use the security manager's group when one is installed, else the creator's group.
            if (manager == null) {
                group = Thread.currentThread().getThreadGroup();
            } else {
                group = manager.getThreadGroup();
            }
            int poolIndex = poolNumber.getAndIncrement();
            namePrefix = "pool-" + poolIndex + "-schedule-";
        }

        /**
         * Creates a new, not yet started thread for the given task.
         *
         * @param r the task the new thread will run
         * @return a non-daemon thread with {@link Thread#NORM_PRIORITY}
         */
        @Override
        public Thread newThread(Runnable r) {
            String threadName = namePrefix + threadNumber.getAndIncrement();
            // A stack size of 0 leaves the choice of stack size to the JVM.
            Thread worker = new Thread(group, r, threadName, 0);
            // Daemon flag and priority are inherited from the creating thread; reset both.
            if (worker.isDaemon()) {
                worker.setDaemon(false);
            }
            if (Thread.NORM_PRIORITY != worker.getPriority()) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }
}

二、实现SchedulingConfigurer接口

1.实现定时任务配置接口,在此自定义定时任务创建策略,动态管理定时任务

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.assertj.core.util.Lists;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class DynamicTask implements SchedulingConfigurer {

    /**
     * corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
     * keepAliveTime = 60s,线程空闲60s后自动结束。
     * workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
     */
    private static final ExecutorService es = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>());


    private volatile ScheduledTaskRegistrar registrar;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    private volatile List<TaskConstant> taskConstants = Lists.newArrayList();

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        this.registrar = registrar;
        this.registrar.addTriggerTask(() -> {
                    if (!CollectionUtils.isEmpty(taskConstants)) {
                        log.info("");
                        log.info("检测动态定时任务列表...数量:{}", taskConstants.size());
                        List<TimingTask> tts = new ArrayList<>();
                        taskConstants
                                .forEach(taskConstant -> {
                                    TimingTask tt = new TimingTask();
                                    tt.setExpression(taskConstant.getCron());
                                    tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                                    tt.setRule(taskConstant.getRule());
                                    tts.add(tt);
                                });
                        this.refreshTasks(tts);
                    }
                }
                , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    }


    public List<TaskConstant> getTaskConstants() {
        return taskConstants;
    }

    private void refreshTasks(List<TimingTask> tasks) {
        //取消已经删除的策略任务
        Set<String> taskIds = scheduledFutures.keySet();
        for (String taskId : taskIds) {
            if (!exists(tasks, taskId)) {
                scheduledFutures.get(taskId).cancel(false);
            }
        }
        for (TimingTask tt : tasks) {
            String expression = tt.getExpression();
            if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                log.error("定时任务DynamicTask cron表达式不合法: " + expression);
                continue;
            }
            //如果配置一致,则不需要重新创建定时任务
            if (scheduledFutures.containsKey(tt.getTaskId())
                    && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                continue;
            }
            //如果策略执行时间发生了变化,则取消当前策略的任务
            if (scheduledFutures.containsKey(tt.getTaskId())) {
                scheduledFutures.remove(tt.getTaskId()).cancel(false);
                //反转有序集合,将先添加的定时任务删除
                Collections.reverse(taskConstants);
                for (int i = taskConstants.size() - 1; i >= 0; i--) {
                    if (("dynamic-task-" + taskConstants.get(i).getTaskId()).equals(tt.getTaskId())) {
                        log.info("移除的配置:{}", taskConstants.get(i).getRule());
                        taskConstants.remove(i);
                        break;
                    }
                }
                cronTasks.remove(tt.getTaskId());
            }
            CronTask task = new CronTask(tt, expression);
            ScheduledFuture<?> future = Objects.requireNonNull(registrar.getScheduler()).schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(tt.getTaskId(), task);
            assert future != null;
            scheduledFutures.put(tt.getTaskId(), future);
        }
    }

    private boolean exists(List<TimingTask> tasks, String taskId) {
        for (TimingTask task : tasks) {
            if (task.getTaskId().equals(taskId)) {
                return true;
            }
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

    public static class TaskConstant {
        private String cron;
        private String taskId;
        private String rule;

        public String getRule() {
            return rule;
        }

        public void setRule(String rule) {
            this.rule = rule;
        }

        public String getCron() {
            return cron;
        }

        public void setCron(String cron) {
            this.cron = cron;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    }

    private static class TimingTask implements Runnable {
        private String expression;

        private String taskId;

        private String rule;

        public String getRule() {
            return rule;
        }

        public void setRule(String rule) {
            this.rule = rule;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {

            es.submit(() -> {
                //这里写业务方法
                log.info("执行定时任务:{},执行时间:{},{} ", this.getTaskId(), LocalDateTime.now().toLocalTime(), this.getRule());

            });

        }

        public String getExpression() {
            return expression;
        }

        public void setExpression(String expression) {
            this.expression = expression;
        }

        @Override
        public String toString() {
            return ReflectionToStringBuilder.toString(this
                    , ToStringStyle.JSON_STYLE
                    , false
                    , false
                    , TimingTask.class);
        }

    }

}

2.测试类

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Manual smoke test for {@link DynamicTask}: registers tasks, updates one of them, adds
 * another later and then keeps the context alive so the executions can be watched in the log.
 * It makes no assertions.
 */
@SpringBootTest
class DynamicTaskTest {
    @Autowired
    private DynamicTask dynamicTask;

    @Test
    public void test() throws InterruptedException {
        List<DynamicTask.TaskConstant> registry = dynamicTask.getTaskConstants();

        // Three initial tasks with different periods.
        registry.add(newTask("0/5 * * * * ?", "test1", "每隔5秒执行"));
        registry.add(newTask("0/8 * * * * ?", "test2", "每隔8秒执行"));
        registry.add(newTask("0/15 * * * * ?", "test3", "每隔15秒执行"));

        TimeUnit.SECONDS.sleep(10);
        // Update the configuration of test1 by appending an entry with the same task id.
        registry.add(newTask("0/6 * * * * ?", "test1", "每隔6秒执行"));

        TimeUnit.SECONDS.sleep(20);
        // Register one more task while the others are already running.
        registry.add(newTask("0/20 * * * * ?", "test4", "每隔20秒执行"));

        // Keep the test alive so the scheduled executions can be observed.
        TimeUnit.MINUTES.sleep(50);
    }

    /**
     * Builds a task configuration entry.
     *
     * @param cron   cron expression of the task
     * @param taskId id of the task
     * @param rule   description of the task
     * @return the populated configuration entry
     */
    private static DynamicTask.TaskConstant newTask(String cron, String taskId, String rule) {
        DynamicTask.TaskConstant entry = new DynamicTask.TaskConstant();
        entry.setCron(cron);
        entry.setTaskId(taskId);
        entry.setRule(rule);
        return entry;
    }

}

3.运行截图


总结

以上就是今天要讲的内容,本文仅仅简单介绍了动态创建定时任务的步骤。


版权声明:本文为weixin_41955471原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。