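Introduction to ListenableFuture
public interface ListenableFuture<V> extends Future<V>
ListenableFuture is an interface that extends Future. Its main feature is that listeners can be attached to an asynchronous task; once the task completes, the listeners are invoked automatically.
Different listeners can run on different Executors, and multiple listeners can be registered on the same ListenableFuture.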
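To make the listener mechanism concrete, here is a minimal sketch that uses only the JDK. It is not Guava's implementation: the names SimpleListenableTask and SimpleListenableTaskDemo are hypothetical, and the sketch relies only on the fact that FutureTask calls its done() hook once the task finishes. Each listener is stored together with its own executor, and a listener registered after completion is run right away.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; Guava's real implementation differs.
class SimpleListenableTask<V> extends FutureTask<V> {
    // A listener together with the executor it should run on.
    private static final class Entry {
        final Runnable listener;
        final Executor executor;

        Entry(Runnable listener, Executor executor) {
            this.listener = listener;
            this.executor = executor;
        }
    }

    private final List<Entry> pending = new ArrayList<>();
    private boolean completed = false; // guarded by the lock on 'pending'

    SimpleListenableTask(Callable<V> callable) {
        super(callable);
    }

    // Queue the listener, or run it right away if the task already finished.
    void addListener(Runnable listener, Executor executor) {
        synchronized (pending) {
            if (!completed) {
                pending.add(new Entry(listener, executor));
                return;
            }
        }
        executor.execute(listener);
    }

    // FutureTask calls done() once the task has completed or been cancelled.
    @Override
    protected void done() {
        List<Entry> toRun;
        synchronized (pending) {
            completed = true;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        for (Entry entry : toRun) {
            entry.executor.execute(entry.listener);
        }
    }
}

class SimpleListenableTaskDemo {
    // Registers two listeners on different executors and records which ones ran.
    static List<String> runDemo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ExecutorService listenerPool = Executors.newSingleThreadExecutor();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch bothRan = new CountDownLatch(2);

        SimpleListenableTask<String> task = new SimpleListenableTask<>(() -> "done");
        task.addListener(() -> { events.add("pool listener"); bothRan.countDown(); }, listenerPool);
        // Runnable::run acts as a "direct" executor: the listener runs on the completing thread.
        task.addListener(() -> { events.add("direct listener"); bothRan.countDown(); }, Runnable::run);

        worker.execute(task);
        try {
            bothRan.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdown();
            listenerPool.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The order in which the two listeners record their events is not guaranteed, because they run on different threads.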
Creating a ListenableFuture
Wrap an ExecutorService with MoreExecutors.listeningDecorator to obtain a ListeningExecutorService; the submit method of that instance then returns a ListenableFuture.
// 2 core threads, up to 10 threads, 20 s keep-alive for idle extra threads, bounded queue of 10 tasks
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
        new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy()));
// submit returns a ListenableFuture instead of a plain Future
ListenableFuture<String> listenableFuture = executorService.submit(() -> {
    System.out.println("call execute..");
    TimeUnit.SECONDS.sleep(6);
    return "I'm a yellow duck";
});
Registering a listener on a ListenableFuture
The interface declares an addListener method that attaches a listener to the future; once the future completes, the listener is run.
addListener(Runnable listener, Executor executor) Registers a listener to be run on the given executor.
Usage:
listenableFuture.addListener(() -> {
    try {
        // The listener only runs after completion, so get() returns without blocking
        System.out.println("get listenable future's result -- " + listenableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}, executorService);
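Adding a callback to a ListenableFuture
Futures.addCallback attaches a callback to a ListenableFuture. Unlike a plain listener, the callback receives the result or the failure directly.
Usage: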
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        System.out.println("get listenable future's result with callback -- " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        t.printStackTrace();
    }
}, executorService); // pass the executor explicitly; recent Guava releases no longer offer the two-argument overload
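Other ListenableFuture features
SettableFuture: a future with no computation behind it. There is no task to implement; instead, calling code sets the future's result or exception programmatically.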
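The idea of a future that is completed by hand can be sketched with the JDK alone. The ManualFuture class below is hypothetical and is not Guava's SettableFuture; it simply exposes the protected set and setException methods that FutureTask already has. In Guava itself the corresponding calls are SettableFuture.create(), set(value) and setException(throwable).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical illustration class: a future whose outcome is supplied by the caller.
class ManualFuture<V> extends FutureTask<V> {
    ManualFuture() {
        // The callable is a placeholder; the future is completed via set/setException instead.
        super(() -> {
            throw new IllegalStateException("ManualFuture is completed by hand, never run");
        });
    }

    // Widen FutureTask's protected set(...) so callers can complete the future.
    @Override
    public void set(V value) {
        super.set(value);
    }

    // Widen FutureTask's protected setException(...) so callers can fail the future.
    @Override
    public void setException(Throwable failure) {
        super.setException(failure);
    }
}

class ManualFutureDemo {
    // Completes the future with a value and reads it back.
    static String successValue() {
        ManualFuture<String> future = new ManualFuture<>();
        future.set("fixed value");
        try {
            return future.get(); // already completed, so this does not block
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fails the future and returns the message of the exception that get() reports as the cause.
    static String failureMessage() {
        ManualFuture<String> future = new ManualFuture<>();
        future.setException(new IllegalArgumentException("boom"));
        try {
            future.get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(successValue());
        System.out.println(failureMessage());
    }
}
```

This style is handy when the value comes from somewhere other than a submitted task, for example from an event handler or from a test that needs a future with a known outcome.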
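CheckedFuture:
An interface that extends ListenableFuture. It provides a checkedGet method which, when the future fails, throws an exception of a caller-specified type. CheckedFuture and Futures.makeChecked were deprecated and later removed from Guava, so the following applies only to older releases.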
Function<Exception, ApplicationException> mapper = from -> {
    // Reuse the original ApplicationException if it caused the failure
    if (from != null && from.getCause() instanceof ApplicationException) {
        return (ApplicationException) from.getCause();
    }
    // Otherwise fall back to a generic ApplicationException
    return new ApplicationException("1", null);
};
CheckedFuture<String, ApplicationException> checkedFuture = Futures.makeChecked(listenableFuture, mapper);
try {
    // Wait at most 1 second; failures surface as ApplicationException
    checkedFuture.checkedGet(1, TimeUnit.SECONDS);
} catch (TimeoutException | ApplicationException e) {
    e.printStackTrace();
}
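What checkedGet does can be approximated with a small helper that works on any java.util.concurrent.Future. This is a sketch under assumptions rather than Guava's code: AppException, CheckedGetSketch and toAppException are hypothetical names, and the mapper uses java.util.function.Function instead of Guava's Function type. The helper calls get() and passes every failure through the mapper, so the caller only has to handle one exception type.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

// Hypothetical application exception used only for this illustration.
class AppException extends Exception {
    AppException(String message, Throwable cause) {
        super(message, cause);
    }
}

class CheckedGetSketch {
    // Waits for the future and converts any failure into the caller's exception type X.
    static <V, X extends Exception> V checkedGet(
            Future<V> future, Function<? super Exception, X> mapper) throws X {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw mapper.apply(e);
        } catch (CancellationException | ExecutionException e) {
            throw mapper.apply(e);
        }
    }

    // Mapper: unwrap an AppException cause if present, otherwise wrap the failure.
    static AppException toAppException(Exception e) {
        if (e.getCause() instanceof AppException) {
            return (AppException) e.getCause();
        }
        return new AppException("wrapped failure", e);
    }

    // Runs a task that fails and returns the message of the mapped exception.
    static String demo() {
        FutureTask<String> failing = new FutureTask<>(() -> {
            throw new AppException("lookup failed", null);
        });
        failing.run(); // runs on the calling thread; the exception is captured in the future
        Function<Exception, AppException> mapper = CheckedGetSketch::toAppException;
        try {
            return checkedGet(failing, mapper);
        } catch (AppException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the helper needs nothing beyond Future.get(), the same pattern can be applied to a ListenableFuture on Guava releases that no longer ship CheckedFuture.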
transform
transform(ListenableFuture<I> future, Function<? super I,? extends O> function, Executor exec)
transform is a static method of Futures that applies a custom function to a future's result and returns a new future holding the converted value. Usage:
Function<String, String> transFunction = queryResult -> "transformed --" + queryResult;
ListenableFuture<String> transformFuture = transform(listenableFuture, transFunction, executorService);
try {
    // Blocks until the original future completes and the function has been applied
    System.out.println(transformFuture.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
Complete test code
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.transform;
// Needed only if the commented-out CheckedFuture section is enabled (older Guava releases):
// import com.google.common.util.concurrent.CheckedFuture;
// import org.omg.CORBA.portable.ApplicationException;
// import java.util.concurrent.TimeoutException;
/**
 * @Author KeXin
 * @Date 2020/10/12 6:54 PM
 **/
public class ListenerFutureTest {
    public static void main(String[] args) {
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
                new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(10),
                        new ThreadPoolExecutor.DiscardOldestPolicy()));
        ListenableFuture<String> listenableFuture = executorService.submit(() -> {
            System.out.println("call execute..");
            TimeUnit.SECONDS.sleep(6);
            return "I'm a yellow duck";
        });

        // Plain listener: runs on executorService once the task has completed
        listenableFuture.addListener(() -> {
            try {
                System.out.println("get listenable future's result -- " + listenableFuture.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }, executorService);

        // Callback: receives the result or the failure directly
        Futures.addCallback(listenableFuture, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("get listenable future's result with callback -- " + result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, executorService);

        // transform: derive a new future from the result of the original one
        Function<String, String> transFunction = queryResult -> "transformed --" + queryResult;
        ListenableFuture<String> transformFuture = transform(listenableFuture, transFunction, executorService);
        try {
            System.out.println(transformFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // CheckedFuture example, left disabled
//        Function<Exception, ApplicationException> mapper = from -> {
//            if (from != null && from.getCause() instanceof ApplicationException) {
//                return (ApplicationException) from.getCause();
//            }
//            return new ApplicationException("1", null);
//        };
//        CheckedFuture<String, ApplicationException> checkedFuture = Futures.makeChecked(listenableFuture, mapper);
//        try {
//            checkedFuture.checkedGet(1, TimeUnit.SECONDS);
//        } catch (TimeoutException | ApplicationException e) {
//            e.printStackTrace();
//        }

        // Release the pool threads so the JVM can exit
        executorService.shutdown();
    }
}
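Copyright notice: This is an original article by qq_29721419, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.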