SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了

private TaskScheduler myThreadPoolTaskScheduler;

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

//简单粗暴的方式直接指定

//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));

//也可以自定义的线程池,方便线程的使用与维护,这里不多说了

scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);

}

}

@Bean(name = “myThreadPoolTaskScheduler”)

public TaskScheduler getMyThreadPoolTaskScheduler() {

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();

taskScheduler.setPoolSize(10);

taskScheduler.setThreadNamePrefix(“Haina-Scheduled-”);

taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//调度器shutdown被调用时等待当前被调度的任务完成

taskScheduler.setWaitForTasksToCompleteOnShutdown(true);

//等待时长

taskScheduler.setAwaitTerminationSeconds(60);

return taskScheduler;

}

方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行

  • 首先使用@EnableAsync 启用异步任务

  • 然后在定时任务的方法加上@Async即可,默认使用的线程池为SimpleAsyncTaskExecutor(该线程池默认来一个任务创建一个线程,就会不断创建大量线程,极有可能压爆服务器内存。当然它有自己的限流机制,这里就不多说了,有兴趣的自己翻翻源码~)

  • 项目中为了更好的控制线程的使用,我们可以自定义我们自己的线程池,使用方式@Async(“myThreadPool”)

废话太多,直接上代码:

@Scheduled(fixedRate = 100010,initialDelay = 100020)

@Async(“myThreadPoolTaskExecutor”)

//@Async

public void scheduledTest02(){

System.out.println(Thread.currentThread().getName()+"—>xxxxx—>"+Thread.currentThread().getId());

}

//自定义线程池

@Bean(name = “myThreadPoolTaskExecutor”)

public TaskExecutor getMyThreadPoolTaskExecutor() {

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

taskExecutor.setCorePoolSize(20);

taskExecutor.setMaxPoolSize(200);

taskExecutor.setQueueCapacity(25);

taskExecutor.setKeepAliveSeconds(200);

taskExecutor.setThreadNamePrefix(“Haina-ThreadPool-”);

// 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者

taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//调度器shutdown被调用时等待当前被调度的任务完成

taskExecutor.setWaitForTasksToCompleteOnShutdown(true);

//等待时长

taskExecutor.setAwaitTerminationSeconds(60);

taskExecutor.initialize();

return taskExecutor;

}

线程池的使用心得

  • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使用和控制。

  • 使用自定义的线程池能够避免一些默认线程池造成的内存溢出、阻塞等等问题,更贴合自己的服务特性

  • 使用自定义的线程池便于对项目中线程的管理、维护以及监控。

  • 即便在非spring环境下也不要使用java默认提供的那几种线程池,坑很多,阿里代码规约不说了吗,得相信大厂!!!

三、动态定时任务的实现

===========

问题

使用@Scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停止服务,重新部署。

解决办法

方式一:实现SchedulingConfigurer接口,重写configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不过需要注意的是,此种方式修改了配置值后,需要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。

@Configuration

public class ScheduledConfig implements SchedulingConfigurer {

@Autowired

private TaskScheduler myThreadPoolTaskScheduler;

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));

scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);

//可以实现动态调整定时任务的执行频率

scheduledTaskRegistrar.addTriggerTask(

//1.添加任务内容(Runnable)

() -> System.out.println(“cccccccccccccccc—>” + Thread.currentThread().getId()),

//2.设置执行周期(Trigger)

triggerContext -> {

//2.1 从数据库动态获取执行周期

String cron = "0/2 * * * * ? ";

//2.2 合法性校验.

// if (StringUtils.isEmpty(cron)) {

// // Omitted Code …

// }

//2.3 返回执行周期(Date)

return new CronTrigger(cron).nextExecutionTime(triggerContext);

}

);

}

}

方式二:使用threadPoolTaskScheduler类可实现动态添加删除功能,当然也可实现执行频率的调整

首先,我们要认识下这个调度类,它其实是对java中ScheduledThreadPoolExecutor的一个封装改进后的产物,主要改进有以下几点:

  • 提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这一个默认参数。

  • 支持自定义任务,通过传入Trigger参数。

  • 对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。

顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:

  • 提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他没有默认配置

  • 实现AsyncListenableTaskExecutor接口,支持对FutureTask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。

  • 因为是spring的工具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)

  • 提供默认ThreadFactory实现,直接通过参数重载配置

扯了这么多,还是直接上代码:

@Component

public class DynamicTimedTask {

private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);

//利用创建好的调度类统一管理

//@Autowired

//@Qualifier(“myThreadPoolTaskScheduler”)

//private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;

//接受任务的返回结果

private ScheduledFuture<?> future;

@Autowired

private ThreadPoolTaskScheduler threadPoolTaskScheduler;

//实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler

@Bean

public ThreadPoolTaskScheduler threadPoolTaskScheduler() {

ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();

return new ThreadPoolTaskScheduler();

}

/**

  • 启动定时任务

  • @return

*/

public boolean startCron() {

boolean flag = false;

//从数据库动态获取执行周期

String cron = "0/2 * * * * ? ";

future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);

if (future!=null){

flag = true;

logger.info(“定时check训练模型文件,任务启动成功!!!”);

}else {

logger.info(“定时check训练模型文件,任务启动失败!!!”);

}

return flag;

}

/**

  • 停止定时任务

  • @return

*/

public boolean stopCron() {

boolean flag = false;

if (future != null) {

boolean cancel = future.cancel(true);

if (cancel){

flag = true;

logger.info(“定时check训练模型文件,任务停止成功!!!”);

}else {

logger.info(“定时check训练模型文件,任务停止失败!!!”);

}

}else {

flag = true;

logger.info(“定时check训练模型文件,任务已经停止!!!”);

}

return flag;

}

class CheckModelFile implements Runnable{

@Override

public void run() {

//编写你自己的业务逻辑

System.out.print(“模型文件检查完毕!!!”)

}

}


版权声明:本文为m0_64867003原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。