FutureTask原理和CompletableFuture使用

FutureTask原理解析图
在这里插入图片描述

get()方法

/**
 * Returns the task's result, waiting first if the task has not finished yet.
 *
 * <p>Task states, in ascending numeric order:
 * <ul>
 *   <li>{@code NEW = 0} - created, not finished yet</li>
 *   <li>{@code COMPLETING = 1} - finished running, outcome not yet published</li>
 *   <li>{@code NORMAL = 2} - finished, result available</li>
 *   <li>{@code EXCEPTIONAL = 3} - finished with an exception</li>
 *   <li>{@code CANCELLED = 4} - cancelled</li>
 *   <li>{@code INTERRUPTING = 5} - the runner is being interrupted</li>
 *   <li>{@code INTERRUPTED = 6} - the runner has been interrupted</li>
 * </ul>
 *
 * @return the value produced by the task
 * @throws InterruptedException propagated from {@code awaitDone}
 * @throws ExecutionException propagated from {@code report}
 */
public V get() throws InterruptedException, ExecutionException {
    final int observed = state;
    // NEW or COMPLETING: the outcome is not published yet, so wait (untimed) for a final state.
    final int settled = (observed <= COMPLETING) ? awaitDone(false, 0L) : observed;
    // Turn the final state into a return value or an exception.
    return report(settled);
}
/**
 * Loops until the task state moves past COMPLETING, the calling thread's
 * interrupt flag is found set, or (when {@code timed} is true) the timeout
 * has elapsed.
 *
 * @param timed whether {@code nanos} is used as a timeout
 * @param nanos maximum wait in nanoseconds; only read when {@code timed} is true
 * @return the state seen once it exceeds COMPLETING, or the current state when
 *         the timeout expires
 * @throws InterruptedException if the calling thread's interrupt flag was set
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;    // absolute deadline; 0 when not timed
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);    // interrupted: drop this thread's wait node before throwing
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {    // state is past COMPLETING: stop waiting and return the state itself
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // task has run and the outcome is being published: yield instead of parking
            Thread.yield();
        else if (q == null)    // this thread has no wait node yet: create one
            q = new WaitNode();
        else if (!queued)    // CAS the node onto the head of the waiters list; retried on the next pass if the CAS fails
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {    // timed out: remove the node and return whatever the state is now
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);    // park for at most the remaining time
        }
        else
            LockSupport.park(this);    // untimed wait: park; the loop re-checks the state after waking
    }
}
/**
 * Parks the calling thread, recording {@code blocker} on it for the duration
 * of the park.
 *
 * @param blocker the object recorded on the thread while it is parked
 */
public static void park(Object blocker) {
    final Thread current = Thread.currentThread();
    // Record the blocker before parking ...
    setBlocker(current, blocker);
    // ... park: non-absolute, with a time argument of 0 ...
    UNSAFE.park(false, 0L);
    // ... and clear the blocker again once park returns.
    setBlocker(current, null);
}
/**
 * Converts a completed state into the value to return or the exception to throw.
 *
 * @param s the completed state
 * @return the stored outcome when {@code s} is NORMAL
 * @throws CancellationException when {@code s} is CANCELLED or greater
 * @throws ExecutionException wrapping the stored outcome for any other state
 */
private V report(int s) throws ExecutionException {
    // outcome is the field holding either the result or the thrown Throwable.
    final Object stored = outcome;
    if (s == NORMAL) {
        // Normal completion: the outcome is the computed value.
        return (V) stored;
    } else if (s >= CANCELLED) {
        // CANCELLED and every higher state are reported as a cancellation.
        throw new CancellationException();
    } else {
        // Remaining case: the outcome is the Throwable raised by the task.
        throw new ExecutionException((Throwable) stored);
    }
}

run()方法

