Java并发编程之FutureTask源码解析

上次总结一下AQS的一些相关知识,这次总结了一下FutureTask的东西,相对于AQS来说简单好多呀

之前提到过一个LockSupport的工具类,也了解一下这个工具类的用法,这里也巩固一下吧

    /**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    //将指定线程唤醒,继续执行指定线程
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }    
    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of three
     * things happens:
     *
     * <ul>
     *
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     */
    //阻塞当前线程,等待调用unpark()唤醒当前线程
    public static void park() {
        UNSAFE.park(false, 0L);
    }
    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;

就是阻塞线程以及唤醒指定线程,在FutureTask的源码中能用到

RunnableFuture<V>

FutureTask继承自这个接口,这个接口有继承了Runnable以及Future接口,所以FutureTask对象可以用new Thread().start()去启动,所以之前提到了创建线程的三种方式,采用Callable+FutureTask的形式创建,依旧还是依赖于Runnable创建线程

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

源码解析

既然继承了Runnable接口就必然执行run()方法,我们先看下主要成员变量

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    //记录当前线程执行的状态,是否正常、结束、异常、中断
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    //Callable对象
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    //结果集
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    //当前执行的线程
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    //等待线程节点
    private volatile WaitNode waiters;
    
    //单向链表
    static final class WaitNode {
        volatile Thread thread;//记录当前线程
        volatile WaitNode next;//下一个节点
        WaitNode() { thread = Thread.currentThread(); }
    }

看一下执行主体,这个方法主要是将Callable对象的那个业务逻辑执行完毕,只有执行完成之后采用将值返回,并且将当前线程通过LockSupport.unpark()进行唤醒。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//调用Callable对象并执行call()方法中的变量
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);//发生异常则将结果设置成异常
                }
                if (ran)
                    set(result);//设置正常结果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
//如果是中断结束的,则调用线程中断方法
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    //无论结果是否正常,都会执行,主要是为了唤醒线程,避免死锁
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒当前对象的线程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

看一下Future的结果值的方法,每步方法在代码中都有讲解

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //先判断当前线程的执行状态是否执行完毕,未执行完的则调用等待方法
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    //方法就是用过LockSupport.park()进入线程等待方法,等待调用unpark然后在次判断是否执行完,执行完后将改方法结束,进入下一阶段
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    //在等待完之后,再次判断是否正常完成执行,正常的话将值返回,否则抛出异常
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

通过上面的讲解,应该对FutureTask为什么能有返回值以及基本运行机制应该有个初步的了解,可以自行的去多看几遍。


版权声明:本文为qq_40874285原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。