设计模式 - 异步调用模式

1. 简介

  • 异步方法调用是一种模式,在这种模式中,调用线程在等待任务结果时不会被阻塞。 该模式提供了多个独立任务的并行处理,并通过回调或等待一切完成来检索结果。

2. 列子

  • 异步方法调用启动任务处理,并在任务就绪之前立即返回。 任务处理的结果稍后返回给调用方。在多线程计算机编程中,异步方法调用(AMI),也称为异步方法调用或异步模式, 是一种设计模式,在这种模式中,调用站点在等待被调用代码完成时不会被阻止。 相反,当应答到达时,调用线程会得到通知。轮询回复是一个不需要的选项。
  • 在这个例子中,我们发射太空火箭并部署月球车。应用程序演示了异步方法调用模式。 该模式的关键部分是’AsyncResult’,它是异步计算值的中间容器,‘AsyncCallback’,可以提供在任务完成时执行,‘AsyncExecutor’,它管理异步任务的执行。在这个例子中,我们发射太空火箭并部署月球车

    应用程序演示了异步方法调用模式。该模式的关键部分是AsyncResult,它是异步计算值的中间容器,AsyncCallback可以提供给任务完成时执行,以及管理异步任务执行的AsyncExecutor

    main方法显示了异步调用的示例流。主线程以可变的持续时间启动多个任务,然后继续自己的工作。当主线程完成它的工作时,它收集异步任务的结果。其中两个任务通过回调处理,这意味着在任务完成后立即执行回调

    异步结果和回调之间线程使用情况的显著区别在于,异步结果在主线程中收集, 而回调在工作线程中执行。使用线程池时应注意这一点

    Java提供了自己的异步方法调用模式实现。

  • FutureTask、CompletableFuture和ExecutorService是此模式的实际实现。 但是由于并行编程的本质,实现并不是微不足道的。这个例子并没有考虑所有可能的场景,而是提供了一个简单的版本来帮助理解模式。

3. 结构图

在这里插入图片描述

4. 调用

package com.study111_1_async;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
 *
 * 异步方法调用是一种模式,在这种模式中,调用线程在等待任务结果时不会被阻塞。
 * 该模式提供了多个独立任务的并行处理,并通过回调或等待一切完成来检索结果。
 *
 * 异步方法调用启动任务处理,并在任务>就绪之前立即返回。
 * 任务处理的结果稍后返回给调用方。
 *
 * >在多线程计算机编程中,异步方法调用(AMI),也称为>异步方法调用或异步模式,
 * 是一种设计模式,在这种模式中,调用站点>在等待被调用代码完成时不会被阻止。
 * 相反,当应答到达时,调用线程会得到>通知。轮询回复是一个不需要的选项。
 *
 * 在这个例子中,我们发射太空火箭并部署月球车。应用程序演示了异步方法调用模式。
 * 该模式的关键部分是'AsyncResult',
 * 它是异步计算值的中间容器,'AsyncCallback',可以提供在任务完成时执行,
 * 'AsyncExecutor',它管理异步任务的执行。
 *
 *
 * 在这个例子中,我们发射太空火箭并部署月球车<p> 应用程序演示了异步方法调用模式。
 * 该模式的关键部分是<code>AsyncResult<code>,
 * 它是异步计算值的中间容器,<code>AsyncCallback<code>可以提供给任务完成时执行,以及管理异步任务执行的<code>AsyncExecutor<code>
 *     <p> main方法显示了异步调用的示例流。
 * 主线程以可变的持续时间启动多个任务,然后继续自己的工作。当主线程完成它的工作时,它收集异步任务的结果。其中两个任务通过回调处理,这意味着在任务完成后立即执行回调<p>
 * 异步结果和回调之间线程使用情况的显著区别在于,异步结果在主线程中收集,
 * 而回调在工作线程中执行。使用线程池时应注意这一点<p>
 *     Java提供了自己的异步方法调用模式实现。FutureTask、CompletableFuture和ExecutorService是此模式的实际实现。
 * 但是由于并行编程的本质,实现并不是微不足道的。这个例子并没有考虑所有可能的场景,而是提供了一个简单的版本来帮助理解模式。
 */


public class App {


    private static final String ROCKET_LAUNCH_LOG_PATTERN = "火箭<%s>成功着陆";

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        //构造将运行异步任务的新执行器
        var executor = new ThreadAsyncExecutor();

        //启动几个处理时间不同的异步任务,最后两个使用回调处理程序
        final var asyncResult1 = executor.startProcess(lazyval(10, 500));
        final var asyncResult2 = executor.startProcess(lazyval("test", 500));
        final var asyncResult3 = executor.startProcess(lazyval(50L, 500));
        final var asyncResult4 = executor.startProcess(lazyval(20, 500));
        callback("部署月球车");
        final var asyncResult5 = executor.startProcess(lazyval("callback22", 500),callback("部署月球车完成"));

        //当异步任务在其自己的线程中运行时,在当前线程中模拟处理
        Thread.sleep(350);
        System.out.println("任务指挥部正在喝咖啡");


