并发并用程序是围绕任务进行管理的。所以设计时要指明一个清晰的任务边界。理想情况下,任务时独立的。
在正常负载下,服务器应用程序应该兼具良好的吞吐量和快速的响应性。应用程序在负荷过载时平缓地劣化,而不能负载一高就简单地以失败告终。所以要清晰任务边界并配合一个明确的任务执行策略。
大多数服务器选择了下面这个自然的任务边界:单独的用户请求。
class SingleThreadWebServer{
public static void main(String[] args)throws IOException{
ServerSocket socket=new ServerSocket(80);
while(true){
Socket connection=socket.accept();
handleRequest(connection);
}
}
}
上面的例子是顺序化的WebServer,理论上是正确的,一次只能处理一个请求,因此在生产上的执行效率很低。(在某些情况下,顺序化处理在简单性或安全性上具有优势,比如大多数的GUI框架使用单一线程顺序的处理任务。)
class ThreadPerTaskServer{
public static void main(String[] args)throws IOException{
ServerSocket socket=new ServerSocket(80);
while(true){
final Socket connection=socket.accept();
Runnable task=new Runnable(){
public void run(){
handleRequest(connection);
}
}
new Thread(task).start();
}
}
}
上面的例子有了良好的改进,但是存在一些实际的缺陷,尤其在需要创建大量的线程时会更加突出。
1,线程生命周期的开销。包括线程的创建和关闭。
2,资源消耗量。活动线程会消耗系统资源,尤其是内存,如果可运行的线程数多于可用的处理器数,线程将会空闲。会给垃圾回收器带来压力,而且大量线程竞争CPU资源时也会产生其他的性能开销。如果你有足够的线程保持所有的CPU忙碌,那么创建再多的线程百害而无一利。
3,稳定性。应该限制可以创建线程的数目,限制的数目依不同的平台而定,也收到JVM的启动参数、Thread的构造函数中请求的栈大小等因素的影响,以及底层OS线程的限制。如果打破这些限制,可能会收到一个OutOfMemorryError。企图从这种错误中恢复是非常危险的,简单的方法是避免超出这些限制。应该设置一个范围来限制你的应用程序可以创建的线程数,然后彻底地测试你的应用程序,确保即使到达了这个范围的极限,程序也不至于耗尽所有的资源。
在32位的机器上,主要的限制因素是线程栈的地址空间。每个线程都维护着两个执行栈,一个用于java代码,一个用于原生代码。典型的jvm默认会产生一个组合的栈,大小在半兆字节左右。如果你为每个线程分配了大小为232字节的栈,那么你的线程数量将被限制在几千到几万间不等。其他方面,比如OS的限制,可能产生更加严格的约束。
public static void main(String[] args) {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MINUTES.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println(i);
}
}
下面是我的测试结果,数目在2705前后。
2705
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at Test11.main(Test11.java:23)
线程池为线程管理提供了帮助。作为Executor框架的一部分,juc提供了一个灵活的线程池实现。
在Java类库中,任务执行的首要抽象不是Thread,而是Executor。
public interface Executor{
void execute(Runable command);
}
使用线程池的WebServer
class TaskExecutionWebServer{
private static final int NTHREADS=100;
private static final Executor exec
=Executors.newFixedThreadPool(NTHREADS);
public static void main(Stirng[] args)throws IOException{
ServerSocket socket=new ServerSocket(80);
while(true){
final Socket connection=socket.accept();
Runnable task=new Runnable(){
public void run(){
handleRequest(connection);
}
};
exec.execute(task);
}
}
}
为每个任务启动一个新线程的Executor
public class ThreadPerTaskExecutor implements Executor{
public void execute(Runnable r){
new Thread(r).start();
}
}
将任务的提交与任务的执行体进行解耦,价值在于可以简单地为一个类给定的任务制定执行策略,并且保证后续的修改不会太困难。一个执行策略指明了如下几个因素:
1,任务在什么线程中执行。
2,任务以什么顺序执行(FIFO、LIFO、优先级)
3,可以有多少个任务并发执行。
4,可以有多少个任务进入等待执行队列。
5,如果系统过载,需要放弃一个任务,应该挑选哪一个任务。另外如何通知应用程序知道这一切。
6,在一个任务的执行前与结束后,应该做什么处理。
为了解决执行服务的生命周期问题,ExecutorService接口扩展了Executor,添加了一些用于生命周期管理的方法。
public interface ExecutorService extends Executor{
void shutdown();//平缓的关闭过程
List<Runnable> shutdownNow();//强制关闭的过程
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout,TimeUnit unit)throws InterruptException
}
其暗示了生命周期有3中状态:运行、关闭和终止。
class LifecycleWebServer{
private final ExecutorService exec=...;
public void start()throws IOException{
ServerSocket socket=new ServerSocket(80);
while(!exec.isShutdown()){
try{
final Socket conn=socket.accept();
exec.execute(
new Runnable(){
public void run(){
handleRequest(conn);
}
}
);
}catch(RejectedExecutionException e){
if(!exec.isShutdown()){
log(“task submission rejected”,e);
}
}
}
}
public void stop(){
exec.shutdown();
}
void handleRequest(Socket connection){
Request req=readRequest(connection);
if(isShutdownRequest(req)){
stop();
}else{
dispatchRequest(req);
}
}
}
上面的例子是支持关闭的Web Server。
Timer工具管理任务的延迟执行,但是存在一些缺陷,你应该考虑使用ScheduledThreadPoolExecutor作为替代品。因为Timer用的是绝对时间而后者用的是相对时间。
Timer只创建唯一的线程来执行所有的Timer任务,如果一个Timer任务很耗时,会导致其他TimerTask的时效准确性出现问题。
Timer的另一个问题在于如果抛出未检查的异常,Timer将会产生无法预料的行为。Timer线程并不捕获异常,所以TimerTask抛出异常会终止timer线程。此时Timer也不会再回复线程的执行,会错误的认为整个Timer都被取消了,已经被安排但尚未执行的TimerTask永远不会再执行了,新的认为也不能被调度了,这个问题叫做线程泄漏。
public class OutOfTime{
public static void main(String[] args)throws Exception{
Timer timer=new Timer();
timer.schedule(new ThrowTask(),1);
TimeUnit.SECONDS.sleep(1);
timer.schedule(new ThrowTask(),1);
TimeUnit.SECONDS.sleep(5);
}
static class ThrowTask extends TimerTask{
public void run(){
throw new RuntimeException();
}
}
}
上面的例子中,程序1秒后终止了,还伴随着一个异常。
Exception in thread "Timer-0"java.lang.RuntimeException
at Test12$ThrowTask.run(Test12.java:16)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)
Exception in thread "main"java.lang.IllegalStateException: Timer already cancelled.
at java.util.Timer.sched(Timer.java:354)
at java.util.Timer.schedule(Timer.java:170)
at Test12.main(Test12.java:10)
顺序地渲染页面元素
public class SingleThreadRenderer{
void renderPage(CharSequence source){
renderText(source);
List<ImageData> imageData=new ArrayList<ImageData>();
for(ImageInfo imageInfo:scanForImageInfo(source)){
imageData.add(imageInfo.downloadImage());
}
for(ImageData data:imageData){
renderImage(data);
}
}
}
Future描述了任务的生命周期,任务的状态决定了get方法的行为。如果完成会立即返回或者抛出Exception,如果没有完成会阻塞直到完成。抛出的异常会被封装为ExecutionException然后重新抛出,如果认为被取消会抛出CancellationException。当抛出异常时可以使用getCause重新获得被封装的原始异常。
创建Future的方法有很多,ExecutorService中所有的submit都会返回一个Future,因此可以将一个Runnable或一个Callable提交给executor后得到一个Future。也可以为给定的Runnable或Callable实例化一个FutureTask。
为了加快渲染,我们将渲染过程分为两个部分,一个是渲染文本,一个是下载图像。
public class FutureRenderer{
private final ExecutorService executor=...;
void renderPage(CharSequence source){
final List<ImageInfo> imageInfos=scanForImageInfo(source);
Callable<List<ImageData>> task = new Callable<List<ImageData>>(){
public List<ImageData> call(){
List<ImageData> result=new ArrayList<ImageData>();
for(ImageInfo imageInfo:imageInfos){
result.add(imageInfo.downloadImage());
}
return result;
}
};
Future<List<ImageData>> future=executor.submint(task);
renderText(source);
try{
List<ImageData> imageData=future.get();
for(ImageData data:imageData){
renderImage(data);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
future.cancel(true);
}catch(ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
上面的例子中,如果渲染文本的速度远远大于下载图像的速度,那么最终的性能与顺序执行版本的性能不会有很大的不同,反倒是代码的复杂度大大的提高了。
大量相互独立且同类的任务进行并发处理,会将程序的任务量分派到不同的任务中,这样才能真正获得性能的提升。