Java并发(七)异步执行任务之手写线程池、定时任务、CompletableFuture

手写

表示子任务:

public interface Callable<V> {
    V call() throws Exception;
}

表示异步调用的结果:

public interface MyFuture <V> {
    V get() throws Exception ;//返回真正的结果,如果结果还没有计算完成,get会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常
}

类MyExecutor:

public class MyExecutor{
    //这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量;
    //执行结束后设置共享状态变量done为true,并调用notifyAll以唤醒可能在等待结果的主线程
    static class ExecuteThread<V> extends Thread {
        private V result = null;
        private Exception exception = null;
        private boolean done = false;
        private Callable<V> task;
        private Object lock;

        public ExecuteThread(Callable<V> task, Object lock) {
            this.task = task;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                result = task.call();
            } catch (Exception e) {
                exception = e;
            } finally {
                synchronized (lock) {
                    done = true;
                    lock.notifyAll();
                }
            }
        }

        public V getResult() {
            return result;
        }

        public boolean isDone() {
            return done;
        }

        public Exception getException() {
            return exception;
        }
    }
    //目的:执行子任务并返回异步结果
    //内容:封装了创建并启动子线程,同步获取结果的过程
    //启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束
    public <V> MyFuture<V> execute(final Callable<V> task) {
        final Object lock = new Object();
        final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
        thread.start();

        MyFuture<V> future = new MyFuture<V>() {
            @Override
            public V get() throws Exception {
                synchronized (lock) {
                    while (!thread.isDone()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    if (thread.getException() != null) {
                        throw thread.getException();
                    }
                    return thread.getResult();
                }
            }
        };
        return future;
    }
}

使用:

public static void main(String[] args) {
    MyExecutor executor = new MyExecutor();
    // 子任务
    Callable<Integer> subTask = new Callable<Integer>() {

        @Override
        public Integer call() throws Exception {
            // ... 执行异步任务
            int millis = (int) (Math.random() * 1000);
            Thread.sleep(millis);
            return millis;
        }
    };
    // 异步调用,返回一个MyFuture对象
    MyFuture<Integer> future = executor.execute(subTask);
    // ... 执行其他操作
    try {
        // 获取异步调用的结果
        Integer result = future.get();
        System.out.println(result);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

使用线程池,要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理:

try {
	// 业务逻辑
} catch (RuntimeException x) {
	// 按需处理
} catch (Throwable x) {
	// 按需处理
}

标准异常处理建议使用ThreadFactory:https://www.jianshu.com/p/5a79564101e8

定时任务

异步任务中,常见的任务就是定时任务

有两种方式实现定时任务:

  • 使用java.util包中的Timer和TimerTask
  • 使用Java并发包中的ScheduledExecutorService

1 Timer和TimerTask

1.1 TimerTask

TimerTask表示一个定时任务,它是一个抽象类,实现了Runnable,具体的定时任务需要继承该类,实现run方法

1.2 Timer

Timer是一个具体类,它负责定时任务的调度和执行

一个Timer对象按顺序运行schedule()的任务,一次只能运行一个,不能并发运行

有如下主要方法:

//在指定绝对时间time运行任务task
public void schedule(TimerTask task, Date time)
//在当前时间延时delay毫秒后运行任务task
public void schedule(TimerTask task, long delay)
//固定延时重复执行,第一次计划执行时间为firstTime,后一次的计划执行时间为前一次"实际"执行时间加上period
public void schedule(TimerTask task, Date firstTime, long period)
//同样是固定延时重复执行,第一次执行时间为当前时间加上delay
public void schedule(TimerTask task, long delay, long period)
//固定频率重复执行,第一次计划执行时间为firstTime,后一次的计划执行时间为前一次"计划"执行时间加上period
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period)
//同样是固定频率重复执行,第一次计划执行时间为当前时间加上delay
public void scheduleAtFixedRate(TimerTask task, long delay, long period)
  • 固定延时(fixed-delay)与固定频率(fixed-rate)

同:都是重复执行

不同:

后一次任务执行相对的时间是不一样的:

对于固定延时,它是基于上次任务的"实际"执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时;

对于固定频率,会尽量补够运行次数:会从firstTime开始算,有可能加上period后还是一个过去时间,从而连续运行很多次,直到时间超过当前时间

例子:

//第一个运行一次,第二个重复执行,1秒一次,第一个先运行。
//运行该程序,会发现,第二个任务只有在第一个任务运行结束后才会开始运行,运行后1秒一次
timer.schedule(new xxxTask(), 10);
timer.schedule(new yyyTask(), 100, 1000);
//运行该程序,第二个任务同样只有在第一个任务运行结束后才会运行,但它会把之前没有运行的次数补过来,一次性运行的次数为xxxTask()消耗的时间所运行的次数,假设消耗5秒,因为此处1秒运行1次,所以会一次性运行5次
timer.schedule(new xxxTask(), 10);
timer.scheduleAtFixedRate(new yyyTask(), 100, 1000);

1.2.1 基本原理

  • 内部

主要由两部分组成:任务队列和Timer线程。

private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);