        //等待任务完成
        final var result1 = executor.endProcess(asyncResult1);
        final var result2 = executor.endProcess(asyncResult2);
        final var result3 = executor.endProcess(asyncResult3);
        asyncResult4.await();
        asyncResult5.await();
        //记录任务的结果,完成后立即记录回调
        System.out.println("火箭 <" + result1 + "> 发射完成");
        System.out.println("火箭 <" + result2 + "> 发射完成");
        System.out.println("火箭 <" + result3 + "> 发射完成");
        //System.out.println("火箭 <" + result4 + "> 发射完成");

    }

    //创建一个可调用函数,该函数以人为延迟的方式惰性地计算给定值
    private static <T> Callable<T> lazyval(T value, long delayMs) {
        return () -> {
            Thread.sleep(1000);
            System.out.println(String.format(ROCKET_LAUNCH_LOG_PATTERN, value));
            return value;
        };
    }

    //创建一个简单回调,记录异步结果的完整状态。
    private static <T> AsyncCallback<T> callback(String name) {

        return (value, ex) -> {
            if (ex.isPresent()) {
                System.out.println(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
            } else {
                System.out.println(name + " <" + value + ">"+" 异步回调完成");
            }
        };
    }


}

5. 结果

任务指挥部正在喝咖啡
火箭<20>成功着陆
火箭<50>成功着陆
火箭成功着陆
火箭<10>成功着陆
火箭成功着陆
火箭 <10> 发射完成
火箭 发射完成
火箭 <50> 发射完成
部署月球车完成 异步回调完成

Process finished with exit code 0

6. 实现

package com.study111_1_async;

import java.util.Optional;

/**
 * 异步回调接口。
 */
public interface AsyncCallback<T> {



    //异步任务完成或执行失败时执行的完整处理程序
    void onComplete(T value, Optional<Exception> ex);


}

package com.study111_1_async;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public interface AsyncExecutor {


    //开始处理异步任务。立即返回异步结果。
    <T> AsyncResult<T> startProcess(Callable<T> task);


    //开始处理异步任务。立即返回异步结果。任务完成时执行回调。
    <T> AsyncResult<T> startProcess(Callable<T> task,AsyncCallback<T> callback);

    //结束异步任务的处理。如果需要,阻止当前线程并返回已完成任务的评估值。
    <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException,InterruptedException;


}


package com.study111_1_async;

import java.util.concurrent.ExecutionException;

/**
 * 异步结果接口
 */
public interface AsyncResult<T> {


    boolean isCompleted();

    //获取已完成异步任务的值
    T getValue() throws ExecutionException;

    //阻止当前线程,直到异步任务完成。
    void await() throws InterruptedException;

}


package com.study111_1_async;

import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 异步执行器的实现,为每个任务创建一个新线程。
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

    private final AtomicInteger idx = new AtomicInteger(0);

    @Override
    public <T> AsyncResult<T> startProcess(Callable<T> task) {
        return startProcess(task, null);
    }

    @Override
    public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {

        var result = new CompletableResult<>(callback);
        new Thread(()->{
            try {
                result.setValue(task.call());
            }catch (Exception e){
                result.setException(e);
            }
        },"executor-" + idx.incrementAndGet()).start();
        return result;
    }

    @Override
    public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
        if(!asyncResult.isCompleted()){
            asyncResult.await();
        }

        return asyncResult.getValue();
    }

    /*
    异步结果的简单实现,允许使用一个值成功完成它,或使用异常异常完成它。
    一个真正简化的版本,来自它的现实生活表兄弟FutureTask和CompletableFuture
     */

    private static class CompletableResult<T> implements AsyncResult<T> {


        static final int RUNNING = 1;
        static final int FAILED = 2;
        static final int COMPLETED = 3;

        final Object lock;
        final Optional<AsyncCallback<T>> callback;

        volatile int state = RUNNING;
        T value;
        Exception exception;

        private CompletableResult(AsyncCallback<T> callback) {
            this.lock = new Object();
            this.callback = Optional.ofNullable(callback);
        }

        //设置成功执行的值并执行回调(如果可用)。通知任何等待完成的线程。
        void setValue(T value) {
            this.value = value;
            this.state = COMPLETED;
            this.callback.ifPresent(ac -> {
                ac.onComplete(value, Optional.empty());
            });
            synchronized (lock) {
                lock.notify();
            }

        }

        //设置执行失败的异常并执行回调(如果可用)。通知任何等待完成的线程。
        void setException(Exception exception) {
            this.exception = exception;
            this.state = FAILED;

            this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));

        }




        @Override
        public boolean isCompleted() {
            return state > RUNNING;
        }

        @Override
        public T getValue() throws ExecutionException {

            if(state == COMPLETED){
                return value;
            }else if(state == FAILED){
                throw new ExecutionException(exception);
            }else {
                throw new IllegalStateException("Execution not completed yet");
            }
        }

        @Override
        public void await() throws InterruptedException {
            synchronized (lock){
                while (!isCompleted()){
                    lock.wait();
                }
            }
        }
    }
}



版权声明:本文为zzqtty原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。