java 启动异步线程_java多线程之异步使用

在编程中,根据实际场景,我们有时会考虑使用异步执行来提高应用的响应速度;一个简单的例子:

@Test

public void futureTest() {

// 注意使用 ExecutorService 而非 Executor

ExecutorService executorService = Executors.newFixedThreadPool(1);

Future future = executorService.submit(() -> {

//此处为Callable接口实现

// 业务代码

System.out.println("执行异步任务...");

return "返回执行结果";

});

try {

//判断异步线程是否执行完成

if (future.isDone())

System.out.println("线程执行完成1");

//获取异步线程的执行结果,若未完成将阻塞

String result = future.get();

System.out.println(result);

if (future.isDone())

System.out.println("线程执行完成2");

} catch (Exception e) {

}

}

执行结果:

执行异步任务...

返回执行结果

线程执行完成2

一.Runnable与Callable接口

多线程执行的任务,常用的Runnable接口来编写多线程任务,而对于某些计算类任务需要得到计算结果则有Callable接口;Runnable实现可以通过Thread类执行或调用线程池execute方法执行,而Callable实现只能通过线程池的submit方法执行,下面是例子:

@Test

public void currentTest() {

Runnable runnable = new Runnable() {

@Override

public void run() {

System.out.println("执行任务1...");

}

};

//泛型代表了任务执行的返回类型

Callable callable = new Callable() {

@Override

public String call() throws Exception {

System.out.println("执行计算任务2...");

return "任务2结果";

}

};

ExecutorService service = Executors.newFixedThreadPool(2);

//Runnable任务使用execute方法

service.execute(runnable);

//Callable任务使用submit方法

Future future = service.submit(callable);

try {

System.out.println(future.get());

} catch (InterruptedException | ExecutionException e) {

}

}

通过Future实例可以获取任务执行结果,Future的用法后面会说到

执行结果:

执行任务1...

执行计算任务2...

任务2结果

有时候对于Runnable实现类,我们想要通过线程池的submit方法获取该任务的执行状态,并且不想更改该实现类,可以使用Executors.callable(runnable, "执行结果")将Runnable转化为Callable,这里为适配器模式的实际应用之处,不过由于任务本身是Runnable实现,并不会返回执行结果,故设置固定的执行结果,或者使用线程池ExecutorService.submit(runnable, "执行结果")直接获取Future对象;

@Test

public void runnableAdapt() {

Runnable runnable = new Runnable() {

@Override

public void run() {

System.out.println("执行Runnable任务...");

}

};

ExecutorService service = Executors.newFixedThreadPool(1);

//使用适配器模式,将Runnable转化为Callable引用

Callable callable = Executors.callable(runnable, "执行结果1");

//内部也是使用的适配器

Future future1 = service.submit(runnable, "执行结果2");

Future future2 = service.submit(callable);

try {

System.out.println(future1.get());

System.out.println(future2.get());

} catch (InterruptedException | ExecutionException e) {

}

}

执行结果:

执行Runnable任务...

执行结果2

执行Runnable任务...

执行结果1

二.Future接口

对于异步执行的任务,我们有时需要知道其执行的状态;是否完成,是否成功,异步计算的结果等;JUC包中提供了Future接口可以满足这一需求;ExecutorService.submit方法会返回一个Future示例对象,其泛型同其参数Callable的泛型,代表任务执行的结果类型;

在实际使用中,如kafka生产者提交消息得到的Future对象

Future future = producer.send(new ProducerRecord(topic, msg));

try {

RecordMetadata metadata = future.get();

System.out.println(metadata);

} catch (Exception e) {

//

}

netty中向连接写数据

//此处的Future为netty对JUC中Future的拓展,具体见netty的api

Future future = ctx.writeAndFlush(msg);

future.addListener(new GenericFutureListener>() {

@Override

public void operationComplete(Future> future) throws Exception {

}

});

0240b08c0d60?from=timeline

java.util.concurrent.Future

1.boolean cancel(boolean mayInterruptIfRunning);

对于还未完成或撤销的任务,可以调用次方法将其撤销,成功撤销返回true,已完成,已撤销或某一未知因素撤销失败返回false;

2.boolean isCancelled();

任务是否已被撤销,如方法名所示很好理解,任务完成前被撤销则返回true;

3.boolean isDone();

任务是否已完成,因为此方法不会导致阻塞,配合get()使用可以一次性尝试获取异步任务的结果

if (future.isDone()) {

String result = future.get();

System.out.println(result);

}

