记录一下动态生成定时任务和更新定时任务配置的开发经验
目录
1.实现定时任务配置接口,在此自定义定时任务创建策略,动态管理定时任务
前言
工作中经常会遇到需要定时任务的场景,例如每天一次、每周一次、每月一次、每年一次地统计日报、周报、月报、年报,以及信息提醒等。Spring Boot 提供了两种方式实现定时任务:第一种是静态创建——基于注解(@Scheduled);第二种是自定义配置——基于接口(SchedulingConfigurer)。我的业务场景是通过配置监控时间段和监控频率来对服务进行监控,因此需要动态创建定时任务。
一、自定义定时任务配置类
1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor,而是corePoolSize=5的线程池
2、自定义线程池工厂类
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Scheduling infrastructure configuration.
 *
 * <p>Enables Spring scheduling and publishes a five-thread
 * {@link ScheduledExecutorService} under the bean name
 * {@code ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME}.
 * Per the accompanying article, the intent is that scheduled tasks run on this
 * pool rather than on a default single-threaded executor.
 */
@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    /**
     * Builds the scheduled thread pool used for task scheduling.
     *
     * @return a scheduled executor with a core pool size of 5 whose threads are
     *         created by {@link DefaultThreadFactory}
     */
    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        ThreadFactory factory = new DefaultThreadFactory();
        return Executors.newScheduledThreadPool(5, factory);
    }

    /**
     * Thread factory producing non-daemon, normal-priority threads named
     * {@code pool-<poolIndex>-schedule-<threadIndex>}.
     */
    private static class DefaultThreadFactory implements ThreadFactory {

        /** Counter shared by all factory instances; yields the pool index in the thread name. */
        private static final AtomicInteger POOL_SEQUENCE = new AtomicInteger(1);

        /** Per-factory counter; yields the thread index in the thread name. */
        private final AtomicInteger threadSequence = new AtomicInteger(1);

        private final ThreadGroup threadGroup;
        private final String prefix;

        DefaultThreadFactory() {
            SecurityManager securityManager = System.getSecurityManager();
            if (securityManager == null) {
                // No security manager installed: inherit the creating thread's group.
                this.threadGroup = Thread.currentThread().getThreadGroup();
            } else {
                this.threadGroup = securityManager.getThreadGroup();
            }
            this.prefix = "pool-" + POOL_SEQUENCE.getAndIncrement() + "-schedule-";
        }

        /**
         * Creates a new worker thread for the given task.
         *
         * @param task the runnable the new thread will execute
         * @return an unstarted, non-daemon thread with normal priority
         */
        @Override
        public Thread newThread(Runnable task) {
            String threadName = prefix + threadSequence.getAndIncrement();
            Thread worker = new Thread(threadGroup, task, threadName, 0);
            // Normalise attributes that would otherwise be inherited from the creating thread.
            if (worker.isDaemon()) {
                worker.setDaemon(false);
            }
            if (Thread.NORM_PRIORITY != worker.getPriority()) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }
}
二、实现SchedulingConfigurer接口
1.实现定时任务配置接口,在此自定义定时任务创建策略,动态管理定时任务
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.assertj.core.util.Lists;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
public class DynamicTask implements SchedulingConfigurer {
/**
* corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
* keepAliveTime = 60s,线程空闲60s后自动结束。
* workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
*/
private static final ExecutorService es = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
private volatile ScheduledTaskRegistrar registrar;
private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();
private volatile List<TaskConstant> taskConstants = Lists.newArrayList();
@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {
this.registrar = registrar;
this.registrar.addTriggerTask(() -> {
if (!CollectionUtils.isEmpty(taskConstants)) {
log.info("");
log.info("检测动态定时任务列表...数量:{}", taskConstants.size());
List<TimingTask> tts = new ArrayList<>();
taskConstants
.forEach(taskConstant -> {
TimingTask tt = new TimingTask();
tt.setExpression(taskConstant.getCron());
tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
tt.setRule(taskConstant.getRule());
tts.add(tt);
});
this.refreshTasks(tts);
}
}
, triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
}
public List<TaskConstant> getTaskConstants() {
return taskConstants;
}
private void refreshTasks(List<TimingTask> tasks) {
//取消已经删除的策略任务
Set<String> taskIds = scheduledFutures.keySet();
for (String taskId : taskIds) {
if (!exists(tasks, taskId)) {
scheduledFutures.get(taskId).cancel(false);
}
}
for (TimingTask tt : tasks) {
String expression = tt.getExpression();
if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
log.error("定时任务DynamicTask cron表达式不合法: " + expression);
continue;
}
//如果配置一致,则不需要重新创建定时任务
if (scheduledFutures.containsKey(tt.getTaskId())
&& cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
continue;
}
//如果策略执行时间发生了变化,则取消当前策略的任务
if (scheduledFutures.containsKey(tt.getTaskId())) {
scheduledFutures.remove(tt.getTaskId()).cancel(false);
//反转有序集合,将先添加的定时任务删除
Collections.reverse(taskConstants);
for (int i = taskConstants.size() - 1; i >= 0; i--) {
if (("dynamic-task-" + taskConstants.get(i).getTaskId()).equals(tt.getTaskId())) {
log.info("移除的配置:{}", taskConstants.get(i).getRule());
taskConstants.remove(i);
break;
}
}
cronTasks.remove(tt.getTaskId());
}
CronTask task = new CronTask(tt, expression);
ScheduledFuture<?> future = Objects.requireNonNull(registrar.getScheduler()).schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
assert future != null;
scheduledFutures.put(tt.getTaskId(), future);
}
}
private boolean exists(List<TimingTask> tasks, String taskId) {
for (TimingTask task : tasks) {
if (task.getTaskId().equals(taskId)) {
return true;
}
}
return false;
}
@PreDestroy
public void destroy() {
this.registrar.destroy();
}
public static class TaskConstant {
private String cron;
private String taskId;
private String rule;
public String getRule() {
return rule;
}
public void setRule(String rule) {
this.rule = rule;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
}
private static class TimingTask implements Runnable {
private String expression;
private String taskId;
private String rule;
public String getRule() {
return rule;
}
public void setRule(String rule) {
this.rule = rule;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public void run() {
es.submit(() -> {
//这里写业务方法
log.info("执行定时任务:{},执行时间:{},{} ", this.getTaskId(), LocalDateTime.now().toLocalTime(), this.getRule());
});
}
public String getExpression() {
return expression;
}
public void setExpression(String expression) {
this.expression = expression;
}
@Override
public String toString() {
return ReflectionToStringBuilder.toString(this
, ToStringStyle.JSON_STYLE
, false
, false
, TimingTask.class);
}
}
}
2.测试类
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Manual demonstration of {@code DynamicTask}: it adds configuration entries to the
 * live list at different moments and sleeps in between. The test contains no
 * assertions; presumably the outcome is meant to be observed in the log output.
 */
@SpringBootTest
class DynamicTaskTest {
@Autowired
private DynamicTask dynamicTask;
/**
 * Registers three tasks, then a second entry for "test1" with a different cron,
 * then a fourth task, pausing between the steps.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@Test
public void test() throws InterruptedException {
// Live configuration list of the bean; entries are appended to it below.
List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
// Task "test1": cron "0/5 * * * * ?".
DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
taskConstant.setCron("0/5 * * * * ?");
taskConstant.setTaskId("test1");
taskConstant.setRule("每隔5秒执行");
taskConstans.add(taskConstant);
// Task "test2": cron "0/8 * * * * ?".
DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
taskConstant1.setCron("0/8 * * * * ?");
taskConstant1.setTaskId("test2");
taskConstant1.setRule("每隔8秒执行");
taskConstans.add(taskConstant1);
// Task "test3": cron "0/15 * * * * ?".
DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
taskConstant2.setCron("0/15 * * * * ?");
taskConstant2.setTaskId("test3");
taskConstant2.setRule("每隔15秒执行");
taskConstans.add(taskConstant2);
// Wait 10 seconds before changing the configuration.
TimeUnit.SECONDS.sleep(10);
// Update the schedule of test1 by appending a second entry with the same id and a new cron.
DynamicTask.TaskConstant taskConstant4 = new DynamicTask.TaskConstant();
taskConstant4.setCron("0/6 * * * * ?");
taskConstant4.setTaskId("test1");
taskConstant4.setRule("每隔6秒执行");
taskConstans.add(taskConstant4);
TimeUnit.SECONDS.sleep(20);
// Task "test4": cron "0/20 * * * * ?", added 20 seconds after the update above.
DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
taskConstant3.setCron("0/20 * * * * ?");
taskConstant3.setTaskId("test4");
taskConstant3.setRule("每隔20秒执行");
taskConstans.add(taskConstant3);
// NOTE(review): a 50-minute sleep, presumably to keep the application context alive for observation.
TimeUnit.MINUTES.sleep(50);
}
}3.运行截图

总结
以上就是今天要讲的内容,本文仅仅简单介绍了动态创建定时任务的步骤。
版权声明:本文为weixin_41955471原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。