异步编程CompletableFuture

异步编程CompletableFuture

package com.huawei.util;

import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Demonstrations of asynchronous programming with {@link CompletableFuture}.
 *
 * <p>"Asynchronous" here means starting an operation without waiting for its return value,
 * so that the caller can keep going. Each test method showcases one part of the API and
 * prints its results to standard output.
 */
public class AsyncTask {

    /**
     * Runs an asynchronous task that produces no result.
     */
    @Test
    public void test1() {
        CompletableFuture<Void> noReturn = CompletableFuture.runAsync(() -> {
            // Task body; it has no return value.
            System.out.println(1);
        });
        // Wait for the task, otherwise the test may return before the task has run.
        noReturn.join();
    }

    /**
     * Runs an asynchronous task that produces a result and prints that result.
     *
     * @throws ExecutionException   if the asynchronous task completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> hasReturn = CompletableFuture.supplyAsync(AsyncTask::get);
        System.out.println(hasReturn.get());
    }

    /**
     * Simulates a slow supplier: prints a begin marker, sleeps for one second, prints an end
     * marker and returns a fixed value.
     *
     * @return the constant {@code "hasReturnLambda"}
     */
    private static String get() {
        System.out.println("Begin Invoke getFuntureHasReturnLambda");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently dropping the interruption.
            Thread.currentThread().interrupt();
        }
        System.out.println("End Invoke getFuntureHasReturnLambda");
        return "hasReturnLambda";
    }

    /**
     * Completes a future manually from another thread.
     *
     * <p>{@code complete(value)} only takes effect if the future has not completed yet. With
     * the timings used here (the supplier sleeps 1 second, the helper thread sleeps 10 seconds)
     * the supplier wins and {@code get()} returns the supplier's value. If the timings were
     * swapped, {@code get()} would return the manually supplied value instead.
     *
     * @throws ExecutionException   if the future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> funtureHasReturnLambda = CompletableFuture.supplyAsync(AsyncTask::get);
        System.out.println("Main Method Is Invoking");
        Thread completer = new Thread(() -> {
            System.out.println("Thread Is Invoking ");
            try {
                Thread.sleep(10000);
                funtureHasReturnLambda.complete("custome value");
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            System.out.println("Thread End ");
        });
        // Daemon, so the long sleep cannot keep the JVM alive after the test has returned.
        completer.setDaemon(true);
        // start() runs the body on a new thread; run() would execute it on the calling thread
        // and block this method for the whole sleep before get() is even reached.
        completer.start();
        String value = funtureHasReturnLambda.get();
        System.out.println("Main Method End value is " + value);
    }

    /**
     * Chains dependent stages that run after a previous stage.
     *
     * @throws ExecutionException   if any stage completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test4() throws ExecutionException, InterruptedException {
        // thenApply: receives the previous result and produces a new result.
        CompletableFuture<String> seqFutureOne = CompletableFuture.supplyAsync(() -> "seqFutureOne");
        CompletableFuture<String> seqFutureTwo = seqFutureOne.thenApply(name -> name + " seqFutureTwo");
        System.out.println(seqFutureTwo.get());


        // thenAccept: receives the previous result but produces no result.
        CompletableFuture<Void> thenAccept = seqFutureOne
                .thenAccept(name -> System.out.println(name + " thenAccept"));
        System.out.println("-------------");
        System.out.println(thenAccept.get());

        // thenRun: neither receives the previous result nor produces one.
        System.out.println("-------------");
        CompletableFuture<Void> thenRun = seqFutureOne.thenRun(() -> {
            System.out.println(" thenRun");
        });
        System.out.println(thenRun.get());
    }

    /**
     * Shows which thread executes {@code thenApply} versus {@code thenApplyAsync}.
     *
     * <p>When the source future is still running at registration time, the {@code thenApply}
     * function is typically executed by the thread that completes the source future. When the
     * source future has already completed, it is typically executed by the thread that calls
     * {@code thenApply} (here, the test's own thread).
     *
     * @throws ExecutionException   if any stage completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test5() throws ExecutionException, InterruptedException {
        System.out.println("-------------");
        CompletableFuture<String> supplyAsyncWithSleep = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            return "supplyAsyncWithSleep Thread Id : " + Thread.currentThread();
        });
        CompletableFuture<String> thenApply = supplyAsyncWithSleep
                .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
        CompletableFuture<String> thenApplyAsync = supplyAsyncWithSleep
                .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
        System.out.println("Main Thread Id: " + Thread.currentThread());
        System.out.println(thenApply.get());
        System.out.println(thenApplyAsync.get());


        System.out.println("-------------No Sleep");
        CompletableFuture<String> supplyAsyncNoSleep = CompletableFuture.supplyAsync(
                () -> "supplyAsyncNoSleep Thread Id : " + Thread.currentThread());
        CompletableFuture<String> thenApplyNoSleep = supplyAsyncNoSleep
                .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
        CompletableFuture<String> thenApplyAsyncNoSleep = supplyAsyncNoSleep
                .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
        System.out.println("Main Thread Id: " + Thread.currentThread());
        System.out.println(thenApplyNoSleep.get());
        System.out.println(thenApplyAsyncNoSleep.get());
    }

    // Combining CompletableFutures.
    // Two methods combine a pair of CompletableFutures:
    // thenCompose(): starts the second task only after the first one has completed
    // thenCombine(): runs an action once both asynchronous tasks have completed

    /**
     * Demonstrates {@code thenCompose()}: the second asynchronous task needs the result of
     * the first one as its input.
     *
     * @throws ExecutionException   if either task completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test6() throws ExecutionException, InterruptedException {
        CompletableFuture<String> thenComposeComplet = getTastOne().thenCompose(AsyncTask::getTastTwo);
        System.out.println(thenComposeComplet.get());
    }

    /**
     * Starts the first task of the {@code thenCompose()} demonstration.
     *
     * @return a future that yields {@code "topOne"}
     */
    public static CompletableFuture<String> getTastOne() {
        return CompletableFuture.supplyAsync(() -> "topOne");
    }

    /**
     * Starts the second task of the {@code thenCompose()} demonstration.
     *
     * @param s the result of the first task
     * @return a future that yields {@code s} followed by {@code "  topTwo"}
     */
    public static CompletableFuture<String> getTastTwo(String s) {
        return CompletableFuture.supplyAsync(() -> s + "  topTwo");
    }

    /**
     * Demonstrates {@code thenCombine()}: sums the results of two independent asynchronous
     * tasks. The sum can only be computed once both values are available.
     *
     * @throws ExecutionException   if either task completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test7() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
        CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
        CompletableFuture<Integer> thenComposeCount = thenComposeOne
                .thenCombine(thenComposeTwo, Integer::sum);
        System.out.println(thenComposeCount.get());
    }

    /**
     * Demonstrates {@code allOf()}, which combines an arbitrary number of futures and
     * completes once all of them have completed. The individual results are then collected
     * into a list and printed.
     *
     * <p>The companion method {@code anyOf()} is shown in {@link #test9()}.
     *
     * @throws ExecutionException   if any of the futures completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test8() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> four = CompletableFuture.supplyAsync(() -> 4);
        CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> six = CompletableFuture.supplyAsync(() -> 6);

        CompletableFuture<Void> allDone = CompletableFuture.allOf(one, two, three, four, five, six);
        CompletableFuture<Void> printed = allDone
                // allOf() yields no value itself; join() on each input is non-blocking here
                // because every input has already completed.
                .thenApply(v -> Stream.of(one, two, three, four, five, six)
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .thenAccept(System.out::println);

        // Wait on the pipeline itself rather than sleeping on an unrelated future, so the
        // list is guaranteed to have been printed when the test returns.
        printed.get();
    }

    /**
     * Demonstrates {@code anyOf()}, which completes as soon as any one of the given futures
     * completes.
     *
     * <p>{@code anyOf()} does not cancel the remaining futures; they keep running. Because
     * this test returns once the fastest task has finished, the output of the slower tasks
     * may not appear.
     *
     * @throws ExecutionException   if the first future to complete did so exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test9() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture1 = delayedPrint(10000, "voidCompletableFuture1");
        CompletableFuture<Void> voidCompletableFutur2 = delayedPrint(2000, "voidCompletableFutur2");
        CompletableFuture<Void> voidCompletableFuture3 = delayedPrint(3000, "voidCompletableFuture3");

        CompletableFuture<Object> objectCompletableFuture = CompletableFuture
                .anyOf(voidCompletableFuture1, voidCompletableFutur2, voidCompletableFuture3);
        objectCompletableFuture.get();
    }

    /**
     * Starts an asynchronous task that sleeps for the given time and then prints a message.
     *
     * @param millis  how long to sleep before printing, in milliseconds
     * @param message the line to print once the sleep is over
     * @return the future representing the task
     */
    private static CompletableFuture<Void> delayedPrint(long millis, String message) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the pool thread's owner; still print below.
                Thread.currentThread().interrupt();
            }
            System.out.println(message);
        });
    }

    /**
     * Demonstrates {@code exceptionally()}: recovers from a failure in the asynchronous task
     * by printing the exception and substituting a fallback value.
     *
     * @throws ExecutionException   if the recovered future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Test
    public void test10() throws ExecutionException, InterruptedException {
        CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> {
            // Deliberately trigger an ArithmeticException.
            int i = 10 / 0;
            return "Success";
        }).exceptionally(e -> {
            System.out.println(e);
            return "Exception has Handl";
        });
        System.out.println(exceptionally.get());
    }

    /**
     * Demonstrates {@code handle()}, which can also observe a failure and supply a custom
     * result. Unlike {@code exceptionally()}, the handler is invoked whether or not an
     * exception occurred.
     */
    @Test
    public void test11() {
        System.out.println("-------有异常-------");
        CompletableFuture.supplyAsync(() -> {
            // Deliberately trigger an ArithmeticException.
            int i = 10 / 0;
            return "Success";
        }).handle(AsyncTask::reportOutcome)
                // Wait, so the handler's output appears before the next section header.
                .join();

        System.out.println("-------无异常-------");
        CompletableFuture.supplyAsync(() -> "Sucess")
                .handle(AsyncTask::reportOutcome)
                // Wait, so the handler has run before the test returns.
                .join();
    }

    /**
     * Handler shared by both {@code handle()} demonstrations: prints the exception and the
     * response, then passes the response through unchanged.
     *
     * @param response the result of the previous stage, or {@code null} if it failed
     * @param e        the failure of the previous stage, or {@code null} if it succeeded
     * @return {@code response}, unchanged
     */
    private static String reportOutcome(String response, Throwable e) {
        System.out.println("Exception:" + e);
        System.out.println("Response:" + response);
        return response;
    }
}

版权声明:本文为tmac937436原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。