异步、线程池(ExecutorService、ThreadPoolExecutor、CompletableFuture)


1、概述:

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。

他的主要特点为:线程复用; 控制最大并发数; 管理线程。

第一: 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二: 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
第三: 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降抵系统的稳定性,使用线程池可以逆行统一的分配,调优和监控


2、初始化线程的四种方式:

  • 1、继承Thread
  • 2、实现Runnable
  • 3、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
  • 4、线程池

优缺点:

  • 1、2无法获取返回值,3可以获取返回值(阻塞式等待)
  • 1、2、3都不能控制资源,
  • 4可以控制资源,性能稳定

代码示例:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class CreateThreadTest {
    public static void main(String[] args) {
        // 1、继承Thread
        new Thread(new Thread01()).start();

        // 2、实现Runnable
        new Thread(new Rannable01()).start();

        // 3、实现Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
        FutureTask<String> stringFutureTask = new FutureTask<>(new Callable01());
        new Thread(stringFutureTask).start();
        try {
            log.info("结果:{}",stringFutureTask.get());//阻塞式等待
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // 4、线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.submit(()->log.info("方式4..."));//submit可以获取结果,execute无法获取结果
        executorService.shutdown();//关闭
    }
}
@Slf4j
class Thread01 extends Thread{
    @Override
    public void run() {
        log.info("方式1...");
    }
}
@Slf4j
class Rannable01 implements Runnable{

    @Override
    public void run() {
        log.info("方式2...");
    }
}
@Slf4j
class Callable01 implements Callable<String>{

    @Override
    public String call() throws Exception {
        log.info("方式3...");
        return "Callable01";
    }
}

在这里插入图片描述


3、七大参数:

3.1、七大参数详解

	public ThreadPoolExecutor(
						int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler){
                        
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
            
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
            
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • int corePoolSize,核心线程数【只要线程池不销毁(设置allowCoreThreadTimeOut属性),就一直存在】;线程池创建好之后准备就绪的线程数,等待接收异步亲求去执行;
  • int maximumPoolSize,最大线程数,可用于控制资源
  • long keepAliveTime,存活时间,当前线程数量大于指定的核心线程数量,只要存活时间达到该值,就会被释放。(maximumPoolSize-corePoolSize)
  • TimeUnit unit,时间单位
  • BlockingQueue<Runnable> workQueue,阻塞队列,如果任务很多,多余的任务会被放在队列中
  • ThreadFactory threadFactory,创建线程的工厂
  • RejectedExecutionHandler handler 如果队列满了,按照我们指定的拒绝策略执行任务

如果是要五大参数,就是前五个,后两个默认:

  • 例如:使用五个参数的构造方法:
    在这里插入图片描述
    其中默认的拒绝策略:(丢弃并抛异常)
    在这里插入图片描述

3.2、如何合理设置核心线程数:

  • CPU密集型
    在这里插入图片描述

  • IO密集型

    • 方式1:
      在这里插入图片描述

    • 方式2:
      在这里插入图片描述


4、工作顺序:

在这里插入图片描述

工作顺序【1核心线程 -> 2阻塞队列 -> 3最大线程 -> 4拒绝策略(超过最大线程)、释放线程(超过空闲时间)】

  • 1、线程池创建,准备好指定数量(corePoolSize)的核心线程,以接收任务
  • 2、核心线程满了,再进入的线程就会被放进阻塞队列(workQueue)中,当有空闲核心线程,就会去阻塞队列中获取任务
  • 3、阻塞队列满了,就直接开新的线程执行,最大只能开到maximumPoolSize指定的数量
  • 4、达到maximumPoolSize个线程,就用拒绝策略拒绝任务
  • 5、如果当前线程数大于指定核心线程数(corePoolSize),多出的每一个线程在空闲指定时间(keepAliveTime)过后,释放线程

依据:从execute方法可以看出来:
在这里插入图片描述在这里插入图片描述


5、Executors常用创建线程池方法:

前三种更为常用,都是使用ThreadPoolExecutor创建的线程池;第四种采用ScheduledThreadPoolExecutor创建线程池;第5种采用ForkJoinPool创建线程池;他们的关系如下图:
在这里插入图片描述

  • 1、Executors.newCachedThreadPool();core是0,所有都可回收;适合执行短期异步的小程序或者负载较轻的服务器
    在这里插入图片描述

  • 2、Executors.newFixedThreadPool();固定大小,core=max;适合执行长期的任务
    在这里插入图片描述

  • 3、Executors.newSingleThreadExecutor();单线程的线程池,后台从队列里面获取任务,挨个执行;适合一个任务一个任务执行的场景
    在这里插入图片描述

  • 4、Executors.newScheduledThreadPool();定时任务的线程池
    在这里插入图片描述

  • 5、Executors.newWorkStealingPool();ForkJoinPool 分支合并
    在这里插入图片描述

一般不使用上述方法创建线程池,前两个阻塞队列的大小是Integer.MAX_VALUE,后两个的最大线程数是Integer.MAX_VALUE。如,阿里巴巴Java开发手册中这样描述:
在这里插入图片描述


6、四种拒绝策略:

是什么?

  • 等待队列也已经排满了,再也塞不下新任务了同时,
  • 线程池中的max线程也达到了,无法继续为新任务服务。
  • 这时候我们就需要拒绝策略机制合理的处理这个问题。

先看接口类RejectedExecutionHandler:只有一个rejectedExecution方法,来决定如何拒绝
在这里插入图片描述

它有以下四个实现类:

  • DiscardOldestPolicy: poll,移除第一个;删除工作队列 最早的一个,再尝试运行当前的r(依照工作顺序运行,即再检验一遍是否小于核心线程数、队列是否满了、是否达到最大线程数)。
    在这里插入图片描述

  • AbortPolicy: 很明显,直接抛异常
    在这里插入图片描述

  • CallerRunsPolicy: 直接调用run方法,哪来的就在哪运行
    在这里插入图片描述

  • DiscardPolicy:这个有意思,啥都没干,相当于直接忽略
    在这里插入图片描述

代码演示:

  • 使用下面的代码,分别测试四种拒绝策略:
  • 核心2,最大5,阻塞3,10个任务(0~9)
    • 前两个:0、1直接运行
    • 2、3、4进入阻塞队列
    • 5、6、7新开线程
    • 8、9走拒绝策略
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

log.info(rejectedExecutionHandler.getClass().getName());

// 核心2,最大5,阻塞队列3;最多同时执行 5 + 3 = 8 个任务
ExecutorService executorService = new ThreadPoolExecutor(2,5,
        30, TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),
        Executors.defaultThreadFactory(),
        rejectedExecutionHandler);

try {
	//运行10个任务,会有多出的任务
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        executorService.execute(() -> {
            log.info("执行任务{}...", finalI);
        });
    }
} finally {
    executorService.shutdown();
}
  • 抛异常:
    在这里插入图片描述

  • 直接忽略组后来的:
    在这里插入图片描述

  • 丢弃队列最早的:
    在这里插入图片描述

  • 哪来的在哪运行:最后来的两个在main中运行:
    在这里插入图片描述


