在编程中,根据实际场景,我们有时会考虑使用异步执行来提高应用的响应速度;一个简单的例子:
@Test
public void futureTest() {
// 注意使用 ExecutorService 而非 Executor
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future future = executorService.submit(() -> {
//此处为Callable接口实现
// 业务代码
System.out.println("执行异步任务...");
return "返回执行结果";
});
try {
//判断异步线程是否执行完成
if (future.isDone())
System.out.println("线程执行完成1");
//获取异步线程的执行结果,若未完成将阻塞
String result = future.get();
System.out.println(result);
if (future.isDone())
System.out.println("线程执行完成2");
} catch (Exception e) {
}
}
执行结果:
执行异步任务...
返回执行结果
线程执行完成2
一.Runnable与Callable接口
多线程执行的任务,常用的Runnable接口来编写多线程任务,而对于某些计算类任务需要得到计算结果则有Callable接口;Runnable实现可以通过Thread类执行或调用线程池execute方法执行,而Callable实现只能通过线程池的submit方法执行,下面是例子:
@Test
public void currentTest() {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("执行任务1...");
}
};
//泛型代表了任务执行的返回类型
Callable callable = new Callable() {
@Override
public String call() throws Exception {
System.out.println("执行计算任务2...");
return "任务2结果";
}
};
ExecutorService service = Executors.newFixedThreadPool(2);
//Runnable任务使用execute方法
service.execute(runnable);
//Callable任务使用submit方法
Future future = service.submit(callable);
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
}
}
通过Future实例可以获取任务执行结果,Future的用法后面会说到
执行结果:
执行任务1...
执行计算任务2...
任务2结果
有时候对于Runnable实现类,我们想要通过线程池的submit方法获取该任务的执行状态,并且不想更改该实现类,可以使用Executors.callable(runnable, "执行结果")将Runnable转化为Callable,这里为适配器模式的实际应用之处,不过由于任务本身是Runnable实现,并不会返回执行结果,故设置固定的执行结果,或者使用线程池ExecutorService.submit(runnable, "执行结果")直接获取Future对象;
@Test
public void runnableAdapt() {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("执行Runnable任务...");
}
};
ExecutorService service = Executors.newFixedThreadPool(1);
//使用适配器模式,将Runnable转化为Callable引用
Callable callable = Executors.callable(runnable, "执行结果1");
//内部也是使用的适配器
Future future1 = service.submit(runnable, "执行结果2");
Future future2 = service.submit(callable);
try {
System.out.println(future1.get());
System.out.println(future2.get());
} catch (InterruptedException | ExecutionException e) {
}
}
执行结果:
执行Runnable任务...
执行结果2
执行Runnable任务...
执行结果1
二.Future接口
对于异步执行的任务,我们有时需要知道其执行的状态;是否完成,是否成功,异步计算的结果等;JUC包中提供了Future接口可以满足这一需求;ExecutorService.submit方法会返回一个Future示例对象,其泛型同其参数Callable的泛型,代表任务执行的结果类型;
在实际使用中,如kafka生产者提交消息得到的Future对象
Future future = producer.send(new ProducerRecord(topic, msg));
try {
RecordMetadata metadata = future.get();
System.out.println(metadata);
} catch (Exception e) {
//
}
netty中向连接写数据
//此处的Future为netty对JUC中Future的拓展,具体见netty的api
Future future = ctx.writeAndFlush(msg);
future.addListener(new GenericFutureListener>() {
@Override
public void operationComplete(Future> future) throws Exception {
}
});
java.util.concurrent.Future
1.boolean cancel(boolean mayInterruptIfRunning);
对于还未完成或撤销的任务,可以调用次方法将其撤销,成功撤销返回true,已完成,已撤销或某一未知因素撤销失败返回false;
2.boolean isCancelled();
任务是否已被撤销,如方法名所示很好理解,任务完成前被撤销则返回true;
3.boolean isDone();
任务是否已完成,因为此方法不会导致阻塞,配合get()使用可以一次性尝试获取异步任务的结果
if (future.isDone()) {
String result = future.get();
System.out.println(result);
}
4.V get() throws InterruptedException, ExecutionException;
获取异步任务执行结果,若任务未完成,改方法会导致阻塞知道任务完成或抛出异常;返回值泛型与任务Callable实现类泛型相同,若任务为无返回值的Runnable实现,则获取到的为null;
5.V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException;
同上,区别是可传入参数设置线程阻塞等待的时间
三.FutureTask类
每一个异步任务都对应一个Future对象,那么为何不将其组合在一起简化引用呢?JUC包中提供类FutureTask类,该类实现类RunnableFuture接口,而RunnableFuture接口又继承了Runnable接口与Future接口,所有一个FutureTask实例可以认为同时包含了一个任务及该任务的状态;下面是一个简单的例子:
@Test
public void futureTaskTest() {
FutureTask task = new FutureTask<>(() -> {
//Callable实现
System.out.println("执行异步任务1...");
return "执行成功1";
});
FutureTask task2 = new FutureTask<>(() -> {
//Runnable实现与固定返回值
System.out.println("执行异步任务2...");
}, "执行成功2");
Thread t = new Thread(task);
t.start();
Thread t2 = new Thread(task2);
t2.start();
try {
System.out.println(task.get());
} catch (Exception e) {
//
}
}
执行异步任务1...
执行成功1
执行异步任务2...
FutureTask对象通过传入Callable实现或Runnable实现与固定返回值构造,其本身为一个任务,同时又具有Future接口的所有功能可以得到该任务的执行状态;
四.CountDownLatch类
等待另一个线程执行完成可以使用Future.get(),那么如果需要等待是多个线程呢,一个个轮询判断显然不妥,而JUC包中提供了CountDownLatch类,借助该类可以实现线程等待其它线程完成一系列执行,下面借用JDK中的例子添加注释来介绍:
主线程中执行的驱动类
class Driver {
/**
* 主要使用的有两个方法countDown()与await(),构造方法传入int参数N,
* 调用await()方法的线程会进入阻塞等待状态,直到有N(构造方法的数量)个线程调用countDown()
* 方法才会结束等待;
*/
void main() throws InterruptedException {
//开始标志,参数表示需要等待的线程数,工作线程等待一个主线程发出开始信号,故这里为1
CountDownLatch startSignal = new CountDownLatch(1);
//工作线程完成标志,假设有N个工作线程,那么主线程需要等待N个线程完成,参数传N
CountDownLatch doneSignal = new CountDownLatch(N);
//创建并启动N个工作线程
for (int i = 0; i < N; ++i)
new Thread(new Worker(startSignal, doneSignal)).start();
//执行其他业务代码
//doSomethingElse();
/**
* 开始工作标志,startSignal的初始等待数为1,只要有一个线程调用countDown()方法,
* 则所有调用startSignal.await()方法的线程便会停止等待
*/
startSignal.countDown();
//doSomethingElse();
/**
* 等待所有工作线程完成完成,doneSignal初始等待数为N,需要有N个线程调用
* doneSignal.countDown()方法才会结束等待
*/
doneSignal.await();
}
}
工作线程工作任务
class Worker implements Runnable {
//持有两个信号的引用
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
/**
* 为了不让线程启动便进入工作,需要等待主线程的通知,调用通知对象的await()方法,
* 上边我们设置了该对象的等待数为1,
* CountDownLatch startSignal = new CountDownLatch(1);
*/
startSignal.await();
//等待结束开始工作
doWork();
/**
* 工作完成,此时主线程还处于等待状态,调用countDown()通知主线程,
* 由于我们初始设置了等待数为N,CountDownLatch doneSignal = new CountDownLatch(N),
* 故需要所有N个工作线程调用doneSignal.countDown()方法主线程才会结束等待
*/
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() {
//工作内容
}
}
await(long timeout, TimeUnit unit)
该方法通await()方法,不过可以设置等待时间
getCount()
获取剩余的等待数量,用于调试与测试
@Test
public void countDownLatchTest() {
CountDownLatch cdl = new CountDownLatch(5);
System.out.println(cdl.getCount());
cdl.countDown();
System.out.println(cdl.getCount());
}
5
4