/**
 * Runs the wrapped callable once and records its result or its exception.
 * Returns immediately when the state is no longer NEW or when another thread
 * has already installed itself as the runner.
 */
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread())) // not NEW, or the CAS from null to this thread on the runner field failed: nothing to do
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {    // there is a task and the state is still NEW
            V result;
            boolean ran;
            try {
                result = c.call();    // execute the task and keep its return value
                ran = true;
            } catch (Throwable ex) {
                // the task threw: hand the throwable to setException instead of set
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)    
                set(result);    // success: store the result via set
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
/**
 * Stores {@code v} as the outcome and moves the state NEW -> COMPLETING -> NORMAL.
 * Does nothing when the state is no longer NEW.
 *
 * @param v the value to store as the outcome
 */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {    // CAS NEW -> COMPLETING; only the thread that wins this CAS continues
        outcome = v;    // store the result
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // plain putOrderedInt write (not a CAS) setting the state to NORMAL
        finishCompletion();
    }
}
/**
 * Detaches the whole waiters list, unparks every thread recorded in it,
 * then invokes {@code done()} and clears {@code callable}.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {    // CAS the list head to null; on failure the outer loop re-reads waiters and retries
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);    // wake the waiting thread
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // the task is finished: clear the reference to the callable
}

CompletableFuture常见使用方法
supplyAsync带返回值
runAsync不带返回值

  1. 纯消费类型的方法:没有返回值
// 单个任务执行完毕后,再执行下面那个请求
public CompletionStage<Void> thenAccept(Consumer<? super T> action);     //当前线程同步执行
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);     // //使用ForkJoinPool.commonPool线程池执行action 
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);     //使用自定义线程池执行action

/**
 * Demonstrates {@code thenAccept}: consumes the result of an asynchronous
 * supplier without producing a new value.
 *
 * @param args unused
 */
public static void main(String[] args) {
    CompletableFuture
            .supplyAsync(() -> "赵云:冲冲冲")
            .thenAccept(param -> System.out.println(param))
            // Wait for the callback. supplyAsync without an executor runs on
            // ForkJoinPool.commonPool(), whose threads do not keep the JVM alive,
            // so without this main could return before anything is printed.
            .join();
}

// 两个任务都执行完毕后,再执行函数任务
public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) ;
public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

/**
 * Demonstrates {@code thenAcceptBoth}: runs a consumer once both futures
 * have completed, passing it both results.
 *
 * @param args unused
 */
public static void main(String[] args) {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "赵云:冲冲冲");
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "黄总:拿我意大利炮来");
    f1.thenAcceptBoth(f2, (t1, t2) -> System.out.println(t1 + t2))
            // Wait for the combined stage; the common-pool worker threads do not
            // keep the JVM alive, so main must not return before the print happens.
            .join();
}

// 其中任意一个任务执行完毕后,再执行函数任务
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)

/**
 * Demonstrates {@code acceptEither}: runs a consumer with the result of
 * whichever of the two futures completes first.
 *
 * @param args unused
 */
public static void main(String[] args) {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "赵云:执行完毕");
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "黄总:执行完毕");
    f1.acceptEither(f2, System.out::println)
            // Wait for the consumer to run; otherwise main may return (and the JVM
            // exit) before either common-pool task has produced its value.
            .join();
}

  2. 有返回值类型的方法
// 单个任务执行完毕后,再执行下面那个请求
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); 
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); 
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); 

