手写
表示子任务:
public interface Callable<V> {
V call() throws Exception;
}
表示异步调用的结果:
public interface MyFuture <V> {
V get() throws Exception ;//返回真正的结果,如果结果还没有计算完成,get会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常
}
类MyExecutor:
public class MyExecutor{
//这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量;
//执行结束后设置共享状态变量done为true,并调用notifyAll以唤醒可能在等待结果的主线程
static class ExecuteThread<V> extends Thread {
private V result = null;
private Exception exception = null;
private boolean done = false;
private Callable<V> task;
private Object lock;
public ExecuteThread(Callable<V> task, Object lock) {
this.task = task;
this.lock = lock;
}
@Override
public void run() {
try {
result = task.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
}
public V getResult() {
return result;
}
public boolean isDone() {
return done;
}
public Exception getException() {
return exception;
}
}
//目的:执行子任务并返回异步结果
//内容:封装了创建并启动子线程,同步获取结果的过程
//启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束
public <V> MyFuture<V> execute(final Callable<V> task) {
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
thread.start();
MyFuture<V> future = new MyFuture<V>() {
@Override
public V get() throws Exception {
synchronized (lock) {
while (!thread.isDone()) {
try {
lock.wait();
} catch (InterruptedException e) {
}
}
if (thread.getException() != null) {
throw thread.getException();
}
return thread.getResult();
}
}
};
return future;
}
}
使用:
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
// 子任务
Callable<Integer> subTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// ... 执行异步任务
int millis = (int) (Math.random() * 1000);
Thread.sleep(millis);
return millis;
}
};
// 异步调用,返回一个MyFuture对象
MyFuture<Integer> future = executor.execute(subTask);
// ... 执行其他操作
try {
// 获取异步调用的结果
Integer result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
使用线程池,要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理:
try {
// 业务逻辑
} catch (RuntimeException x) {
// 按需处理
} catch (Throwable x) {
// 按需处理
}
标准异常处理建议使用ThreadFactory:https://www.jianshu.com/p/5a79564101e8
定时任务
异步任务中,常见的任务就是定时任务
有两种方式实现定时任务:
- 使用java.util包中的Timer和TimerTask
- 使用Java并发包中的ScheduledExecutorService
1 Timer和TimerTask
1.1 TimerTask
TimerTask表示一个定时任务,它是一个抽象类,实现了Runnable,具体的定时任务需要继承该类,实现run方法
1.2 Timer
Timer是一个具体类,它负责定时任务的调度和执行
一个Timer对象按顺序运行schedule()的任务,一次只能运行一个,不能并发运行
有如下主要方法:
//在指定绝对时间time运行任务task
public void schedule(TimerTask task, Date time)
//在当前时间延时delay毫秒后运行任务task
public void schedule(TimerTask task, long delay)
//固定延时重复执行,第一次计划执行时间为firstTime,后一次的计划执行时间为前一次"实际"执行时间加上period
public void schedule(TimerTask task, Date firstTime, long period)
//同样是固定延时重复执行,第一次执行时间为当前时间加上delay
public void schedule(TimerTask task, long delay, long period)
//固定频率重复执行,第一次计划执行时间为firstTime,后一次的计划执行时间为前一次"计划"执行时间加上period
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period)
//同样是固定频率重复执行,第一次计划执行时间为当前时间加上delay
public void scheduleAtFixedRate(TimerTask task, long delay, long period)
- 固定延时(fixed-delay)与固定频率(fixed-rate)
同:都是重复执行
不同:
后一次任务执行相对的时间是不一样的:
对于固定延时,它是基于上次任务的"实际"执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时;
对于固定频率,会尽量补够运行次数:会从firstTime开始算,有可能加上period后还是一个过去时间,从而连续运行很多次,直到时间超过当前时间
例子:
//第一个运行一次,第二个重复执行,1秒一次,第一个先运行。
//运行该程序,会发现,第二个任务只有在第一个任务运行结束后才会开始运行,运行后1秒一次
timer.schedule(new xxxTask(), 10);
timer.schedule(new yyyTask(), 100, 1000);
//运行该程序,第二个任务同样只有在第一个任务运行结束后才会运行,但它会把之前没有运行的次数补过来,一次性运行的次数为xxxTask()消耗的时间所运行的次数,假设消耗5秒,因为此处1秒运行1次,所以会一次性运行5次
timer.schedule(new xxxTask(), 10);
timer.scheduleAtFixedRate(new yyyTask(), 100, 1000);
1.2.1 基本原理
- 内部
主要由两部分组成:任务队列和Timer线程。
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);
任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级;
Timer线程负责执行所有的定时任务,需要强调的是,一个Timer对象只有一个Timer线程
- 协作
Timer线程主体是一个循环,从队列中拿任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它;
如果队列中没有任务或第一个任务延时还没到,就睡眠;
如果睡眠过程中队列上添加了新任务且新任务是第一个任务,Timer线程会被唤醒,重新进行检查
- 计划的时间
下次任务的计划是在执行当前任务之前就做出了的(schedule不同task);
对于固定延时的任务,延时相对的是任务执行前的当前时间,而不是任务执行后;
与ScheduledExecutorService的固定延时计算方法是不同的,后者的计算方法更合乎一般的期望(它是从任务执行后开始算的,假设第一次为initialDelay后,则第二次为第一次任务执行结束后再加上delay)。
对于固定频率的任务,它总是基于最先的计划计划的,所以,很有可能会出现一下子执行很多次任务的情况
- 陷阱
在执行任何一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有定时任务都会被取消
如果希望各个定时任务不互相干扰,一定要在run方法内捕获所有异常
一个Timer对象只有一个Timer线程,这意味着,定时任务不能耗时太长,更不能是无限循环,否则其后的定时任务永远无法执行
2 ScheduledExecutorService
出现的原因:用来解决Timer/TimerTask的陷阱:可以多线程、固定延时更加符合逻辑、不会被异常影响到其他异常任务
接口:
public interface ScheduledExecutorService extends ExecutorService {
//单次执行,在指定延时delay后运行command
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//单次执行,在指定延时delay后运行callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//固定频率重复执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//固定延时重复执行
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
返回值都是ScheduledFuture:
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
主要实现类:
ScheduledThreadPoolExecutor,它是线程池ThreadPoolExecutor的子类,是基于线程池实现的;
它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使corePoolSize设为0,它也会至少运行一个线程
工厂类Executors提供了一些创建此类的方法:
//单线程的定时任务执行服务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
//多线程的定时任务执行服务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
2.1 基本原理
任务执行流程:
获取任务ScheduledFutureTask:
添加任务:
ScheduledThreadPoolExecutor的实现思路与Timer基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:
- 它的背后是线程池,可以有多个线程执行任务
- 它在任务执行后再设置下次执行的时间,对于固定延时的任务更为合理
- 任务执行线程会捕获任务执行过程中的所有异常,一个定时任务的异常不会影响其他定时任务,但发生异常的任务也不再被重新调度,即使它是一个重复任务
实践中建议使用ScheduledExecutorService
共同局限:
不太胜任复杂的定时任务调度,比如,每周一和周三晚上18:00到22:00,每半小时执行一次。
对于类似这种需求,可以利用日期和时间处理方法(Joda-Time),或者利用更为强大的第三方类库,比如Quartz
CompletableFutur
用来支持异步编程
主要是四个方法:
//使⽤默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync(Runnable runnable)和supplyAsync(Supplier< U >supplier)的区别是:Runnable 接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的。
默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数);
如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。
所以强烈建议要根据不同的业务类型创建不同的线程池,以避免互相干扰。
实现了CompletionStage接口:
CompletionStage接口可以清晰地描述任务之间的时序关系:串行、并行、汇聚