在这篇文章中,我们将看看如何ExeutorService使用 来运行多线程异步任务。我们将从直接创建线程开始,然后继续探索它ExeutorService以及如何使用它来简化事情。
直接创建线程
在 Executor API 出现之前,开发人员负责直接实例化和管理线程。下面我们来看一个简单的例子。
/**
* Call 2 expensive methods on separate threads
*
* @throws InterruptedException
*/
public void doMultiThreadedWork() throws InterruptedException {
/* create Runnable using anonymous inner class */
Thread t1 = new Thread(new Runnable() {
public void run() {
System.out.println("starting expensive task thread t1");
doSomethingExpensive();
System.out.println("finished expensive task thread t1");
}
});
/* start processing on new threads */
t1.start();
/* block current thread until t1 has finished */
t1.join();
}在上面的方法中,我们创建了一个新的Threadt1 并将 a 传递Runnable给它的构造函数。匿名内部类实现Runnable其中run()方法包含线程启动时将执行的逻辑。注意,如果里面的代码run()抛出了一个checked Exception,它必须在方法内部被捕获和处理。
介绍 Executor 服务
直接处理线程可能很麻烦,因此 Oracle 通过其 Executor API 提供一个抽象层来简化事情。AnExecutor允许您异步处理任务,而无需直接处理线程。
创建一个执行器
该Executors工厂类用于创建的实例Executor,无论是一个ExecutorService或一个 ScheduledExecutorService。Executor下面描述了一些最常见的类型。
Executors.newCachedThreadPool()—ExecutorService具有线程池的线程池,可根据需要创建新线程,但在可用时重用先前创建的线程。Executors.newFixedThreadPool(int numThreads)—ExecutorService具有固定线程数的线程池。该numThreads参数是ExecutorService在任一时间可以处于活动状态的最大线程数。如果提交到池中的请求数量超过池大小,则请求将排队等待线程变为可用。Executors.newScheduledThreadPool(int numThreads)—ScheduledExecutorService具有用于定期或在指定延迟后运行任务的线程池的A。Executors.newSingleThreadExecutor()—ExecutorService单线程。提交的任务将按提交的顺序一次执行一项。Executors.newSingleThreadScheduledExecutor()—ExecutorService使用单个线程定期或在指定延迟后执行任务的一种。
下面的代码片段创建了ExecutorService一个池大小为 2的固定线程池。我将ExecutorService在接下来的部分中使用它。
ExecutorService executorService = Executors.newFixedThreadPool(2);
在以下部分中,我们将了解如何ExecutorService使用它来创建和管理异步任务。
执行(可运行)
当您想要运行任务并且不关心检查其状态或获取结果时,该execute方法采用 aRunnable和非常有用。把它想象成火而忘记异步任务。
executorService.execute(()->{
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
doSomethingExpensive();
}与使用匿名内部类的第一个 Thread 示例不同,上面的示例Runnable使用 lambda 表达式创建。该Runnable将尽快执行一个线程可以从ExecutorService线程池。
Future<?> 提交(可运行)
就像execute(),该submit()方法也接受 aRunnable但不同于execute()因为它返回 a Future。AFuture是一个对象,表示来自异步任务的挂起响应。将其视为可用于检查任务状态或在任务完成时检索其结果的句柄。Futures 使用泛型来允许您指定任务的返回类型。然而,鉴于该Runnablerun()方法的返回类型为 void,它Future保存了任务的状态而不是挂起的结果。这Future<?>在下面的示例中表示。
Future<?> taskStatus = executorService.submit(()->{
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
doSomethingExpensive();
}在submit(Runnable)当你想运行一个不返回值的任务,但你想,它已经提交后检查任务的状态的方法是有用的ExecutorService。
检查任务状态
Future有一些有用的方法来检查已提交到ExecutorService.
isCancelled()检查提交的任务是否已经被取消。isDone()检查提交的任务是否已经完成。当任务完成时,isDone无论任务成功完成、失败还是被取消, 都将返回 true。cancel()取消提交的任务。一个布尔参数指定如果任务已经开始,是否应该中断它。
/* check if both tasks have completed - if not sleep current thread
* for 1 second and check again
*/
while(!task1Future.isDone() || !task2Future.isDone()){
System.out.println("Task 1 and Task 2 are not yet complete....sleeping");
Thread.sleep(1000);
}Future<T> 提交(可调用)
该submit方法被重载以采用 aCallable和 a Runnable。与 a 一样Runnable, aCallable表示在另一个线程上执行的任务。ACallable与 a 不同,Runable因为它返回一个值并且可以抛出一个已检查的异常。该Callable接口具有单个抽象方法,public T call() throws Exception并且Runable可以使用匿名内部类或 lambda 来实现。所述的返回类型call()方法用于键入Future被返回ExecutorService。下面的两个代码片段展示了如何Callable通过匿名内部类和 lambda 表达式创建 a。
Future<Double> task1Future = executorService.submit(new Callable<Double>() {
public Double call() throws Exception {
System.out.println(String.format("starting expensive task thread %s",
Thread.currentThread().getName()));
Double returnedValue = someExpensiveRemoteCall();
return returnedValue;
}
});Future<Double> task2Future = executorService.submit(()->{
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
Double returnedValue = someExpensiveRemoteCall();
return returnedValue;
});这两个示例都创建了一个Callable并将其传递给execute方法。的Callable是只要一个线程可用来执行。
从未来得到结果
当 aCallable提交到 时ExecutorService,我们会收到Future带有call()方法返回类型的a 。在上面的例子中,call()返回 aDouble所以我们得到 a Future<Double>。从 a 检索结果的一种方法Future是调用它的get()方法。get()将无限期阻塞等待提交的任务完成。如果任务没有完成或需要很长时间才能完成,主应用程序线程将保持阻塞状态。
无限期地等待结果通常并不理想。如果任务没有在一定时间内完成,我们宁愿对如何检索结果有更多的控制权,并采取一些行动。幸运的是,有一个重载get(long timeout, TimeUnit unit)方法会等待指定的时间段,如果任务尚未完成(结果不可用),则会抛出一个TimeoutException.
Double value1 = task1Future.get();
Double value2 = task2Future.get(4, TimeUnit.SECONDS); // throws TimeoutException提交多个 Callables
除了允许你提交一个的Callable,则ExecutorService允许你提交Collection的Callable使用invokeAll方法。正如您所料,返回的是Futurea Collectionof Futures而不是返回单个。Future返回A表示每个提交任务的挂起结果。
Collection<Callable<Double>> callables = new ArrayList<>();
IntStream.rangeClosed(1, 8).forEach(i-> {
callables.add(createCallable());
});
/* invoke all supplied Callables */
List<Future<Double>> taskFutureList = executorService.invokeAll(callables);
/* call get on Futures to retrieve result when it becomes available.
* If specified period elapses before result is returned a TimeoutException
* is thrown
*/
for (Future<Double> future : taskFutureList) {
/* get Double result from Future when it becomes available */
Double value = future.get(4, TimeUnit.SECONDS);
System.out.println(String.format("TaskFuture returned value %s", value));
}上面的代码片段将 8 提交Callable给ExecutorService并检索List包含 8 Future。Future返回的列表与Callable提交的s 的顺序相同。请注意,Callable如果我们希望大多数或所有提交的任务可以并行执行,提交多个s 将需要调整线程池的大小。在上面的例子中,我们需要一个有 8 个线程的线程池来并行运行所有任务。
关闭 ExecutorService
在所有任务完成后,重要的是正常关闭,ExecutorService以便可以回收使用的资源。有两种方法可用,shutDown()和shutDownNow()。shutDown()触发 关闭ExecutorService,允许当前处理的任务完成但拒绝新提交的任务。
shutDownNow()也会触发 的关闭ExecutorService,但不允许当前正在执行的任务完成并尝试立即终止它们。shutDownNow()返回启动关闭时排队等待执行的任务列表。为了确保ExecutorService在所有情况下都关闭并避免潜在的资源泄漏,重要的是在 块内调用 shutDown()或。shutDownNow()finally
ExecutorService executorService = null;
try{
executorService = Executors.newFixedThreadPool(2);
executorService.execute(()->{
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
doSomethingExpensive();
});
}
finally{
executorService.shutdown();
}总结
在这篇文章中,我们研究ExecutorService了它以及如何使用它来简化异步任务的创建和管理。这篇文章随附的源代码可在GitHub上找到,所以为什么不拉取代码并尝试一下。与往常一样,请随时在下面发表评论或问题。