1. 简介
- 异步方法调用是一种模式,在这种模式中,调用线程在等待任务结果时不会被阻塞。 该模式提供了多个独立任务的并行处理,并通过回调或等待一切完成来检索结果。
2. 列子
- 异步方法调用启动任务处理,并在任务就绪之前立即返回。 任务处理的结果稍后返回给调用方。在多线程计算机编程中,异步方法调用(AMI),也称为异步方法调用或异步模式, 是一种设计模式,在这种模式中,调用站点在等待被调用代码完成时不会被阻止。 相反,当应答到达时,调用线程会得到通知。轮询回复是一个不需要的选项。
- 在这个例子中,我们发射太空火箭并部署月球车。应用程序演示了异步方法调用模式。 该模式的关键部分是’AsyncResult’,它是异步计算值的中间容器,‘AsyncCallback’,可以提供在任务完成时执行,‘AsyncExecutor’,它管理异步任务的执行。在这个例子中,我们发射太空火箭并部署月球车
应用程序演示了异步方法调用模式。该模式的关键部分是
AsyncResult
,它是异步计算值的中间容器,
AsyncCallback
可以提供给任务完成时执行,以及管理异步任务执行的
AsyncExecutor
main方法显示了异步调用的示例流。主线程以可变的持续时间启动多个任务,然后继续自己的工作。当主线程完成它的工作时,它收集异步任务的结果。其中两个任务通过回调处理,这意味着在任务完成后立即执行回调
异步结果和回调之间线程使用情况的显著区别在于,异步结果在主线程中收集, 而回调在工作线程中执行。使用线程池时应注意这一点
Java提供了自己的异步方法调用模式实现。
- FutureTask、CompletableFuture和ExecutorService是此模式的实际实现。 但是由于并行编程的本质,实现并不是微不足道的。这个例子并没有考虑所有可能的场景,而是提供了一个简单的版本来帮助理解模式。
3. 结构图
4. 调用
package com.study111_1_async;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
*
* 异步方法调用是一种模式,在这种模式中,调用线程在等待任务结果时不会被阻塞。
* 该模式提供了多个独立任务的并行处理,并通过回调或等待一切完成来检索结果。
*
* 异步方法调用启动任务处理,并在任务>就绪之前立即返回。
* 任务处理的结果稍后返回给调用方。
*
* >在多线程计算机编程中,异步方法调用(AMI),也称为>异步方法调用或异步模式,
* 是一种设计模式,在这种模式中,调用站点>在等待被调用代码完成时不会被阻止。
* 相反,当应答到达时,调用线程会得到>通知。轮询回复是一个不需要的选项。
*
* 在这个例子中,我们发射太空火箭并部署月球车。应用程序演示了异步方法调用模式。
* 该模式的关键部分是'AsyncResult',
* 它是异步计算值的中间容器,'AsyncCallback',可以提供在任务完成时执行,
* 'AsyncExecutor',它管理异步任务的执行。
*
*
* 在这个例子中,我们发射太空火箭并部署月球车<p> 应用程序演示了异步方法调用模式。
* 该模式的关键部分是<code>AsyncResult<code>,
* 它是异步计算值的中间容器,<code>AsyncCallback<code>可以提供给任务完成时执行,以及管理异步任务执行的<code>AsyncExecutor<code>
* <p> main方法显示了异步调用的示例流。
* 主线程以可变的持续时间启动多个任务,然后继续自己的工作。当主线程完成它的工作时,它收集异步任务的结果。其中两个任务通过回调处理,这意味着在任务完成后立即执行回调<p>
* 异步结果和回调之间线程使用情况的显著区别在于,异步结果在主线程中收集,
* 而回调在工作线程中执行。使用线程池时应注意这一点<p>
* Java提供了自己的异步方法调用模式实现。FutureTask、CompletableFuture和ExecutorService是此模式的实际实现。
* 但是由于并行编程的本质,实现并不是微不足道的。这个例子并没有考虑所有可能的场景,而是提供了一个简单的版本来帮助理解模式。
*/
public class App {
private static final String ROCKET_LAUNCH_LOG_PATTERN = "火箭<%s>成功着陆";
public static void main(String[] args) throws InterruptedException, ExecutionException {
//构造将运行异步任务的新执行器
var executor = new ThreadAsyncExecutor();
//启动几个处理时间不同的异步任务,最后两个使用回调处理程序
final var asyncResult1 = executor.startProcess(lazyval(10, 500));
final var asyncResult2 = executor.startProcess(lazyval("test", 500));
final var asyncResult3 = executor.startProcess(lazyval(50L, 500));
final var asyncResult4 = executor.startProcess(lazyval(20, 500));
callback("部署月球车");
final var asyncResult5 = executor.startProcess(lazyval("callback22", 500),callback("部署月球车完成"));
//当异步任务在其自己的线程中运行时,在当前线程中模拟处理
Thread.sleep(350);
System.out.println("任务指挥部正在喝咖啡");
//等待任务完成
final var result1 = executor.endProcess(asyncResult1);
final var result2 = executor.endProcess(asyncResult2);
final var result3 = executor.endProcess(asyncResult3);
asyncResult4.await();
asyncResult5.await();
//记录任务的结果,完成后立即记录回调
System.out.println("火箭 <" + result1 + "> 发射完成");
System.out.println("火箭 <" + result2 + "> 发射完成");
System.out.println("火箭 <" + result3 + "> 发射完成");
//System.out.println("火箭 <" + result4 + "> 发射完成");
}
//创建一个可调用函数,该函数以人为延迟的方式惰性地计算给定值
private static <T> Callable<T> lazyval(T value, long delayMs) {
return () -> {
Thread.sleep(1000);
System.out.println(String.format(ROCKET_LAUNCH_LOG_PATTERN, value));
return value;
};
}
//创建一个简单回调,记录异步结果的完整状态。
private static <T> AsyncCallback<T> callback(String name) {
return (value, ex) -> {
if (ex.isPresent()) {
System.out.println(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
} else {
System.out.println(name + " <" + value + ">"+" 异步回调完成");
}
};
}
}
5. 结果
任务指挥部正在喝咖啡
火箭<20>成功着陆
火箭<50>成功着陆
火箭成功着陆
火箭<10>成功着陆
火箭成功着陆
火箭 <10> 发射完成
火箭 发射完成
火箭 <50> 发射完成
部署月球车完成 异步回调完成
Process finished with exit code 0
6. 实现
package com.study111_1_async;
import java.util.Optional;
/**
* 异步回调接口。
*/
public interface AsyncCallback<T> {
//异步任务完成或执行失败时执行的完整处理程序
void onComplete(T value, Optional<Exception> ex);
}
package com.study111_1_async;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
public interface AsyncExecutor {
//开始处理异步任务。立即返回异步结果。
<T> AsyncResult<T> startProcess(Callable<T> task);
//开始处理异步任务。立即返回异步结果。任务完成时执行回调。
<T> AsyncResult<T> startProcess(Callable<T> task,AsyncCallback<T> callback);
//结束异步任务的处理。如果需要,阻止当前线程并返回已完成任务的评估值。
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException,InterruptedException;
}
package com.study111_1_async;
import java.util.concurrent.ExecutionException;
/**
* 异步结果接口
*/
public interface AsyncResult<T> {
boolean isCompleted();
//获取已完成异步任务的值
T getValue() throws ExecutionException;
//阻止当前线程,直到异步任务完成。
void await() throws InterruptedException;
}
package com.study111_1_async;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 异步执行器的实现,为每个任务创建一个新线程。
*/
public class ThreadAsyncExecutor implements AsyncExecutor {
private final AtomicInteger idx = new AtomicInteger(0);
@Override
public <T> AsyncResult<T> startProcess(Callable<T> task) {
return startProcess(task, null);
}
@Override
public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
var result = new CompletableResult<>(callback);
new Thread(()->{
try {
result.setValue(task.call());
}catch (Exception e){
result.setException(e);
}
},"executor-" + idx.incrementAndGet()).start();
return result;
}
@Override
public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
if(!asyncResult.isCompleted()){
asyncResult.await();
}
return asyncResult.getValue();
}
/*
异步结果的简单实现,允许使用一个值成功完成它,或使用异常异常完成它。
一个真正简化的版本,来自它的现实生活表兄弟FutureTask和CompletableFuture
*/
private static class CompletableResult<T> implements AsyncResult<T> {
static final int RUNNING = 1;
static final int FAILED = 2;
static final int COMPLETED = 3;
final Object lock;
final Optional<AsyncCallback<T>> callback;
volatile int state = RUNNING;
T value;
Exception exception;
private CompletableResult(AsyncCallback<T> callback) {
this.lock = new Object();
this.callback = Optional.ofNullable(callback);
}
//设置成功执行的值并执行回调(如果可用)。通知任何等待完成的线程。
void setValue(T value) {
this.value = value;
this.state = COMPLETED;
this.callback.ifPresent(ac -> {
ac.onComplete(value, Optional.empty());
});
synchronized (lock) {
lock.notify();
}
}
//设置执行失败的异常并执行回调(如果可用)。通知任何等待完成的线程。
void setException(Exception exception) {
this.exception = exception;
this.state = FAILED;
this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
}
@Override
public boolean isCompleted() {
return state > RUNNING;
}
@Override
public T getValue() throws ExecutionException {
if(state == COMPLETED){
return value;
}else if(state == FAILED){
throw new ExecutionException(exception);
}else {
throw new IllegalStateException("Execution not completed yet");
}
}
@Override
public void await() throws InterruptedException {
synchronized (lock){
while (!isCompleted()){
lock.wait();
}
}
}
}
}
版权声明:本文为zzqtty原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。