Future是java1.5中引入的接口,由大神Doug Lea操刀,作用是我这边忙着工作,开个线程帮我干活,我等着你干完。生活中的例子就是订外卖。
Future的使用
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Callable rentStore = new Callable() {
public Object call() throws Exception {
Thread.sleep(1000);
return "老公:我找到了合适的店面";
}
};
Callable buyTableware = new Callable() {
public Object call() throws Exception {
Thread.sleep(1000);
return "老婆:我找到了爽快的供应商";
}
};
FutureTask<String> rentStoreTask = new FutureTask<String>(rentStore);
FutureTask<String> buyTablewareTask = new FutureTask<String>(buyTableware);
System.out.println("家庭会议中...");
System.out.println("老婆:我觉得现在的工作索然无味,我们开个花店吧");
System.out.println("老公:嗯,你这个主意不错,咱们分头行动,我去找店面,你去联系供应商");
System.out.println("老婆:没问题,瞧好吧你");
rentStoreTask.run();
buyTablewareTask.run();
System.out.println("积极筹备中...");
System.out.println(rentStoreTask.get());
System.out.println(buyTablewareTask.get());
}
}这个例子中涉及了几个类:FutureTask、Callable,接下来我们来一一探讨。
Callable是什么?
Callable是启动线程的一种方式。首先Callable是一个接口,只有一个call方法,方法没有参数,但有个返回值。
Callable跟Runnable功能类似,但功能比Runnable功能更强大。因为Runnable无法返回值,而且Callable可以抛出异常Runnable不行。
Callable需要FutureTask配合运行。
从FutureTask源码分析Future是怎么运行的
先介绍一下FutureTask的一些参数。
// 当前任务状态
private volatile int state;
// 当前任务处于新建状态,尚未执行
private static final int NEW = 0;
// 当前任务处于完成中状态,指已完成请求,但未将结果传给outcome
private static final int COMPLETING = 1;
// 当前任务正常结束
private static final int NORMAL = 2;
// 当前任务处于执行异常状态,call方法执行时抛出了异常
private static final int EXCEPTIONAL = 3;
// 当前任务已撤销
private static final int CANCELLED = 4;
// 当前任务被打断中
private static final int INTERRUPTING = 5;
// 当前任务已被打断
private static final int INTERRUPTED = 6;
// 构造时传入的callable对象
private Callable<V> callable;
/**
* 正常情况下:outcome是任务执行的返回结果
* 非正常情况下:outcome是任务执行的异常
*/
private Object outcome; // non-volatile, protected by state reads/writes
// 执行当前任务的线程
private volatile Thread runner;
// 等待当前任务执行结果的线程链
private volatile WaitNode waiters;再来看下FutureTask的构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
// 将任务的状态设置为新建状态
this.state = NEW;
}看看任务调用逻辑,从run方法开始,run是执行callable的call方法的入口。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 只执行处于NEW状态的任务
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行callable的call方法
result = c.call();
// 任务有正常的执行完成
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// 执行完成,将result写入outcome,修改state
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 任务处于中断中/已中断
if (s >= INTERRUPTING)
// 告诉CPU,我不用了
handlePossibleCancellationInterrupt(s);
}
}set方法和setException方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将结果传给outcome
outcome = v;
// 将任务状态设置为正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常传给outcome
outcome = t;
// 将任务状态设置为异常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// 当存在线程阻塞等待执行结果,就陷入死循环
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
// 方便GC
q.thread = null;
// 唤醒阻塞的线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 方便GC
q.next = null;
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}再看看get方法,了解基本的流程。再考虑撤销这种情况。
public V get() throws InterruptedException, ExecutionException {
// 查看当前任务状态
int s = state;
// 如果任务还没有完成,咱就死等
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 我等了那么久,你还完不成?别执行了。
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}先调用了awaitDone方法,再调用了report方法。我们一个一个看。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果状态大于完成中,就返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 完成中
else if (s == COMPLETING) // cannot time out yet
// 我这边虽然没到时间,但是我用完了。告诉CPU,我不用了
Thread.yield();
// 任务没执行完呢。来来来,过来先填表,先创建个WaitNode对象。这是第一次自旋
else if (q == null)
q = new WaitNode();
// 填完表了?过去排队。这是第二次自旋
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 是否设置的等待时间
else if (timed) {
// 计算剩下的等待时间
nanos = deadline - System.nanoTime();
// 如果超时了就不挣扎了
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 挂起当前线程,设置超时时间
LockSupport.parkNanos(this, nanos);
}
else
// 挂起当前线程,等待结果
LockSupport.park(this);
}
}report方法
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常就返回结果,不正常就抛出异常
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}到这里,一个正常执行结束的流程就ok了。我们再来看看其他情况
// 线程被中断时,移除节点
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
// 回到retry标记,继续执行
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
// 正在运行中并且成功修改状态
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 获取当前正在执行的线程,然后中断它
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
// 设置任务状态为被已中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒对应的阻塞线程
finishCompletion();
}
return true;
}版权声明:本文为qq_22156459原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。