本文写一下java创建线程的一个核心类ThreadPoolExecutor的基本信息
在java的几种默认线程池之中,除了java8新增的任务窃取线程池newWorkStealingPool的实现不一样外,其他的几种无非都是对于ThreadPoolExecutor进行不同的初始化来实现的。
先看下ThreadPoolExecutor之中的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//此处方法的具体内容省略
}可以看到总共有七个参数,其中意思分别为:
corePoolSize : 核心线程数
maximumPoolSize:最大线程数
keepAliveTime:当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间
unit:存活时间keepAliveTime的单位
workQueue:用于在任务执行前存放任务的队列
threadFactory:创建新线程时要使用的工厂
handler:线程数和任务队列都满了以后,执行的拒绝策略
以上就是该类构造方法的七个参数。
对于任务队列workQueue:常见的有三种
1、SynchronousQueue:Direct handoffs(直接交换) 不以任何方式持有任务,直接提交给线程。该种队列通常需要需要无限的MaximumPoolSize,以避免拒绝新提交的任务。
比如已经实现了的newCachedThreadPool()线程池。
2、LinkedBlockingQueue:Unbounded queues(基于链表结构的无界队列)使用该队列的时候,当线程的核心线程数corePoolSize一直在繁忙的时候,可能导致新任务一直在任务队列之中进行等待。因此,就不会有大于corePoolSize的线程个数被创建,也就是说最大线程数maximumPoolSize的设置就是无效的。
比如已经实现的固定长度线程池newFixedThreadPool就是此种实现方式。
3、ArrayBlockingQueue:Bounded queues(基于数组结构的有界队列)与固定的最大线程数maximumPoolSizes一起使用时有助于防止资源耗尽,但可能更难优化和控制。队列的大小和最大线程数的大小可能会相互制衡。使用大的队列和小的核心线程数的时候,可以最大程度的减少cpu的使用、操作系统的资源和上下文切换的资源开销,但是也可能会人为的导致吞吐量的降低。使用小队列的话就需要更大的池。这会使得cpu更加的繁忙。可能会导致不可接受的调度开销,也会降低吞吐量。
对于拒绝策略handler,默认的有四种:
1、AbortPolicy:默认的策略,直接抛出RejectedExecutionException异常。源码如下所示:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}2、CallerRunsPolicy:调用自身线程来运行该任务。提供了一种简单的反馈控制机制,可以降低新任务的提交频率。源码如下所示:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}3、DiscardPolicy:直接丢弃。源码如下:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}4、DiscardOldestPolicy:丢弃队列之中最老的一个,就是即将被执行的任务。然后重试提交执行当前任务。(可能会再次失败导致重复执行)。源码如下:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}备注: 也可以自己实现RejectedExecutionHandler接口,重写其中的rejectedExecution方法来自定义响应的拒绝策略。
大致的执行流程为:当有新的任务进来,线程数还未达到核心线程数corePoolSize 的时候,直接创建新的线程来执行任务。当达到核心线程数以后,首先会往任务队列workQueue里面存放任务。当任务队列满了以后,在还未达到最大线程数maximumPoolSize的时候,就会继续创建新的线程来执行任务。在达到最大线程数以后,如果还继续有新的任务进来,就会执行对应的拒绝策略handler。
先看一下基本的使用方式:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
5,
60,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 线程数
int threads = 10;
// 用于计数线程是否执行完成
CountDownLatch countDownLatch = new CountDownLatch(threads);
System.out.println("---- start ----");
for (int i = 0; i < threads; i++) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("---- end ----");
}执行结果为:

然后我们深入源码看一下具体的execute执行过程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果运行的线程少于corePoolSize,则尝试启动一个新线程并作为其第一个任务。
* 它调用addWorker的原子方式检查runState 和workerCount,
* 通过返回false,从而防止在不应该的情况下可能添加的错误警报,
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果一个任务被添加到队列之中,我们仍然需要双重验证是否需要添加一个线程。
* (因为自从上次检查以后,是否有线程已经死亡)或者自从进入此方法以后线程池是否关闭
* 所以需要重新检查,必要的时候在停止时回滚排队,或者没有工作线程的时候启动新线程,
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 若我们不能添加任务,那我们将尝试添加一个新线程。
* 如果失败了,我们知道我们已经线程池被关闭或饱和,因此拒绝该任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}可以看到上述方法之中的核心方法是addWorker。
下面看一下addWorker的源码:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程是否是>=SHUTDOWN 的状态,也就是RUNNING的状态的时候,就直接跳过,
// 线程为大于SHUTDOWN的状态的时候,直接return false
// 线程是SHUTDOWN的状态,并且firstTask不等于null的时候直接拒绝。
// 检查队列是否为空,队列为空的时候,不需要增加线程,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判断线程池是否已满。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加工作线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建新的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//开始执行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}线程创建完成以后,就是执行Worker的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程状态是大于stop状态,则中断次线程
// 主要是确保当前线程不是stop状态,和当前线程没有被中断,
// 如果是的话,则中断次线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//前置钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
//开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//后置钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}本品文章先写到这里,如有问题,请各位大佬多多指教。