Java线程池工作原理,以及代码实现:
Thread线程的生命周期
(new Thread()新建)-> (start()就绪) 切换到-> (run()运行) -> (sleep()阻塞) 或 (执行完毕 死亡)
使用线程池的优点
1、提高效率:
频繁的开启new Thread()线程或者停止线程,线程需要从新被cpu从 (start()就绪) 切换到-> (run()运行)调度,需要发生CPU的上下文切换,效率非常低。
2、降低资源消耗:
假设高并发的场景 存在N个请求,并经历N次 线程生命周期 每次new Thread()线程,都会单独创建一个线程,如果http接口被恶意攻击,无限去请求,就会出现CPU飙高,导致服务宕机;通过池化技术重复利用已创建好的线程,从而降低线程创建和销毁造成的损耗 实现复用。
3、提高响应速度:
任务到达时,无需等待线程创建 即可立即复用执行。
4、提高线程的可管理性:
线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因
为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分
配、调优和监控。
线程池的创建方式
JDK自带的4种线程池创建:
Executors.newCachedThreadPool() 可缓存复用线程池
Executors.newFixedThreadPool() 可限制最大线程数
Executors.newScheduledThreadPool() 可定时线程池
Executors.newSingleThreadExecutor() 单例线程池
阿里巴巴不推荐使用JDK自带的线程池创建,因为它们的底层都是基于ThreadPoolExecutor 构造函数封装,
它们的底层是由一个无界的队列缓存实现,有可能会发生线程池溢出问题。
new LinkedBlockingDeque<Runnable>()
线程池创建的线程会一直在运行状态吗?
不会
例如:配置核心线程数 corePoolSize 为 2 、最大线程数 maximumPoolSize 为 5
我们可以通过配置超出 corePoolSize 核心线程数后创建的线程的存活时间例如为 60s
在 60s 内没有核心线程一直没有任务执行,则会停止该线程
线程池队列满了,任务会丢失吗?
如果队列满了,且任务总数>最大线程数则当前线程走拒绝策略。
可以自定义异拒绝异常将该任务缓存到(redis,本地文件,mysql)中后期项目启动实现补偿。
任务拒绝策略:
1.AbortPolicy 丢弃任务,抛运行时异常
2.CallerRunsPolicy 执行任务
3.DiscardPolicy 忽视,什么都不会发生
4.DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
5.实现 RejectedExecutionHandler 接口,可自定义处理器
手写Java线程池Demo
package com.details;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: Owen
* @Date: 2022/7/20
* @Description:线程池(简易版)
*/
@Slf4j
public class ThreadPoolDemo implements Executor {
//线程池初始化大小
private volatile int corePoolSize;
//线程池最大容量
private volatile int maximumPoolSize;
//线程并发队列
private final BlockingQueue<Runnable> workQueue;
//初始值0,保证对数字的操作是线程安全
private final AtomicInteger ctl = new AtomicInteger(0);
//构造方法
public ThreadPoolDemo(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}
/**
* @Author: Owen
* @Date: 2022/7/20
* @Description:执行线程任务
*/
@Override
public void execute(Runnable task) {
//获取当前 线程状态(正在执行的线程任务)
int taskNumber = ctl.get();
//创建线程任务 条件:线程池中(正在执行的线程任务 小于 初始化线程池容量)
if (taskNumber < corePoolSize) {
//创建 线程任务
boolean result = addWorker(task);
if (!result) {
throw new RuntimeException("Runnable task add faild !");
}
return;
}
//线程池中(正在执行的线程任务 大于或等于 初始化线程池大小)
else {
//场景1:任务添加到 线程池并发队列中 等待执行
boolean addResult = workQueue.offer(task);
//场景2:如果线程池队列已满 则 继续创建 新的线程任务
if (!addResult) {
//创建 线程监听任务
boolean result = addWorker(task);
if (!result) {
throw new RuntimeException("Runnable task add queue faild !");
}
}
}
}
/**
* @Author: Owen
* @Date: 2022/7/20
* @Description:创建线程任务
*/
private boolean addWorker(Runnable eventTask) {
//获取当前 线程状态(正在执行的线程任务)
int taskNumber = ctl.get();
//(正在执行的线程任务 大于或等于 线程池最大容量)
if (taskNumber >= maximumPoolSize) {
//添加失败 线程池容量已满
return false;
}
//异步 创建新的线程任务,并执行当前eventTask任务
Worker worker = new Worker(eventTask);
//设置线程名称
worker.setName("Thread_poll_" + taskNumber);
//把当前线程置为, start()就绪状态
worker.start();
//创建 线程任务成功后 线程池状态自增+1
ctl.incrementAndGet();
log.info("Current thread task increment success: Current thread pool size: " + ctl.get() + ", Max thread pool size:" + maximumPoolSize);
return true;
}
/**
* @Author: Owen
* @Date: 2022/7/20
* @Description:线程监听任务
*/
private final class Worker extends Thread {
//当前 待执行的 线程任务
Runnable task;
//带参构造函数
public Worker(Runnable task) {
//创建 异步监听,并执行当前 线程任务
this.task = task;
}
//线程监听任务run()运行事件
@Override
public void run() {
//当前待执行的 线程任务
Runnable executeTask = task;
try {
//线程 监听任务
//(条件1: 存在待执行的任务) 或者 (并发队列中存在 待执行的线程任务)
while (Objects.nonNull(executeTask) || Objects.nonNull(executeTask = getTask())) {
//当前 待执行任务线程, 状态设置为run()运行状态
executeTask.run();
//取消当前任务
executeTask = null;
//当前执行的 线程监听任务 大于 线程池最大容量
if (ctl.get() > maximumPoolSize) {
//取消当前 线程监听任务
break;
}
}
}
//当满足 条件:(待执行任务为空 && 队列中任务也为空)
finally {
//线程池状态自减-1,释放当前 线程监听任务
ctl.decrementAndGet();
log.error("Current thread task decrement success: Current thread pool size: " + ctl.get() + ", Max thread pool size:" + maximumPoolSize);
}
}
/**
* @Author: Owen
* @Date: 2022/7/20
* @Description:获取当前线程队列元素
*/
private Runnable getTask() {
log.info("Current thread queue size:" + workQueue.size());
return workQueue.poll();
}
}
public static void main(String[] args) {
ThreadPoolDemo threadPoolTrader = new ThreadPoolDemo(2, 10, new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTrader.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程池测试:" + Thread.currentThread().getName() + " === 任务:" + finalI);
});
}
}
}
执行结果:
Java线程池 详细代码实现:
Github地址: https://github.com/a8896230/Java.git