需求
在执行的时候需要同步用户信息,修改用户状态,同步信息的时候需要查询用户之前的记录并同步到ES中
问题
1、在报名的时候发现因为用户之前的记录比较多,会执行的比较慢,这样会一直报名中
2、用户在多次点击或者网络环境不好的情况下多次请求导致重复计算
解决
除了主线报名之外其他的计算、同步过程都异步执行,或者使用定时任务定时扫描
1、定时任务定时扫描,需要将信息压入队列,然后定期执行,消耗队列使用线程池
技术点
使用线程池执行队列:springboot线程池执行类ThreadPoolExecutor
在springboot文档中指出,没有配置线程池的话,springboot会自动配置一个ThreadPoolExecutor到Bean中
1、首先引入线程池配置类
先上代码:
package com.lz.common.config.thread;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @description: 线程池配置 多个默认配置:当配置文件不存在的时候使用默认值
* @author: jusz
* @create: 2021/12/14 21:53
*/
@Component
@ConfigurationProperties("threadpool")
@Data
public class ThreadPoolProperties {
/**
* 线程池实例1 :当有其他配置的时候新new一个配置设置默认参数
*/
private ThreadPoolInstance poolInstance = new ThreadPoolInstance(){
{
this.setCorePoolSize(5);
this.setMaxNumPoolSize(5);
this.setKeepAliveSeconds(60);
this.setWorkQueueCapacity(1000);
}
};
@Data
public class ThreadPoolInstance{
/**
* 核心线程数:
* 1、如果说线程数少于corePoolSize ,那么就会创建新的线程来执行任务,即便是线程池中其他的线程处于空闲状态
* 2、如果线程数目大于corePoolSize ,小于maxNumPoolSize ,那么只有当workQueue满的时候才会创建新的线程处理任务
* 3、如果corePoolSize 和 maxNumPoolSize 相等的时候,那么创建的线程就是固定线程数的线程池,只有workQueue满的时候才会创建新的线程处理任务
* 4、如果线程的数目大于maxNumPoolSize,这个时候workQueue满了,那么通过Thread指定的策略来处理任务
*/
private Integer corePoolSize;
/**
* 最大线程数
*/
private Integer maxNumPoolSize;
/**
* 队列数
*/
private Integer workQueueCapacity;
/**
* 线程空闲时间:当线程数大于corePoolSize的时候,如果没有新的任务提交,核心线程外的线程不会立即销毁,一直等到keepAliveTime
*/
private Integer keepAliveSeconds;
}
}
1、@Component :标识一个bean 2、@ConfigurationProperties("threadpool"):读取配置文件 3、@Data :lamada
使用这种方式不适应@Value方式是因为1、可以设置默认配置,2、如果配置文件中没有这个配置不会报错,3、可以设置多个默认配置
使用方式
@Autowired(required = false) //spring在启动的时候会注入bean 但是在扫描的时候没有发现这个bean,强行注入就会失败,使用这个是存在bean就注入
private ThreadPoolProperties threadPoolProperties;
...
Integer corePoolSize = threadPoolProperties.getPoolInstance().getCorePoolSize()
2、编写基本ThreadPoolExecutor调用类
package com.lz.common.config.thread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @description: 线程池配置
* 1、线程池的拒绝策略 :
* 1> AborPolicy :直接抛出异常,这是默认策略
* 2> CallerRunsPolicy :用调用者所在的线程来执行任务
* 3> DisCardOldsPolicy :丢弃阻塞队列中最考前的任务,执行当前任务
* 4> DiscardPolicy :直接丢弃任务
* 2、调用方式 :
* 第一种:
* 1> @Autowrite注入 ThreadPoolTaskExecutor xxxx线程池
* 2> xxxx线程池.execute(()->{
* 执行的内容
* });
* 第二种:
* 1> @Async("xxxx线程池")
* 使用
* @author: jusz
* @create: 2021/12/14 21:44
*/
@Configuration
@EnableAsync
@Slf4j
public class TaskExecuterConfiguration {
@Autowired(required = false) //spring在启动的时候会注入bean 但是在扫描的时候没有发现这个bean,强行注入就会失败,使用这个是存在bean就注入
private ThreadPoolProperties threadPoolProperties;
/**
* 设置线程池 :CallerRunsPolicy 使用当前调用者所在的线程来执行任务:
* 当线程数目达到最大线程数(maxNumPoolSize)的时候执行拒绝策略,表现为当前线程阻塞,知道当前调用者所在的线程执行完毕
* @return
*/
@Bean
@Primary //设置默认
public ThreadPoolTaskExecutor callRunsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置线程池参数
executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
//配置线程池中线程名称前缀
executor.setThreadNamePrefix("callRunsExcutor-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化
executor.initialize();
return executor;
}
/**
* 设置线程池 : AbortPolicy 抛弃旧的任务
* 当线程数目达到最大线程数(maxNumPoolSize)的时候执行拒绝策略,表现为直接抛出异常
* @return
*/
@Bean
public ThreadPoolTaskExecutor abortExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置线程池参数
executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
//配置线程池中线程名称前缀
executor.setThreadNamePrefix("abortExcutor-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//初始化
executor.initialize();
return executor;
}
/**
* 设置线程池 : DisCardOldsPolicy 抛出java.util.concurrent.RejectedExecutionException异常
* 当线程数目达到最大线程数(maxNumPoolSize)的时候执行拒绝策略,表现为直接抛弃
* @return
*/
@Bean
public ThreadPoolTaskExecutor disCardOldsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置线程池参数
executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
//配置线程池中线程名称前缀
executor.setThreadNamePrefix("disCardOldsExcutor-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
//初始化
executor.initialize();
return executor;
}
/**
* 设置线程池 : DiscardPolicy 直接丢弃任务
* 当线程数目达到最大线程数(maxNumPoolSize)的时候执行拒绝策略,表现为直接任务丢失
* @return
*/
@Bean
public ThreadPoolTaskExecutor discardExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//设置线程池参数
executor.setCorePoolSize(threadPoolProperties.getPoolInstance().getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getPoolInstance().getMaxNumPoolSize());
executor.setKeepAliveSeconds(threadPoolProperties.getPoolInstance().getKeepAliveSeconds());
executor.setQueueCapacity(threadPoolProperties.getPoolInstance().getWorkQueueCapacity());
//配置线程池中线程名称前缀
executor.setThreadNamePrefix("discardExcutor-");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//初始化
executor.initialize();
return executor;
}
}
3、调用方式
1
@Resource(name = "userHoursExecutor")
private ThreadPoolTaskExecutor userHoursExecutor;
userHoursExecutor.execute(() -> {
//执行代码
});
2、在方法上或者类上执行异步
@Async("配置中的bean名称")
版权声明:本文为weixin_42167717原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。