7、CompletableFuture启动异步任务

  • runAsync:无返回值
  • supplyAsync:有返回值
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CompletableFutureTest {
    private static ExecutorService executor = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
        //方式1、无返回值
        CompletableFuture.runAsync(()->log.info("runAsync"),executor);

        // 方式2、有返回值
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            log.info("supplyAsync");
            return "supplyAsync";
        }, executor);
        //获取返回值
        try {
            String res = supplyAsync.get();
            log.info("返回值:{}",res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

8、方法执行完成后的感知

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
        () -> {
            log.info("创建...");
            return 10 / 0;
        }, executor)
        //处理完成后用当前线程处理,如果为whenCompleteAsync,则用其他线程处理
        .whenComplete((res, e) -> {
            log.info("结果:{},异常:{}", res, e);
        })
        //出现异常后的回调
        .exceptionally((throwable) -> {
            log.info("出现异常,返回0,异常:{}", throwable.getCause().toString());
            return 0;
        });

//打印结果
try {
    log.info("结果:{}",supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这里插入图片描述


9、方法执行完成后的处理

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(
        () -> {
            log.info("创建...");
            return 10 / 2;
            // return 10 / 0;
        }, executor)
        //参数:BiFunction<? super T, Throwable, ? extends U> fn;接收结果、异常,返回新结果
        .handle((res, e) -> {
            if (res != null) {
                log.info("结果不为空");
                return res * 2;
            }

            if (e != null) {
                log.info("有异常");
                return 0;
            }

            log.info("其他情况");
            return 0;
        });

//打印结果
try {
    log.info("结果:{}", supplyAsync.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这里插入图片描述


10、线程池串行方法

基本方法:

  • thenApply:接收上一个返回结果,返回新结果
  • thenAccept:只能接收上一个返回结果,无返回值
  • thenRun:不接受返回结果也不返回结果
    在这里插入图片描述

现实现一个需求:

  • 先启动一个线程,任务1
  • 任务2获取任务1的返回结果,并返回新结果
  • 任务3在同一个线程上获取任务2的结果,但不返回任何结果
  • 任务4既不接收上一个返回结果,也不返回结果

代码:

    private static void 线程池串行方法() {
        CompletableFuture.supplyAsync(
                () -> {
                    log.info("任务1...");
                    return 111;
                })

                //获取结果,返回新结果(类型可不同)
                .thenApplyAsync((res) -> {
                    log.info("任务2...获取到上一个的结果:{}", res);
                    return "222";
                }, executor)

                //获取上一个结果,但不返回结果
                .thenAccept((res) -> {
                    log.info("任务3...获取到上一个的结果:{}", res);
                })

                //既不获取,也不返回
                .thenRunAsync(() -> {
                    log.info("任务4...无法获取到上一个的结果");
                }, executor);

        // 在测试包下运行,线程睡眠,防止提前结束
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

在这里插入图片描述


11、两任务组合

11.1、都要完成

  • thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回结果
  • thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
  • runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后处理任务
  • 以上方法后面加上Async,表示新开一个线程,例如:thenCombineAsync

示例:使用第三个任务,获取前两个任务的返回结果,并进行拼串,返回最终结果:

//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
            log.info("任务1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
            log.info("任务2...");
            return 222;
        }, executor);

//任务3在前两个执行完后执行:
CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (f1, f2) -> {
    log.info("任务3...获取到的结果:{},{}", f1, f2);
    return f1 + "" + f2;
}, executor);

//打印任务3的结果
try {
    log.info("任务3的返回值:{}",cf3.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

在这里插入图片描述

11.2、一个完成即可

  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并由新返回值
  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,但无返回值
  • runAfterEither:两个任务有一个执行完成,不获取它的返回值,处理任务,也没有返回值
  • 均可加Async
  • 组合的两个任务需要有相同的返回值

以下测试三种方式:

//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
            log.info("任务1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
            log.info("任务2...开始");
            try {
                Thread.sleep(2000);
                log.info("任务2...结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 222;
        }, executor);

//=========方式1=========
//任务3在前两个执行完后执行:
CompletableFuture<Integer> cf3 = cf1.applyToEitherAsync(cf2, (f1) -> {
    log.info("任务3-1...获取到的结果:{},并把结果扩大2倍", f1);
    return f1 * 2;
}, executor);
//打印cf3的结果
try {
    log.info("任务3-1的返回值:{}", cf3.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

//=========方式2=========
cf1.acceptEitherAsync(cf2, (f1) -> {
    log.info("任务3-2获取到的结果:{}", f1);
}, executor);

//=========方式3=========
cf1.runAfterEitherAsync(cf2,
        () -> log.info("任务3-3:前两个任务有一个完成了"),
        executor);

在这里插入图片描述


12、多任务组合

CompletableFuture中有两个静态方法:

  • allOf:所有任务都完成
  • anyOf:一个任务完成即可
    在这里插入图片描述

代码示例:

//两个任务
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
        () -> {
            log.info("任务1...");
            return 111;
        }, executor);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
        () -> {
            log.info("任务2...开始");
            try {
                Thread.sleep(2000);
                log.info("任务2...结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 222;
        }, executor);

//一个完成即可:
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cf1, cf2);
try {
    log.info("有一个任务完成了,返回值:{}",anyOf.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

//都要完成:
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2);
try {
    log.info("任务都完成了,返回值:{},{}",cf1.get(),cf2.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

在这里插入图片描述


版权声明:本文为m0_55155505原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。