4.V get() throws InterruptedException, ExecutionException;

获取异步任务执行结果,若任务未完成,改方法会导致阻塞知道任务完成或抛出异常;返回值泛型与任务Callable实现类泛型相同,若任务为无返回值的Runnable实现,则获取到的为null;

5.V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException;

同上,区别是可传入参数设置线程阻塞等待的时间

三.FutureTask类

每一个异步任务都对应一个Future对象,那么为何不将其组合在一起简化引用呢?JUC包中提供类FutureTask类,该类实现类RunnableFuture接口,而RunnableFuture接口又继承了Runnable接口与Future接口,所有一个FutureTask实例可以认为同时包含了一个任务及该任务的状态;下面是一个简单的例子:

@Test

public void futureTaskTest() {

FutureTask task = new FutureTask<>(() -> {

//Callable实现

System.out.println("执行异步任务1...");

return "执行成功1";

});

FutureTask task2 = new FutureTask<>(() -> {

//Runnable实现与固定返回值

System.out.println("执行异步任务2...");

}, "执行成功2");

Thread t = new Thread(task);

t.start();

Thread t2 = new Thread(task2);

t2.start();

try {

System.out.println(task.get());

} catch (Exception e) {

//

}

}

执行异步任务1...

执行成功1

执行异步任务2...

FutureTask对象通过传入Callable实现或Runnable实现与固定返回值构造,其本身为一个任务,同时又具有Future接口的所有功能可以得到该任务的执行状态;

四.CountDownLatch类

等待另一个线程执行完成可以使用Future.get(),那么如果需要等待是多个线程呢,一个个轮询判断显然不妥,而JUC包中提供了CountDownLatch类,借助该类可以实现线程等待其它线程完成一系列执行,下面借用JDK中的例子添加注释来介绍:

主线程中执行的驱动类

class Driver {

/**

* 主要使用的有两个方法countDown()与await(),构造方法传入int参数N,

* 调用await()方法的线程会进入阻塞等待状态,直到有N(构造方法的数量)个线程调用countDown()

* 方法才会结束等待;

*/

void main() throws InterruptedException {

//开始标志,参数表示需要等待的线程数,工作线程等待一个主线程发出开始信号,故这里为1

CountDownLatch startSignal = new CountDownLatch(1);

//工作线程完成标志,假设有N个工作线程,那么主线程需要等待N个线程完成,参数传N

CountDownLatch doneSignal = new CountDownLatch(N);

//创建并启动N个工作线程

for (int i = 0; i < N; ++i)

new Thread(new Worker(startSignal, doneSignal)).start();

//执行其他业务代码

//doSomethingElse();

/**

* 开始工作标志,startSignal的初始等待数为1,只要有一个线程调用countDown()方法,

* 则所有调用startSignal.await()方法的线程便会停止等待

*/

startSignal.countDown();

//doSomethingElse();

/**

* 等待所有工作线程完成完成,doneSignal初始等待数为N,需要有N个线程调用

* doneSignal.countDown()方法才会结束等待

*/

doneSignal.await();

}

}

工作线程工作任务

class Worker implements Runnable {

//持有两个信号的引用

private final CountDownLatch startSignal;

private final CountDownLatch doneSignal;

Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {

this.startSignal = startSignal;

this.doneSignal = doneSignal;

}

public void run() {

try {

/**

* 为了不让线程启动便进入工作,需要等待主线程的通知,调用通知对象的await()方法,

* 上边我们设置了该对象的等待数为1,

* CountDownLatch startSignal = new CountDownLatch(1);

*/

startSignal.await();

//等待结束开始工作

doWork();

/**

* 工作完成,此时主线程还处于等待状态,调用countDown()通知主线程,

* 由于我们初始设置了等待数为N,CountDownLatch doneSignal = new CountDownLatch(N),

* 故需要所有N个工作线程调用doneSignal.countDown()方法主线程才会结束等待

*/

doneSignal.countDown();

} catch (InterruptedException ex) {

} // return;

}

void doWork() {

//工作内容

}

}

await(long timeout, TimeUnit unit)

该方法通await()方法,不过可以设置等待时间

getCount()

获取剩余的等待数量,用于调试与测试

@Test

public void countDownLatchTest() {

CountDownLatch cdl = new CountDownLatch(5);

System.out.println(cdl.getCount());

cdl.countDown();

System.out.println(cdl.getCount());

}

5

4


版权声明:本文为weixin_32288959原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。