一、Future模式
1.Future简介
Future本身是一种被广泛运用的开发设计模式,在很大程度上简化需要数据流同步的并发应用开发。Future对象本身可以看做是一个显示的引用,他的核心思想是异步调用,类似于ajax的异步请求,无需等待请求的结果,可以继续去处理其他的业务。
例如就像现在在网上买东西,选中东西付款之后,不会立即拿到东西,而是拿到一个买东西的订单号,然后过一段时间之后根据订单号去拿真正的货物,而在等待货物的期间我们是可以去干别的任何事情。其中Future接口就是订货单,真正处理订单的是Executor类,他根据Future接口的要求来生产产品。
2.Future接口
下面是Future接口中的定义的方法,包括5个接口方法,这5个接口方法实际上提供了Future的3个功能:判断任务是否完成,中断任务,获取任务的执行结果。
public interface Future<V> {
/**
* 取消任务的执行
* 如果任务已经完成,或者已经被取消,或者因为其他原因不能被取消,则返回失败
* 如果任务在调用时还未启动,那么返回成功
* 如果任务已经在执行过程中,则根据参数确定此执行任务的线程能否被中断,来试图停止任务的执行
* @param mayInterruptIfRunning
* @return
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否已经取消,任务正常完成前将其取消,则返回true
* @return
*/
boolean isCancelled();
/**
* 判断任务是否已经完成,需要注意的是如果任务正常、异常或取消,都返回true
* @return
*/
boolean isDone();
/**
* 等待任务执行结束,并返回结果
* @return
* @throws InterruptedException 线程被中断异常
* @throws ExecutionException 任务执行异常
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待任务执行结束,并返回结果,同上面get方法的区别是设置了超时时间,
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}3.Future模式执行的原理
| 角色 | 作用 |
|---|---|
| main | 启动系统的主程序,调用client发送请求 |
| client | 返回Data对象,并且立即返回FutureData结果 ,同时开启ClientThread线程来装配RealData |
| Data | 返回数据的接口 |
| FutureData | client请求之后立即返回的一个“虚假”的结果 |
| RealData | 真实的结果数据 |
二、FutureTask实现
1.FutureTask介绍
Future只是一个接口,无法直接创建对象,因此有了FutureTask。RunnableFuture继承了Runnable和Future接口,而FutureTask实现了RunnableFuture接口。
public class FutureTask<V> implements RunnableFuture<V> {
……
}public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}2.FutureTask中的变量
/**
* state状态变化的方式
* 1.正常完成的流程:NEW -> COMPLETING -> NORMAL
* 2.出现异常的流程:NEW -> COMPLETING -> EXCEPTIONAL
* 3.被取消:NEW -> CANCELLED
* 4.被中断:NEW -> INTERRUPTING -> INTERRUPTED
*/
//记录task的状态
private volatile int state;
//新建,也表示内部成员callable已成功赋值,一直到工作线程完成FutureTask中的run()
private static final int NEW = 0;
//执行中,工作线程在处理task时的中间状态,处于该状态是,说明工作线程正准备设置result
private static final int COMPLETING = 1;
//正常,设置result结果完成,FutureTask处于该状态,代表过程结果,该状态为正确完成的最终状态
private static final int NORMAL = 2;
//异常:task在指向过程中出现异常,也是最终态
private static final int EXCEPTIONAL = 3;
//取消:task被取消,最终态
private static final int CANCELLED = 4;
//中断中:task运行过程中被中断时,设置的中间状态
private static final int INTERRUPTING = 5;
//被中断:中断完成的最终状态
private static final int INTERRUPTED = 6;
//具体run()运行时会调用其方法call(),并获取结果,结束执行置为null
private Callable<V> callable;
//伴随state进行读写
private Object outcome;
//具体的工作线程
private volatile Thread runner;
//并发stack数据结构,用于存放阻塞在该FutureTask.get方法的线程 Treiber算法参考Lock-Free算法
private volatile WaitNode waiters;2.FutureTask的run()方法
下面是FutureTask的run()方法的执行过程,主要分为几个步骤:
(1)判断当前任务的状态,如果任务的状态不是new,说明状态已经发生了变化,执行的路径是上面4种中的一种,直接返回。
(2)如果工作线程是null,则把当前执行任务的线程赋值给runner,如果runner不为null,说明已经有线程在执行,直接返回。此处使用compareAndSwapObject来对工作线程进行赋值,是因为该方法能够保证原子性,保证多个线程同时提交一个FutureTask时,确保该FutureTask的run只被调用一次,如果想运行多次,使用runAndReset()方法。
(3)开始执行任务,如果执行的任务不为空,并且任务的状态是new,调用Callable的call方法。
(4)如果任务执行成功则set结果,如果出现异常则setException
(5)把runner设置为null
public void run() {
//首先判断任务的状态,如果任务不是new状态,说明任务的状态已经开始改变,4中可能中的一种
if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
//开始执行任务
try {
Callable<V> c = callable;
//如果要执行的任务不为空,并且状态 new 就开始执行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务
result = c.call();
//没有发生异常,任务执行成功
ran = true;
} catch (Throwable ex) {
//有异常的情况下
result = null;
ran = false;
//设置异常
setException(ex);
}
//如果执行成功了,设置结束
if (ran)
set(result);
}
} finally {
//结束任务后需要将工作线程置为null
runner = null;
//如果是异常状态,需要返回异常信息
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}接下来我们看一下run()中调用的set()方法,该方法主要过程就是将状态new->completing->normal,最后调用finishCompletion()方法。
protected void set(V v) {
//如果现在的状态是new 就设置成 completing,然后设置为normal
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
} finishCompletion()方法的主要作用是解除所有阻塞的工作线程,调用done方法,将callable设为null private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}run方法只能被调用一次,如果想要执行多次,那么可以使用runAndReset方法,它与run方法的区别是最后一句的返回值,需要判断当前task的状态是否是new,其他的实现方法是一样的
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}3.FutureTask的get()方法
FutureTask中的get方法主要用于等待任务执行结束,并返回执行的结果。其中get方法有两种,一种是无限等待,直到任务执行结束,一种是有限等待,超过规定的时间仍未结束则不再等待,返回超时。其主要过程有:
(1)判断任务的状态,如果还未执行或者是在执行过程中,调用awaitDone方法,等待任务的执行;
(2)如果任务执行结束,调用report(state)方法,返回执行结果,如果是normal状态,即执行了run()中的set方法则返回执行结果,如果是cancelled状态,即执行了run中的setException方法,抛出异常。
//等待任务执行结束--如果任务的状态小于等于正在执行中,那么调用awaitDone方法,等待任务执行结束
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
//等待任务执行并返回结果-有等待时间
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
//返回结果:如果是正常结束,则返回执行结果,如果是被取消或者出现中断,则抛出异常结果
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
在get()方法中,当任务未完成时,需要调用awaitDone方法对线程进行阻塞等待,下面看一下阻塞等待的实现过程:
(1)计算等待时间;
(2)判断线程是否中断,如果线程中断,则将当前线程从等待队列waiters中移除,抛出中断异常;
(3)如果当前task的状态为已完成状态(正常完成或者取消等最终态),将等待线程结点的线程值为null,返回状态state;
(4)如果task的状态为正在执行中,说明正在set结果,那么调用Thread.yield方法将线程的状态从执行状态变为可执行状态,稍微等待一会;
(5)如果等待线程结点为null,初始化等待线程结点;
(6)如果当前等待线程结点还未成功进入等待队列waiters,进入线程等待队列;
(7)如果是带有等待超时时间,判断当前是否超时,如果已经超时,则将当前等待结点从waiters中移除,返回task的状态,如果还未超时,则阻塞当前线程;
(8)如果是其他的情况,阻塞当前线程。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果执行get线程被中断,则移除FutureTask的所有阻塞队列中的线程,抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果state编程完成状态(正常完成或者取消),则返回完成状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果state变成completing,说明正在set结果,让线程等一会儿
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//如果等待线程结点为null,初始化等待线程结点
else if (q == null)
q = new WaitNode();
//如果当前等待线程结点还未成功进入等待队列waiters,进入线程等待队列
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
//如果带了等待超时时间,如果已经超时,将当前节点从等待队列中移除,返回task的状态,如果未超时,阻塞当前线程
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//阻塞当前线程
else
LockSupport.park(this);
}
}其实从get的整个过程来看:
(1)FutureTask中维护了一个等待队列waiters,如果task还没有执行完毕,调用get方法的线程会先进入等待队列进行等待;
(2)awaitDone方法的执行过程其实是一个死循环,直到task的状态达到一个最终态或者等待时间超时或者线程中断才会跳出循环;
(3)为了节省开销,线程不会一直自旋等待,而是会进行阻塞。
4.FutureTask的cancel()方法
下面是FutureTask中取消任务执行的过程:首先要判断当前任务的状态,如果不是未启动状态,则返回false程序结束;如果是cancel(false),那么task的状态变化就是new->cancelled,如果是cancel(true)那么task的状态变化就是new->interrupting->interrupted
public boolean cancel(boolean mayInterruptIfRunning) {
//判断线程的状态
if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
//如果可以中断,就中断线程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}三、CompletableFuture的实现
1.Future模式的缺点
(1)实现了异步获取执行结果的需求,但是没有提供通知,无法知道任务什么时候完成;
(2)get方法获取结果的时候,会进入等待阻塞状态,这时候又变成同步操作,如果使用isDone循环判断任务是否完成,会耗费cpu资源。
2.CompletableFuture介绍
completableFuture是在java8中新增的类,它能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行,同时避免了传统回调最大的问题,那就是能够将控制流分离到不同的时间处理器中。
complatableFuture弥补了Future模式的缺点,在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接将异步处理的结果交给另外一个异步事件处理线程来处理。
3.CompletableFuture的特性
(1)创建CompletableFuture对象:ComplatableFuture.CompletableFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture。以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:
runAsync方法:以runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
suppleAsync方法:是以Supplier<U>函数式接口类型为参数,所以计算结果类型为U。
方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务。
//使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//使用指定的thread pool执行异步代码,异步操作有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
//使用ForkJoinPool.commonPool()作为他的线程池执行异步代码
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
//使用指定的thread pool执行异步代码
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}(2)CompletableFuture.complete方法:在执行future.get()方法时,如果执行的结果还没有出来,会一直处于阻塞的状态,这时候可以调用complete方法会立即执行,但是complete方法只会执行一次,对于重复调用只会获取到第一次的结果,同时如果future已经能够返回结果,那么调用complete方法也会无效
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
//如果没有执行结束,则返回一个异常的结果,而不是一个成功的结果
public boolean completeExceptionally(Throwable ex) {
if (ex == null) throw new NullPointerException();
boolean triggered = internalComplete(new AltResult(ex));
postComplete();
return triggered;
}例如在下面的例子中,如果不使用Thread.sleep(2000),直接调用future.complete("world"),返回的结果可能是“hello world”,如果调用两次complete,第二次的结果不会生效,如果加上了Thread.sleep,可以看到在调用complete时future已经执行结束,这时候也不会生效。
public static void supplyAsyncTest() {
System.out.println("CompletableFuture supplyAsync=================");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
/*try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}*/
// future.complete("world");
// future.complete("hello!!!!!!!");*/
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}(3)CompletableFuture.thenApply:接受一个参数用来转换CompletableFuture,可以使用多个thenApply,但是这多个之间是串行执行的,下一个的执行依赖于下一个的结果,他的功能相当于是将Comple<T>转换成CompletableFuture<U>
| 方法名 | 描述 |
|---|---|
| <U> CompletableFuture<U> thenApply(Function fn) | 接受一个Function参数用来转换成CompletableFuture |
| <U> CompletableFuture<U> thenApplyAsync(Function fn) | 接受一个Function参数用来转换成CompletableFuture,使用的是ForkJoinPool |
| <U> CompletableFuture<U> thenApplyAsync( Function fn, Executor executor) | 接受一个Function参数用来转换成CompletableFuture,使用指定的线程池 |
(4)CompletableFuture.thenCompose:可以用于组合多个CompletableFuture,将前一个的结果作为下一个计算的参数,因此存在的执行的先后顺序,将多个彼此依赖的future进行串联起来,返回最新的一个。
| 方法名 | 描述 |
|---|---|
| <U> CompletableFuture<U> thenCompose(Function fn) | 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型 |
| <U> CompletableFuture<U> thenComposeAsync(Function fn) | 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型,使用ForkJoinPool线程池 |
| <U> CompletableFuture<U> thenComposeAsync(Function fn,Executor executor) | 在一部操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型,使用指定的线程池 |
(5)CompletableFuture.thenCombine:将多个独立的CompletableFuture组合起来,多个future之间是并行执行的,最后再将结果进行汇总,这是与thenCompose不同的地方。
| 方法名 | 描述 |
|---|---|
| <U,V> CompletableFuture<V> thenCombine(CompletionStage other,BiFunction fn) | 当两个CompletableFuture都正常完成之后,执行提供的fn,组合成一个新的CompletableFuture |
| <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage other, BiFunction fn) | 当两个CompletableFuture都正常完成之后,执行提供的fn,组合成一个新的CompletableFuture,使用ForkJoinPool |
| <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage other,BiFunction fn, Executor executor) | 当两个CompletableFuture都正常完成之后,执行提供的fn,组合成一个新的CompletableFuture,使用指定的线程池 |
四、Future、FutureTask、CompletableFuture的比较
| 描述 | Future | FutureTask | CompletableFuture |
|---|---|---|---|
| 原理 | Future接口 | Future、Runnable | Future、CompletionStage |
| 并发执行 | 支持 | 支持 | 支持 |
| 结果的顺序 | 提交顺序 | 未知 | 可指定顺序 |
| 异常捕捉 | 自己捕捉 | 自己捕捉 | API中返回每个异常的生成 |