简介:
FutureTask 是一个可取消的、可获得执行结果异步任务,通过线程池ExecutorService 来执行FutureTask。
FutureTask类 实现了RunnableFuture接口,而RunnableFuture 继承了Runnable和Future接口,所以本质上 FutureTask是Runnable的一种实现。
我们先来一张工作原理图:

总之:同一时间,N个线程争抢一个FutureTask,只有一个线程(比如A)获得任务执行权,其余线程什么也不执行。
某个时间节点可能有N个线程想要获取到线程A执行任务的结果,如果A确实执行完了则返回结果(无论结果好坏,有可能正常执行完了,有可能中间出现异常了),如果没执行完则进入等待队列,等待执行完成。
如果等待队列中的线程被中断了则直接退出等待队列,并抛出中断异常。
如果任务线程执行过程中被其他线程把任务取消了,则抛出取消异常。获取结果的线程将收到任务取消异常。
属性:
//当前任务线程的状态,下面的几个常量都是state不同状态下的值
private volatile int state;
//表示当前任务线程刚创建
private static final int NEW = 0;
//表示当前任务线程即将完成,是一个临界状态
private static final int COMPLETING = 1;
//表示当前任务线程正常结束
private static final int NORMAL = 2;
//表示当前任务线程出现异常
private static final int EXCEPTIONAL = 3;
//表示当前任务线程被取消了
private static final int CANCELLED = 4;
//表示当前线程已经被打了中断标记
private static final int INTERRUPTING = 5;
//表示当前线程已经被中断
private static final int INTERRUPTED = 6;
任务状态之间的转换:
1、NEW -> COMPLETING -> NORMAL:正常结束
2、NEW -> COMPLETING -> EXCEPTIONAL:异常结束
3、NEW -> CANCELLED:任务被取消
4、NEW -> INTERRUPTING -> INTERRUPTED:任务出现中断//其内部包装了一个 Callable 来执行任务,Callable 有返回值的线程
private Callable<V> callable;
//任务执行的返回结果,因为Callable 执行后的返回结果,
// 如果传入Runnable 传入什么返回什么详细见构造方法
private Object outcome; // non-volatile, protected by state reads/writes
//当前正在运行的线程
private volatile Thread runner;
//等待队列里的节点 WaitNode是FutureTask的内部类
private volatile WaitNode waiters;//等待队列里的节点类 我们可以看出等待队列是单向的,由头节点指向下一个节点
//有小伙伴可能问为什么需要等待队列?
//那是因为当一个任务线程执行任务之后,可能后面有N个线程等待任务执行的结果。
//比如你爸让你去超市给他买包烟,你妈让你稍一袋盐回来,你姐姐让你帮忙带一瓶可乐,你去超市了,那是不是你爸你妈你姐都在等着你的消息啊。所以需要等待队列还保存等待执行结果的线程。
static final class WaitNode {
//被阻塞的线程
volatile Thread thread;
//下一个节点
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}//UNSAFE 是底层C++实现的,具体可以自行搜素UNSAFE类,主要通过他的CAS方法实现线程安全
private static final sun.misc.Unsafe UNSAFE;
//状态属性在内存中的偏移量(可以理解为在内存中的位置)
private static final long stateOffset;
//当前线程在内存中的偏移量
private static final long runnerOffset;
//当前等待节点在内存中的偏移量
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}构造方法:
//传入一个Callable,并把线程状态设置为 NEW
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}//传入Runnable,传入返回结果,并设置为NEW
//其实这个构造方法构造的FutureTask就是执行了Runnable再把传入的值返回去,对result啥也没干
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
//这里是怎么把Runnable 转化为 Callable模式的呢?小伙伴是不是瞬间想到了什么?
//没错,就是适配器模式
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
// RunnableAdapter 实现了Callable,重写了call()
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
//call里边执行run()
task.run();
//所以构造FutureTask(runnable,result)的时候,直接把result返回
return result;
}
}方法:
//这是最主要的方法,因为我们一开始说FutureTask是Runable的一种实现,
//当FutureTask被线程池执行时,就会执行FutureTask的run();
public void run() {
//首先这里判断线程的状态是不是刚创建
//如果是,再通过 CAS 给当前运行线程赋值,赋值失败说明已经有线程赶在它前面执行了则直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//这个Callable 就是我们内部封装的业务逻辑了
Callable<V> c = callable;
//通过上面的CAS操作,和我们构造时设置线程状态为NEW,所以第一次会走进这个if中去
if (c != null && state == NEW) {
//返回值,当前为空
V result;
//线程是否正在执行,默认false
boolean ran;
try {
//开始执行并返回结果
result = c.call();
//设运行状态为true
ran = true;
} catch (Throwable ex) {
//try代码块执行过程中出现异常
result = null;
ran = false;
//设置异常信息 ?具体分析
setException(ex);
}
if (ran)
//正常运行 就存储执行结果,当其他线程调用get()时获取该结果 ?具体分析
set(result);
}
} finally {
//最终无论线程正确还是错误执行,把运行线程设置为空
runner = null;
int s = state;
if (s >= INTERRUPTING)
//如果在执行过程中线程被中断,比如被外面的线程执行了cancel()方法,则执该方法
handlePossibleCancellationInterrupt(s);
}
}
//设置异常
protected void setException(Throwable t) {
//设置当前线程即将完成状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//把异常信息作为返回值
outcome = t;
//设置状态为异常状态(完成的结果有好又坏 这里就是坏的完成)
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//最后要干嘛呢?猜一下
finishCompletion();
}
}//设置结果
protected void set(V v) {
//上面把结果改为正在完成
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//这里设置为已正常完成 (好的完成)
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}private void handlePossibleCancellationInterrupt(int s) {
//处于中断中一直让出CPU
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}//其实这个操作就是循环把队列里中等待节点里的所有线程都释放掉
//主要有三个地方调用:可以想一下?
/**
任务正确执行完成时
任务异常执行完成时
任务被取消的时候
*/
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//这里可能有小伙伴有疑问是什么时候park的呢?就是当我们get()获取结果的时候
//下面会具体分析
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//这个方法是没有任何操作的,有需要的同学可以对该方法进行扩展
done();
callable = null; // to reduce footprint
}//这个是仅次于run()的最核心的方法,我们知道我们是可以通过调用该方法取得线程执行结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果线程还没执行完 就进行阻塞,直到线程执行结束
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//执行完了,无论好结果还是坏结果都返回
return report(s);
}//获取结果时判断任务是否执行完成,未完成则需要等待
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//设置过期时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//等待节点
WaitNode q = null;
//线程是否已经进入等待队列
boolean queued = false;
for (;;) { //一直轮训
//如果当前获取结果的线程被标记了中断则退出等待队列并报错
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//任务线程执行完成,则获取结果的线程就可以拿到结果了,并把等待队列节点置为空
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果任务线程正在完成..
else if (s == COMPLETING) // cannot time out yet
//让当前真正获取结果的线程让出CPU
Thread.yield();
//表示当前等待队列里还没有节点 你是第一个就创建节点
else if (q == null)
q = new WaitNode();
//如果节点还没有入队,就让当前获取结果的线程入队
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//先判断是否设置过期时间
else if (timed) {
//判断是否过期
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//过期了就从等待队列中删除 并返回当前执行任务线程的状态
removeWaiter(q);
return state;
}
//没过期就给挂起
LockSupport.parkNanos(this, nanos);
}
//线程被挂起 啥时候释放来着?想不起来就回去翻翻看。
else
LockSupport.park(this);
}
}//返回结果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
//如果任务线程状态被取消或者状态是中断中 / 已中断 则返回取消任务执行异常
if (s >= CANCELLED)
throw new CancellationException();
//正常运行完成出现异常则返回异常结果
throw new ExecutionException((Throwable)x);
}//通过参数名也可以知道,这个参数是:任务线程在运行中是不是能够中断
public boolean cancel(boolean mayInterruptIfRunning) {
//先判断当前线程的状态 如果不是刚创建那就不能被取消
//如果是创建状态:当mayInterruptIfRunning为true:设置中断标记状态失败则不能被取消
//如果是创建状态:当mayInterruptIfRunning为false:设置取消状态失败(已经被别的线程抢先取消了则也是失败)
//注意这里只是把线程状态改了,真正的任务线程还没有中断
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
//可以为任务线程设置中断标记
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//给执行任务线程设置中断,如果执行到这里,那任务线程就一直处于执行状态,停不下来了
t.interrupt();
} finally { // final state
//最后把任务线程状态设置为已中断状态,但是任务线程还是处于线程中断中
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//到这里说明上面的代码把任务线程已经设置为已取消 / 已中断状态
//所以这里就要把所有等待获取结果的线程释放掉
finishCompletion();
}
return true;
}//当然还有这几个很简单的方法 就不赘述了
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}总结:
1、通过CAS 来控制线程竞争
2、通过任务线程即FutureTask内部Callable线程的状态流转查看任务的执行情况
3、获取结果的线程,任务未执行完会进入等待队列,正常执行/异常执行都会返回执行结果。
其实总体还说FutureTask还是比较简单和好理解的,感谢大家的观看,有什么问题欢迎留言沟通。
后面会持续更新多优质好文,点关注不迷路。? 同时小弟公众号?,感谢大家的支持。

版权声明:本文为weixin_42064942原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。