CompletableFuture和ThreadPoolExecutor实现异步编程

首先我们需要自己创建线程池,目前使用的是ThreadPoolExecutor来自定义核心线程数的定义,以此来满足自己所在服务器的硬件配置

  ExecutorService service = new ThreadPoolExecutor(
                6,
                20,
                10, 
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1024),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

结合场景来进行分析

首先定义个一个小工具类,主要输出线程信息和休眠

package com.example.kafkafp.controller.listTest;

import java.util.StringJoiner;

/**
 * 小工具
 */
public class SmallTool {

    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }

}

一.场景1

1⃣️小李进快餐店点餐,点了一份菜和一份米饭

2⃣️厨师炒菜,之后厨师打饭

package com.example.kafkafp.controller.listTest._03_CompletableFuture_start;
import com.example.kafkafp.controller.listTest.SmallTool;

import java.util.concurrent.*;

public class _01_supplyAsync {
    public static void main(String[] args) {
        //自定义一个线程池
//        ExecutorService service = Executors.newFixedThreadPool(10);
        ExecutorService service = new ThreadPoolExecutor(
                6,
                20,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1024),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        SmallTool.printTimeAndThread("小李进入餐厅");
        SmallTool.printTimeAndThread("小李点了 番茄炒蛋 + 一碗米饭");
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            SmallTool.printTimeAndThread("厨师打饭");
            SmallTool.sleepMillis(100);
            return "番茄炒蛋 + 米饭做好了";
        },service);

        SmallTool.printTimeAndThread("小李在打王者");
        SmallTool.printTimeAndThread(String.format("%s,小李开吃", cf1.join()));
    }
}

使用supplyAsync方法即可操作完成

场景二:

厨师炒菜

而打饭太简单了,就让服务员去打饭了

package com.example.kafkafp.controller.listTest._03_CompletableFuture_start;



import com.example.kafkafp.controller.listTest.SmallTool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class _02_thenCompose {
    public static void main(String[] args) {
        //自定义一个线程池
        ExecutorService service = Executors.newFixedThreadPool(10);
        SmallTool.printTimeAndThread("小白进入餐厅");
        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            return "番茄炒蛋";
        },service).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("服务员打饭");
            SmallTool.sleepMillis(100);
            return dish + " + 米饭";
        },service));

        SmallTool.printTimeAndThread("小白在打王者");
        SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));
    }

    /**
     * 用 applyAsync 也能实现
     */
//    private static void applyAsync() {
//        SmallTool.printTimeAndThread("小白进入餐厅");
//        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
//
//        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
//            SmallTool.printTimeAndThread("厨师炒菜");
//            SmallTool.sleepMillis(200);
//            CompletableFuture<String> race = CompletableFuture.supplyAsync(() -> {
//                SmallTool.printTimeAndThread("服务员打饭");
//                SmallTool.sleepMillis(100);
//                return " + 米饭";
//            });
//            return "番茄炒蛋" + race.join();
//        });
//
//        SmallTool.printTimeAndThread("小白在打王者");
//        SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));
//    }
}

场景三

首先需要厨师打饭和服务员蒸饭,这两样都完成了之后才能进行服务员打饭

package com.example.kafkafp.controller.listTest._03_CompletableFuture_start;



import com.example.kafkafp.controller.listTest.SmallTool;

import java.util.concurrent.*;

public class _03_thenCombine {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(6, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//        CompletableFuture的使用
        SmallTool.printTimeAndThread("小白进入餐厅");
        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            return "番茄炒蛋";
        }, service).thenCombine(CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("服务员蒸饭");
            SmallTool.sleepMillis(300);
            return "米饭";
        }, service), (dish, rice) -> {
            SmallTool.printTimeAndThread("服务员打饭");
            SmallTool.sleepMillis(100);
            return String.format("%s + %s 好了", dish, rice);
        });
//                .whenComplete((r, e) -> {
            whenComplete第一个参数是结果,第二个参数是异常,他可以感知异常,无法返回默认数据
//            System.out.println("执行完毕,结果是---" + r + "异常是----" + e);
//        }).exceptionally(u -> {
            exceptionally只有一个参数是异常类型,他可以感知异常,同时返回默认数据10
//            return "bb";
//        });


        SmallTool.printTimeAndThread("小白在打王者");
        SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));

    }


    /**
     * 用 applyAsync 也能实现
     */
    private static void applyAsync() {
        SmallTool.printTimeAndThread("小白进入餐厅");
        SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("厨师炒菜");
            SmallTool.sleepMillis(200);
            return "番茄炒蛋";
        });
        CompletableFuture<String> race = CompletableFuture.supplyAsync(() -> {
            SmallTool.printTimeAndThread("服务员蒸饭");
            SmallTool.sleepMillis(300);
            return "米饭";
        });
        SmallTool.printTimeAndThread("小白在打王者");

        String result = String.format("%s + %s 好了", cf1.join(), race.join());
        SmallTool.printTimeAndThread("服务员打饭");
        SmallTool.sleepMillis(100);

        SmallTool.printTimeAndThread(String.format("%s ,小白开吃", result));
    }
}

本文来自了blibli的灰灰视频,本次只是记录方便自己后面使用的时候方便实现


版权声明:本文为weixin_40593587原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。