/**
 * Demonstrates {@code thenApply}: transforms the result of an asynchronous
 * supplier and prints the transformed value.
 *
 * @param args unused
 * @throws ExecutionException if the pipeline completed exceptionally
 * @throws InterruptedException if the wait for the result is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> shout = CompletableFuture.supplyAsync(() -> "赵云:冲冲冲");
    CompletableFuture<String> withReply = shout.thenApply(line -> line + " 队友:GOGOGO");
    // get() blocks until the mapped value is available.
    String output = withReply.get();
    System.out.println(output);
}

// 两个任务都执行完毕后,再执行函数任务
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

/**
 * Demonstrates {@code thenCombine}: merges the results of two futures into
 * one value once both have completed, then prints it.
 *
 * @param args unused
 * @throws ExecutionException if the pipeline completed exceptionally
 * @throws InterruptedException if the wait for the result is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> " 赵云:冲冲冲");
    CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> " 黄总:拿我意大利炮来");
    // The second future's text is deliberately placed before the first's.
    CompletableFuture<String> merged = first.thenCombine(second, (a, b) -> b + a);
    String output = merged.get();
    System.out.println(output);
}

// 其中任意一个任务执行完毕后,再执行函数任务
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);

/**
 * Demonstrates {@code applyToEither}: applies a function to the result of
 * whichever of the two futures completes first and prints the outcome.
 *
 * @param args unused
 * @throws ExecutionException if the pipeline completed exceptionally
 * @throws InterruptedException if the wait for the result is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> " 赵云:冲冲冲");
    CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> " 黄总:拿我意大利炮来");
    // The mapping function passes the winning value through unchanged.
    CompletableFuture<String> fastest = first.applyToEither(second, winner -> winner);
    String output = fastest.get();
    System.out.println(output);
}
  3. 不消费也不返回的方法
// 单个任务执行完毕后,再执行下面那个请求
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

/**
 * Demonstrates {@code thenRun}: runs an action after the upstream future
 * completes, ignoring the upstream result and producing no value.
 *
 * @param args unused
 * @throws ExecutionException if the pipeline completed exceptionally
 * @throws InterruptedException if the wait for the action is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> " 赵云:冲冲冲");
    CompletableFuture<Void> followUp = f1.thenRun(() -> System.out.println("黄总:拿我意大利炮来"));
    // Wait for the action, as the sibling examples do. Without this, main may
    // return before the common-pool task has finished and nothing is printed.
    followUp.get();
}

// 两个任务都执行完毕后,再执行函数任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

/**
 * Demonstrates {@code runAfterBoth}: runs an action once both futures have
 * completed, then prints the (empty) result of the resulting stage.
 *
 * @param args unused
 * @throws ExecutionException if the pipeline completed exceptionally
 * @throws InterruptedException if the wait for the result is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> " 赵云:冲冲冲");
    CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "黄总:拿我意大利炮来");
    Runnable afterBoth = () -> System.out.println("花木兰:待我归来");
    CompletableFuture<Void> done = first.runAfterBoth(second, afterBoth);
    // The stage carries no value, so this prints "null".
    System.out.println(done.get());
}

// 其中任意一个任务执行完毕后,再执行函数任务
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

/**
 * Demonstrates {@code runAfterEither}: runs an action as soon as either of
 * the two futures completes, then prints the (empty) result of the stage.
 *
 * @param args unused
 * @throws ExecutionException if the pipeline completed exceptionally
 * @throws InterruptedException if the wait for the result is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> " 赵云:冲冲冲");
    CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "黄总:拿我意大利炮来");
    Runnable afterEither = () -> System.out.println("花木兰:待我归来");
    CompletableFuture<Void> done = first.runAfterEither(second, afterEither);
    // The stage carries no value, so this prints "null".
    System.out.println(done.get());
}
  4. 多任务组合
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);

/**
 * Demonstrates {@code thenCompose}: builds a list of dogs asynchronously,
 * then for every dog chains two dependent asynchronous steps (assign an age,
 * then assign a colour) and prints the list once all chains have finished.
 *
 * @param args unused
 * @throws ExecutionException if any stage completed exceptionally
 * @throws InterruptedException if the wait for the result is interrupted
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<List<Dog>> f1 = CompletableFuture.supplyAsync(() -> {
        List<Dog> dogs = new ArrayList<>();
        dogs.add(new Dog().setName("大黄"));
        dogs.add(new Dog().setName("小黑"));
        dogs.add(new Dog().setName("麻花"));
        return dogs;
    });
    // thenCompose (not thenApply) so the returned future completes only after
    // every per-dog chain has finished, rather than as soon as the chains are started.
    CompletableFuture<List<Dog>> decoratedDogs = f1.thenCompose(dogs -> {
        CompletableFuture<?>[] pending = dogs.stream()
                .map(dog -> CompletableFuture
                        .supplyAsync(() -> dog.setAge(new Random().nextInt(10)))
                        // The colour step depends on the age step, hence thenCompose.
                        .thenCompose(aged ->
                                CompletableFuture.supplyAsync(() -> aged.setColor("黄色"))))
                .toArray(CompletableFuture[]::new);
        // allOf completes when every chain is done; then hand back the same list,
        // whose elements have been updated in place by the setters.
        return CompletableFuture.allOf(pending).thenApply(ignored -> dogs);
    });
    System.out.println(decoratedDogs.get());
}

/**
 * Mutable holder for a dog's name, age and colour. Every setter returns
 * {@code this} so that calls can be chained.
 */
public class Dog {

    private String name;
    private int age;
    private String color;

    /** @return the dog's name, or {@code null} if none has been set */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the new name
     * @return this instance, for chaining
     */
    public Dog setName(String name) {
        this.name = name;
        return this;
    }

    /** @return the dog's age; {@code 0} if none has been set */
    public int getAge() {
        return this.age;
    }

    /**
     * @param age the new age
     * @return this instance, for chaining
     */
    public Dog setAge(int age) {
        this.age = age;
        return this;
    }

    /** @return the dog's colour, or {@code null} if none has been set */
    public String getColor() {
        return this.color;
    }

    /**
     * @param color the new colour
     * @return this instance, for chaining
     */
    public Dog setColor(String color) {
        this.color = color;
        return this;
    }

    /** @return a text of the form {@code Dog{name='..', age=.., color='..'}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Dog{");
        text.append("name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append(", color='").append(color).append('\'');
        text.append('}');
        return text.toString();
    }
}

版权声明:本文为weixin_46919552原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。