Java基础之Future类

        Future是java1.5中引入的接口,由大神Doug Lea操刀,作用是我这边忙着工作,开个线程帮我干活,我等着你干完。生活中的例子就是订外卖。

Future的使用

public class FutureTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable rentStore = new Callable() {
            public Object call() throws Exception {
                Thread.sleep(1000);
                return "老公:我找到了合适的店面";
            }
        };

        Callable buyTableware = new Callable() {
            public Object call() throws Exception {
                Thread.sleep(1000);
                return "老婆:我找到了爽快的供应商";
            }
        };
        FutureTask<String> rentStoreTask = new FutureTask<String>(rentStore);
        FutureTask<String> buyTablewareTask = new FutureTask<String>(buyTableware);
        System.out.println("家庭会议中...");
        System.out.println("老婆:我觉得现在的工作索然无味,我们开个花店吧");
        System.out.println("老公:嗯,你这个主意不错,咱们分头行动,我去找店面,你去联系供应商");
        System.out.println("老婆:没问题,瞧好吧你");
        rentStoreTask.run();
        buyTablewareTask.run();
        System.out.println("积极筹备中...");
        System.out.println(rentStoreTask.get());
        System.out.println(buyTablewareTask.get());
    }
}

        这个例子中涉及了几个类:FutureTask、Callable,接下来我们来一一探讨。

Callable是什么?

        Callable是启动线程的一种方式。首先Callable是一个接口,只有一个call方法,方法没有参数,但有个返回值。

        Callable跟Runnable功能类似,但功能比Runnable功能更强大。因为Runnable无法返回值,而且Callable可以抛出异常Runnable不行。

        Callable需要FutureTask配合运行。

从FutureTask源码分析Future是怎么运行的

        先介绍一下FutureTask的一些参数。

// 当前任务状态
private volatile int state;
// 当前任务处于新建状态,尚未执行
private static final int NEW          = 0;
// 当前任务处于完成中状态,指已完成请求,但未将结果传给outcome
private static final int COMPLETING   = 1;
// 当前任务正常结束
private static final int NORMAL       = 2;
// 当前任务处于执行异常状态,call方法执行时抛出了异常
private static final int EXCEPTIONAL  = 3;
// 当前任务已撤销
private static final int CANCELLED    = 4;
// 当前任务被打断中
private static final int INTERRUPTING = 5;
// 当前任务已被打断
private static final int INTERRUPTED  = 6;

// 构造时传入的callable对象
private Callable<V> callable;
/**
* 正常情况下:outcome是任务执行的返回结果
* 非正常情况下:outcome是任务执行的异常
*/
private Object outcome; // non-volatile, protected by state reads/writes
// 执行当前任务的线程
private volatile Thread runner;
// 等待当前任务执行结果的线程链
private volatile WaitNode waiters;

        再来看下FutureTask的构造方法

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    // 将任务的状态设置为新建状态
    this.state = NEW;
}

        看看任务调用逻辑,从run方法开始,run是执行callable的call方法的入口。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 只执行处于NEW状态的任务
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行callable的call方法
                result = c.call();
                // 任务有正常的执行完成
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 执行完成,将result写入outcome,修改state
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 任务处于中断中/已中断
        if (s >= INTERRUPTING)
            // 告诉CPU,我不用了
            handlePossibleCancellationInterrupt(s);
    }
}

        set方法和setException方法

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将结果传给outcome
        outcome = v;
        // 将任务状态设置为正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将异常传给outcome
        outcome = t;
        // 将任务状态设置为异常结束
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

private void finishCompletion() {
    // 当存在线程阻塞等待执行结果,就陷入死循环
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    // 方便GC
                    q.thread = null;
                    // 唤醒阻塞的线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                // 方便GC
                q.next = null;
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

        再看看get方法,了解基本的流程。再考虑撤销这种情况。

public V get() throws InterruptedException, ExecutionException {
    // 查看当前任务状态
    int s = state;
    // 如果任务还没有完成,咱就死等
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    // 我等了那么久,你还完不成?别执行了。
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

        先调用了awaitDone方法,再调用了report方法。我们一个一个看。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 计算超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 如果状态大于完成中,就返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 完成中
        else if (s == COMPLETING) // cannot time out yet
            // 我这边虽然没到时间,但是我用完了。告诉CPU,我不用了
            Thread.yield();
        // 任务没执行完呢。来来来,过来先填表,先创建个WaitNode对象。这是第一次自旋
        else if (q == null)
            q = new WaitNode();
        // 填完表了?过去排队。这是第二次自旋
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 是否设置的等待时间
        else if (timed) {
            // 计算剩下的等待时间
            nanos = deadline - System.nanoTime();
            // 如果超时了就不挣扎了
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 挂起当前线程,设置超时时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 挂起当前线程,等待结果
            LockSupport.park(this);
    }
}

        report方法

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 正常就返回结果,不正常就抛出异常
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

        到这里,一个正常执行结束的流程就ok了。我们再来看看其他情况

// 线程被中断时,移除节点
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        // 回到retry标记,继续执行
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

public boolean cancel(boolean mayInterruptIfRunning) {
    // 正在运行中并且成功修改状态
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                // 获取当前正在执行的线程,然后中断它
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 设置任务状态为被已中断
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒对应的阻塞线程
        finishCompletion();
    }
    return true;
}

版权声明:本文为qq_22156459原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。