任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级;
Timer线程负责执行所有的定时任务,需要强调的是,一个Timer对象只有一个Timer线程

  • 协作

Timer线程主体是一个循环,从队列中拿任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它;
如果队列中没有任务或第一个任务延时还没到,就睡眠;
如果睡眠过程中队列上添加了新任务且新任务是第一个任务,Timer线程会被唤醒,重新进行检查

  • 计划的时间

下次任务的计划是在执行当前任务之前就做出了的(schedule不同task);

对于固定延时的任务,延时相对的是任务执行前的当前时间,而不是任务执行后;
与ScheduledExecutorService的固定延时计算方法是不同的,后者的计算方法更合乎一般的期望(它是从任务执行后开始算的,假设第一次为initialDelay后,则第二次为第一次任务执行结束后再加上delay)。

对于固定频率的任务,它总是基于最先的计划计划的,所以,很有可能会出现一下子执行很多次任务的情况

  • 陷阱

在执行任何一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有定时任务都会被取消
如果希望各个定时任务不互相干扰,一定要在run方法内捕获所有异常

一个Timer对象只有一个Timer线程,这意味着,定时任务不能耗时太长,更不能是无限循环,否则其后的定时任务永远无法执行

2 ScheduledExecutorService

出现的原因:用来解决Timer/TimerTask的陷阱:可以多线程、固定延时更加符合逻辑、不会被异常影响到其他异常任务

接口:

public interface ScheduledExecutorService extends ExecutorService {
    //单次执行,在指定延时delay后运行command
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    //单次执行,在指定延时delay后运行callable
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    //固定频率重复执行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    //固定延时重复执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

返回值都是ScheduledFuture:

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

主要实现类:
ScheduledThreadPoolExecutor,它是线程池ThreadPoolExecutor的子类,是基于线程池实现的;
它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使corePoolSize设为0,它也会至少运行一个线程
工厂类Executors提供了一些创建此类的方法:

//单线程的定时任务执行服务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
//多线程的定时任务执行服务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

2.1 基本原理

任务执行流程:
在这里插入图片描述
获取任务ScheduledFutureTask:
在这里插入图片描述
添加任务:
在这里插入图片描述
ScheduledThreadPoolExecutor的实现思路与Timer基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:

  • 它的背后是线程池,可以有多个线程执行任务
  • 它在任务执行后再设置下次执行的时间,对于固定延时的任务更为合理
  • 任务执行线程会捕获任务执行过程中的所有异常,一个定时任务的异常不会影响其他定时任务,但发生异常的任务也不再被重新调度,即使它是一个重复任务

实践中建议使用ScheduledExecutorService

共同局限:
不太胜任复杂的定时任务调度,比如,每周一和周三晚上18:00到22:00,每半小时执行一次。
对于类似这种需求,可以利用日期和时间处理方法(Joda-Time),或者利用更为强大的第三方类库,比如Quartz

CompletableFutur

用来支持异步编程

主要是四个方法:

//使⽤默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync(Runnable runnable)和supplyAsync(Supplier< U >supplier)的区别是:Runnable 接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的。

默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数);
如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。
所以强烈建议要根据不同的业务类型创建不同的线程池,以避免互相干扰。

实现了CompletionStage接口:

CompletionStage接口可以清晰地描述任务之间的时序关系:串行、并行、汇聚


版权声明:本文为qq_41594698原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。