首先我们需要自己创建线程池,目前使用的是ThreadPoolExecutor来自定义核心线程数的定义,以此来满足自己所在服务器的硬件配置
ExecutorService service = new ThreadPoolExecutor(
6,
20,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
结合场景来进行分析
首先定义个一个小工具类,主要输出线程信息和休眠
package com.example.kafkafp.controller.listTest;
import java.util.StringJoiner;
/**
* 小工具
*/
public class SmallTool {
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
}
一.场景1
1⃣️小李进快餐店点餐,点了一份菜和一份米饭
2⃣️厨师炒菜,之后厨师打饭
package com.example.kafkafp.controller.listTest._03_CompletableFuture_start;
import com.example.kafkafp.controller.listTest.SmallTool;
import java.util.concurrent.*;
public class _01_supplyAsync {
public static void main(String[] args) {
//自定义一个线程池
// ExecutorService service = Executors.newFixedThreadPool(10);
ExecutorService service = new ThreadPoolExecutor(
6,
20,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
SmallTool.printTimeAndThread("小李进入餐厅");
SmallTool.printTimeAndThread("小李点了 番茄炒蛋 + 一碗米饭");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师炒菜");
SmallTool.sleepMillis(200);
SmallTool.printTimeAndThread("厨师打饭");
SmallTool.sleepMillis(100);
return "番茄炒蛋 + 米饭做好了";
},service);
SmallTool.printTimeAndThread("小李在打王者");
SmallTool.printTimeAndThread(String.format("%s,小李开吃", cf1.join()));
}
}
使用supplyAsync方法即可操作完成
场景二:
厨师炒菜
而打饭太简单了,就让服务员去打饭了
package com.example.kafkafp.controller.listTest._03_CompletableFuture_start;
import com.example.kafkafp.controller.listTest.SmallTool;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class _02_thenCompose {
public static void main(String[] args) {
//自定义一个线程池
ExecutorService service = Executors.newFixedThreadPool(10);
SmallTool.printTimeAndThread("小白进入餐厅");
SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师炒菜");
SmallTool.sleepMillis(200);
return "番茄炒蛋";
},service).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员打饭");
SmallTool.sleepMillis(100);
return dish + " + 米饭";
},service));
SmallTool.printTimeAndThread("小白在打王者");
SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));
}
/**
* 用 applyAsync 也能实现
*/
// private static void applyAsync() {
// SmallTool.printTimeAndThread("小白进入餐厅");
// SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
//
// CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// SmallTool.printTimeAndThread("厨师炒菜");
// SmallTool.sleepMillis(200);
// CompletableFuture<String> race = CompletableFuture.supplyAsync(() -> {
// SmallTool.printTimeAndThread("服务员打饭");
// SmallTool.sleepMillis(100);
// return " + 米饭";
// });
// return "番茄炒蛋" + race.join();
// });
//
// SmallTool.printTimeAndThread("小白在打王者");
// SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));
// }
}
场景三
首先需要厨师打饭和服务员蒸饭,这两样都完成了之后才能进行服务员打饭
package com.example.kafkafp.controller.listTest._03_CompletableFuture_start;
import com.example.kafkafp.controller.listTest.SmallTool;
import java.util.concurrent.*;
public class _03_thenCombine {
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(6, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// CompletableFuture的使用
SmallTool.printTimeAndThread("小白进入餐厅");
SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师炒菜");
SmallTool.sleepMillis(200);
return "番茄炒蛋";
}, service).thenCombine(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员蒸饭");
SmallTool.sleepMillis(300);
return "米饭";
}, service), (dish, rice) -> {
SmallTool.printTimeAndThread("服务员打饭");
SmallTool.sleepMillis(100);
return String.format("%s + %s 好了", dish, rice);
});
// .whenComplete((r, e) -> {
whenComplete第一个参数是结果,第二个参数是异常,他可以感知异常,无法返回默认数据
// System.out.println("执行完毕,结果是---" + r + "异常是----" + e);
// }).exceptionally(u -> {
exceptionally只有一个参数是异常类型,他可以感知异常,同时返回默认数据10
// return "bb";
// });
SmallTool.printTimeAndThread("小白在打王者");
SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));
}
/**
* 用 applyAsync 也能实现
*/
private static void applyAsync() {
SmallTool.printTimeAndThread("小白进入餐厅");
SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师炒菜");
SmallTool.sleepMillis(200);
return "番茄炒蛋";
});
CompletableFuture<String> race = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员蒸饭");
SmallTool.sleepMillis(300);
return "米饭";
});
SmallTool.printTimeAndThread("小白在打王者");
String result = String.format("%s + %s 好了", cf1.join(), race.join());
SmallTool.printTimeAndThread("服务员打饭");
SmallTool.sleepMillis(100);
SmallTool.printTimeAndThread(String.format("%s ,小白开吃", result));
}
}
本文来自了blibli的灰灰视频,本次只是记录方便自己后面使用的时候方便实现
版权声明:本文为weixin_40593587原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。