Java线程池工作原理、手写线程池

Thread线程的生命周期

(new Thread()新建)-> (start()就绪) 切换到-> (run()运行) -> (sleep()阻塞) 或 (执行完毕 死亡)
![在这里插入图片描述](https://img-blog.csdnimg.cn/8d28910e196448c49ccdb1112b6cd235.png

使用线程池的优点

1、提高效率:
频繁的开启new Thread()线程或者停止线程,线程需要从新被cpu从 (start()就绪) 切换到-> (run()运行)调度,需要发生CPU的上下文切换,效率非常低。
2、降低资源消耗:
假设高并发的场景 存在N个请求,并经历N次 线程生命周期 每次new Thread()线程,都会单独创建一个线程,如果http接口被恶意攻击,无限去请求,就会出现CPU飙高,导致服务宕机;通过池化技术重复利用已创建好的线程,从而降低线程创建和销毁造成的损耗 实现复用。
3、提高响应速度:
任务到达时,无需等待线程创建 即可立即复用执行。
4、提高线程的可管理性:
线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因
为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分
配、调优和监控。

线程池的创建方式

JDK自带的4种线程池创建:
Executors.newCachedThreadPool() 可缓存复用线程池
Executors.newFixedThreadPool() 可限制最大线程数
Executors.newScheduledThreadPool() 可定时线程池
Executors.newSingleThreadExecutor() 单例线程池

阿里巴巴不推荐使用JDK自带的线程池创建,因为它们的底层都是基于ThreadPoolExecutor 构造函数封装,
它们的底层是由一个无界的队列缓存实现,有可能会发生线程池溢出问题。
new LinkedBlockingDeque<Runnable>() 

在这里插入图片描述

线程池创建的线程会一直在运行状态吗?

不会
例如:配置核心线程数 corePoolSize 为 2 、最大线程数 maximumPoolSize 为 5
我们可以通过配置超出 corePoolSize 核心线程数后创建的线程的存活时间例如为 60s
在 60s 内没有核心线程一直没有任务执行,则会停止该线程

线程池队列满了,任务会丢失吗?

如果队列满了,且任务总数>最大线程数则当前线程走拒绝策略。
可以自定义异拒绝异常将该任务缓存到(redis,本地文件,mysql)中后期项目启动实现补偿。

任务拒绝策略:
1.AbortPolicy 丢弃任务,抛运行时异常
2.CallerRunsPolicy 执行任务
3.DiscardPolicy 忽视,什么都不会发生
4.DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
5.实现 RejectedExecutionHandler 接口,可自定义处理器

手写Java线程池Demo

package com.details;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: Owen
 * @Date: 2022/7/20
 * @Description:线程池(简易版)
 */
@Slf4j
public class ThreadPoolDemo implements Executor {
    //线程池初始化大小
    private volatile int corePoolSize;
    //线程池最大容量
    private volatile int maximumPoolSize;
    //线程并发队列
    private final BlockingQueue<Runnable> workQueue;

    //初始值0,保证对数字的操作是线程安全
    private final AtomicInteger ctl = new AtomicInteger(0);

    //构造方法
    public ThreadPoolDemo(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
    }


    /**
     * @Author: Owen
     * @Date: 2022/7/20
     * @Description:执行线程任务
     */
    @Override
    public void execute(Runnable task) {
        //获取当前 线程状态(正在执行的线程任务)
        int taskNumber = ctl.get();

        //创建线程任务 条件:线程池中(正在执行的线程任务 小于 初始化线程池容量)
        if (taskNumber < corePoolSize) {
            //创建 线程任务
            boolean result = addWorker(task);
            if (!result) {
                throw new RuntimeException("Runnable task add faild !");
            }
            return;
        }
        //线程池中(正在执行的线程任务 大于或等于 初始化线程池大小)
        else {
            //场景1:任务添加到 线程池并发队列中 等待执行
            boolean addResult = workQueue.offer(task);
            //场景2:如果线程池队列已满 则 继续创建 新的线程任务
            if (!addResult) {
                //创建 线程监听任务
                boolean result = addWorker(task);
                if (!result) {
                    throw new RuntimeException("Runnable task add queue faild !");
                }
            }
        }
    }


    /**
     * @Author: Owen
     * @Date: 2022/7/20
     * @Description:创建线程任务
     */
    private boolean addWorker(Runnable eventTask) {
        //获取当前 线程状态(正在执行的线程任务)
        int taskNumber = ctl.get();

        //(正在执行的线程任务 大于或等于 线程池最大容量)
        if (taskNumber >= maximumPoolSize) {
            //添加失败 线程池容量已满
            return false;
        }

        //异步 创建新的线程任务,并执行当前eventTask任务
        Worker worker = new Worker(eventTask);
        //设置线程名称
        worker.setName("Thread_poll_" + taskNumber);
        //把当前线程置为, start()就绪状态
        worker.start();
        //创建 线程任务成功后 线程池状态自增+1
        ctl.incrementAndGet();
        log.info("Current thread task increment success: Current thread pool size: " + ctl.get() + ", Max thread pool size:" + maximumPoolSize);
        return true;
    }


    /**
     * @Author: Owen
     * @Date: 2022/7/20
     * @Description:线程监听任务
     */
    private final class Worker extends Thread {
        //当前 待执行的 线程任务
        Runnable task;

        //带参构造函数
        public Worker(Runnable task) {
            //创建 异步监听,并执行当前 线程任务
            this.task = task;
        }

        //线程监听任务run()运行事件
        @Override
        public void run() {
            //当前待执行的 线程任务
            Runnable executeTask = task;
            try {
                //线程 监听任务
                //(条件1: 存在待执行的任务) 或者 (并发队列中存在 待执行的线程任务)
                while (Objects.nonNull(executeTask) || Objects.nonNull(executeTask = getTask())) {
                    //当前 待执行任务线程, 状态设置为run()运行状态
                    executeTask.run();
                    //取消当前任务
                    executeTask = null;

                    //当前执行的 线程监听任务 大于 线程池最大容量
                    if (ctl.get() > maximumPoolSize) {
                        //取消当前 线程监听任务
                        break;
                    }
                }
            }
            //当满足 条件:(待执行任务为空 && 队列中任务也为空)
            finally {
                //线程池状态自减-1,释放当前 线程监听任务
                ctl.decrementAndGet();
                log.error("Current thread task decrement success: Current thread pool size: " + ctl.get() + ", Max thread pool size:" + maximumPoolSize);
            }
        }


        /**
         * @Author: Owen
         * @Date: 2022/7/20
         * @Description:获取当前线程队列元素
         */
        private Runnable getTask() {
            log.info("Current thread queue size:" + workQueue.size());
            return workQueue.poll();
        }
    }


    public static void main(String[] args) {
        ThreadPoolDemo threadPoolTrader = new ThreadPoolDemo(2, 10, new ArrayBlockingQueue<Runnable>(10));
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTrader.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程池测试:" + Thread.currentThread().getName() + " === 任务:" + finalI);
            });
        }
    }

}

执行结果:
在这里插入图片描述

Java线程池 详细代码实现:

Github地址: https://github.com/a8896230/Java.git


版权声明:本文为qq_39648078原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。