Guava: ListenableFuture

Introduction to ListenableFuture

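public interface ListenableFuture<V> extends Future<V>
ListenableFuture extends Future and is likewise an interface. Its main purpose is to let you attach listeners to an asynchronous task: once the task completes, the registered listeners are invoked automatically.
Different listeners can run on different Executors, and multiple listeners can be registered on the same ListenableFuture.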
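To make the listener mechanism more concrete, here is a minimal sketch that uses only the JDK. It is not Guava's implementation; the class names SimpleListenableTask and ListenerSketchDemo are made up for illustration. The idea it shows is the same, though: listeners are stored together with their Executor and are dispatched when the task finishes (or immediately, if the task is already done).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical, simplified stand-in for a listenable future (JDK only).
class SimpleListenableTask<V> extends FutureTask<V> {
    // A listener together with the executor it should run on.
    private static final class Entry {
        final Runnable listener;
        final Executor executor;
        Entry(Runnable listener, Executor executor) {
            this.listener = listener;
            this.executor = executor;
        }
    }

    private final List<Entry> pending = new ArrayList<>();
    private boolean completed = false;

    SimpleListenableTask(Callable<V> callable) {
        super(callable);
    }

    // Registers a listener; runs it right away if the task has already finished.
    public void addListener(Runnable listener, Executor executor) {
        synchronized (pending) {
            if (!completed) {
                pending.add(new Entry(listener, executor));
                return;
            }
        }
        executor.execute(listener);
    }

    // FutureTask calls done() once, after the task completes, fails, or is cancelled.
    @Override
    protected void done() {
        List<Entry> toRun;
        synchronized (pending) {
            completed = true;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        for (Entry e : toRun) {
            e.executor.execute(e.listener);
        }
    }
}

class ListenerSketchDemo {
    public static void main(String[] args) {
        SimpleListenableTask<String> task = new SimpleListenableTask<>(() -> "result");
        Executor sameThread = Runnable::run; // runs each listener on the calling thread
        task.addListener(() -> System.out.println("listener 1"), sameThread);
        task.addListener(() -> System.out.println("listener 2"), sameThread);
        task.run(); // prints "listener 1" then "listener 2"
        task.addListener(() -> System.out.println("late listener"), sameThread); // runs immediately
    }
}
```

In real code each listener could be given a different Executor (for example a thread pool for slow work and a same-thread executor for cheap bookkeeping), which is what the sentence about different Executors refers to.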

Creating a ListenableFuture

Wrap an ExecutorService with MoreExecutors.listeningDecorator to obtain a ListeningExecutorService; calling submit on that instance then returns a ListenableFuture.

// Decorate a plain thread pool so that submit() returns ListenableFuture instead of Future.
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
        new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy()));
ListenableFuture<String> listenableFuture = executorService.submit(() -> {
    System.out.println("call execute..");
    TimeUnit.SECONDS.sleep(6); // simulate a slow task
    return "I'm yellow duck";
});

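Registering a listener on a ListenableFuture

The interface declares an addListener method that attaches a listener to the future. When the future completes, the listener is run on the executor supplied with it.
addListener(Runnable listener, Executor executor): Registers a listener to be run on the given executor.
Usage: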

listenableFuture.addListener(() -> {
    try {
        System.out.println("get listenable future's result -- " + listenableFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}, executorService);

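Adding a callback to a ListenableFuture

Use Futures.addCallback to attach a FutureCallback to a ListenableFuture. Unlike a plain listener, the callback receives the result (or the failure) directly. Newer Guava releases require the Executor argument (the two-argument overload was removed), so the executor is passed explicitly here.
Usage:

Futures.addCallback(listenableFuture, new FutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        System.out.println("get listenable future's result with callback -- " + result);
    }
    @Override
    public void onFailure(Throwable t) {
        t.printStackTrace();
    }
}, executorService);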

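Other ListenableFuture features

  • SettableFuture: a future that does not need a task to compute its value. Instead, the result or the exception is set programmatically, through set(V) or setException(Throwable).

  • CheckedFuture:
    An interface that extends ListenableFuture. It provides a checkedGet method which, when the Future fails, throws an exception of a type you specify. Note that CheckedFuture and Futures.makeChecked were deprecated and have been removed from newer Guava releases, so the example below only applies to older versions.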

// Maps whatever exception the future failed with to an ApplicationException.
Function<Exception, ApplicationException> mapper = from -> {
    // Reuse the original ApplicationException when it is the cause of the failure.
    if (from != null && from.getCause() instanceof ApplicationException) {
        return (ApplicationException) from.getCause();
    }
    return new ApplicationException("1", null);
};
CheckedFuture<String, ApplicationException> checkedFuture = Futures.makeChecked(listenableFuture, mapper);
try {
    checkedFuture.checkedGet(1, TimeUnit.SECONDS);
} catch (TimeoutException | ApplicationException e) {
    e.printStackTrace();
}
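Returning to SettableFuture from the list above, the following is a small sketch of how it might be used, assuming Guava is on the classpath. The class name SettableFutureSketch and the helper describe are made up for this example; SettableFuture.create, set, and setException are the Guava calls being illustrated.

```java
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical demo class; only the SettableFuture calls come from Guava.
class SettableFutureSketch {
    // Turns the outcome of a future into a short description.
    static String describe(Future<String> future) {
        try {
            return "value: " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // No task is submitted anywhere; the value is supplied by hand.
        SettableFuture<String> ok = SettableFuture.create();
        ok.set("quack");
        System.out.println(describe(ok)); // prints "value: quack"

        // A future can also be completed with an exception.
        SettableFuture<String> failed = SettableFuture.create();
        failed.setException(new IllegalStateException("no duck"));
        System.out.println(describe(failed)); // prints "failed: no duck"

        // Once completed, later attempts to set a value are rejected.
        System.out.println(ok.set("another")); // prints "false"
    }
}
```

Because SettableFuture is itself a ListenableFuture, listeners and callbacks can be attached to it in the same way as to a future returned by submit.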

transform

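transform(ListenableFuture<I> future, Function<? super I,? extends O> function, Executor executor)
transform is a static method provided by Futures. It returns a new future whose value is the result of applying a custom function to the input future's result. Usage: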
Function<String, String> transFunction = queryResult -> "transformed --" + queryResult;
ListenableFuture<String> transformFuture = transform(listenableFuture, transFunction, executorService);
try {
    System.out.println(transformFuture.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
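One detail the snippet above does not show is what happens when the input future fails. The sketch below, which assumes Guava is on the classpath, suggests that the function is skipped and the failure is passed on to the transformed future. The class name TransformFailureSketch and the helper outcome are made up for this example; Futures.immediateFuture, Futures.immediateFailedFuture, and MoreExecutors.directExecutor are existing Guava helpers used here to keep the example free of thread pools.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class showing success and failure passing through transform.
class TransformFailureSketch {
    // Transforms the input to its length and reports what the derived future holds.
    static String outcome(ListenableFuture<String> input) {
        ListenableFuture<Integer> length =
                Futures.transform(input, String::length, MoreExecutors.directExecutor());
        try {
            return "length = " + length.get();
        } catch (ExecutionException e) {
            // The input's exception surfaces as the cause on the transformed future.
            return "failed with " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Successful input: the function is applied to the value.
        System.out.println(outcome(Futures.immediateFuture("duck"))); // prints "length = 4"

        // Failed input: the function never runs and the failure is propagated.
        ListenableFuture<String> broken =
                Futures.immediateFailedFuture(new IllegalStateException("query failed"));
        System.out.println(outcome(broken)); // prints "failed with query failed"
    }
}
```

directExecutor runs the function on whichever thread completes the input future, which is fine for a cheap function like this one; a heavier function would usually be given a real executor, as in the snippet above.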

Complete test code

// Guava's own packages are used here instead of a copy shaded inside another library.
// ApplicationException and CheckedFuture are only needed by the commented-out CheckedFuture example below.
// import org.omg.CORBA.portable.ApplicationException;
import com.google.common.base.Function;
// import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.google.common.util.concurrent.Futures.transform;

/**
 * @Author KeXin
 * @Date 2020/10/12 6:54 PM
 **/
public class ListenerFutureTest {
    public static void main(String[] args) {
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
                new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(10),
                        new ThreadPoolExecutor.DiscardOldestPolicy()));
        ListenableFuture<String> listenableFuture = executorService.submit(() -> {
            System.out.println("call execute..");
            TimeUnit.SECONDS.sleep(6);
            return "I'm yellow duck";
        });

        listenableFuture.addListener(() -> {
            try {
                System.out.println("get listenable future's result -- " + listenableFuture.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }, executorService);

        Futures.addCallback(listenableFuture, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("get listenable future's result with callback -- " + result);
            }
            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, executorService);

        Function<String, String> transFunction = queryResult -> "transformed --" + queryResult;
        ListenableFuture<String> transformFuture = transform(listenableFuture, transFunction, executorService);
        try {
            System.out.println(transformFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

//        Function<Exception, ApplicationException> mapper = from -> {
//            if (from != null && from.getCause() instanceof ApplicationException) {
//                try {
//                    throw (ApplicationException) from.getCause();
//                } catch (ApplicationException e) {
//                    e.printStackTrace();
//                }
//            }
//            return new ApplicationException("1", null);
//        };
//        CheckedFuture<String, ApplicationException> checkedFuture = Futures.makeChecked(listenableFuture, mapper);
//        try {
//            checkedFuture.checkedGet(1, TimeUnit.SECONDS);
//        } catch (TimeoutException | ApplicationException e) {
//            e.printStackTrace();
//        }


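        // Shut down the pool so its non-daemon worker threads do not keep the JVM alive.
        executorService.shutdown();
    }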

}

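Copyright notice: This is an original article by qq_29721419, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.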