FutureTask原理解析图
get()方法
public V get() throws InterruptedException, ExecutionException {
// private static final int NEW = 0; 新建状态,任务未开始
// private static final int COMPLETING = 1; 任务执行完成,未提交数据
// private static final int NORMAL = 2; 任务完成,并且提交可以获取返回数据
// private static final int EXCEPTIONAL = 3; 异常
// private static final int CANCELLED = 4; 取消
// private static final int INTERRUPTING = 5; 打断
// private static final int INTERRUPTED = 6; 中断
int s = state; // 此任务的运行状态
if (s <= COMPLETING) // 如果当前状态小于1,表示任务还未提交完成
s = awaitDone(false, 0L); // 丢尽阻塞队列等待
return report(s); // 任务完成返回对象
}
// 阻塞
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等待超时时间
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q); //中断
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 当前状态大于1,表示要么完成返回要么异常出问题直接返回null
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 当前任务完成在提交返回值中
Thread.yield();
else if (q == null) // 阻塞队列为空
q = new WaitNode();
else if (!queued) // cas添加进阻塞队列当中,因为存在多个对象获取返回值
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) { // 任务未完成超时,直接中断
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); // 阻塞拦截
}
else
LockSupport.park(this); // 设置拦截
}
}
// 阻塞
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 屏障阻塞
UNSAFE.park(false, 0L); // 阻塞
setBlocker(t, null);
}
// 返回结果
private V report(int s) throws ExecutionException {
Object x = outcome; // outcome表示全局变量object返回对象
if (s == NORMAL) //完成返回
return (V)x;
if (s >= CANCELLED) // 异常中断
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
run()方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread())) // 当前状态不为新建状态表示执行过
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) { // 获取任务
V result;
boolean ran;
try {
result = c.call(); //执行任务返回对象
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); //修改执行任务完成
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 修改任务执行结果状态
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // cas操作修改任务执行完毕
outcome = v; // 赋值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // cas操作修改任务执行完毕已赋值
finishCompletion();
}
}
// 移除所有等待的线程并唤醒
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 开始移除等待队列对象
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 唤醒
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // 任务执行完毕设置为空
}
CompletableFuture常见使用方法
supplyAsync带返回值
runAsync不带返回值
- 纯消费类型的方法:没有返回值
// 单个任务执行完毕后在执行下面那个请求
public CompletionStage<Void> thenAccept(Consumer<? super T> action); //当前线程同步执行
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); // //使用ForkJoinPool.commonPool线程池执行action
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); //使用自定义线程池执行action
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
return "赵云:冲冲冲";
}).thenAccept(param -> {
System.out.println(param);
});
}
// 两个任务都执行完毕在执行函数任务
public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) ;
public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
return "赵云:冲冲冲";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
return "黄总:拿我意大利炮来";
});
f1.thenAcceptBoth(f2, (t1, t2) -> {
System.out.println(t1 + t2);
});
}
// 其中一个任务执行完毕在执行函数任务
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
return "赵云:执行完毕";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
return "黄总:执行完毕";
});
f1.acceptEither(f2, System.out::println);
}
- 有返回值类型的方法
// 单个任务执行完毕后在执行下面那个请求
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
return "赵云:冲冲冲";
}).thenApply(s -> {
return s + " 队友:GOGOGO";
});
System.out.println(f1.get());
}
// 两个任务都执行完毕在执行函数任务
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
return " 赵云:冲冲冲";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
return " 黄总:拿我意大利炮来";
});
CompletableFuture<String> stringCompletableFuture = f1.thenCombine(f2, (t1, t2) -> {
return t2 + t1;
});
System.out.println(stringCompletableFuture.get());
}
// 其中一个任务执行完毕在执行函数任务
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
return " 赵云:冲冲冲";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
return " 黄总:拿我意大利炮来";
});
CompletableFuture<String> stringCompletableFuture = f1.applyToEither(f2,t1 -> {
return t1;
});
System.out.println(stringCompletableFuture.get());
}
- 不消费也不返回的方法
// 单个任务执行完毕后在执行下面那个请求
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
return " 赵云:冲冲冲";
});
f1.thenRun(() -> {
System.out.println("黄总:拿我意大利炮来");
});
}
// 两个任务都执行完毕在执行函数任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = supplyAsync(() -> {
return " 赵云:冲冲冲";
});
CompletableFuture<Void> voidCompletableFuture = f1.runAfterBoth(supplyAsync(() -> {
return "黄总:拿我意大利炮来";
}), () -> {
System.out.println("花木兰:待我归来");
});
System.out.println(voidCompletableFuture.get()); // 因为没用返回值 为 null
}
// 其中一个任务执行完毕在执行函数任务
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = supplyAsync(() -> {
return " 赵云:冲冲冲";
});
CompletableFuture<Void> voidCompletableFuture = f1.runAfterEither(supplyAsync(() -> {
return "黄总:拿我意大利炮来";
}), () -> {
System.out.println("花木兰:待我归来");
});
System.out.println(voidCompletableFuture.get()); // 因为没用返回值 为 null
}
- 多任务组合
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<List<Dog>> f1 = supplyAsync(() -> {
List<Dog> dogs = new ArrayList<>();
dogs.add(new Dog().setName("大黄"));
dogs.add(new Dog().setName("小黑"));
dogs.add(new Dog().setName("麻花"));
return dogs;
});
CompletableFuture gods = f1.thenApply(
dogs -> {
dogs.stream().map(dog1 ->
supplyAsync(() -> {
return dog1.setAge(new Random().nextInt(10));
}).thenCompose(dog2 ->
supplyAsync(() -> {
return dog2.setColor("黄色");
})
)).toArray(size -> new CompletableFuture[size]);
return dogs;
}
);
System.out.println(gods.get());
}
public class Dog {
private String name;
private int age;
private String color;
public int getAge() {
return age;
}
public Dog setAge(int age) {
this.age = age;
return this;
}
public String getColor() {
return color;
}
public Dog setColor(String color) {
this.color = color;
return this;
}
public String getName() {
return name;
}
public Dog setName(String name) {
this.name = name;
return this;
}
@Override
public String toString() {
return "Dog{" +
"name='" + name + '\'' +
", age=" + age +
", color='" + color + '\'' +
'}';
}
}
版权声明:本文为weixin_46919552原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。