微服务优化之并行调用
互联网产品随着用户的增加,系统对服务的高性能、高可用、可伸缩、可扩展的支持,大都采用分布式RPC框架。然而随着业务的增加,系统越来越多,系统之间的调用也越来越复杂,原本一个系统中一次请求就可以完成的工作,现在可能被分散在多个系统中,一次请求需要多个系统响应。这样就会放大RPC调用延迟带来的副作用,影响系统的高性能需求。
例如:一个RPC接口中需要依赖另外三个系统的RPC服务,各RPC服务的响应时间分别是20ms、10ms、10ms,那么这个接口的对外系统依赖的耗时40ms。如果接口依赖越多,响应时间就会越长。

对此,需要在业务范围内进行性能优化,优化思路总的来说有两种:
第一:如果对RPC接口调用,不需要关心接口的返回值,那么可以采用异步RPC调用。
第二:如果依赖RPC接口返回值,并且连续调用的多个RPC之间没有依赖关系,可以采用并行化处理。
本文主要分享一下通过并行化处理,来优化RPC接口响应时间,如上例子中的RPC采用并行调用,对外系统接口的依赖耗时会降低到20ms。

第一:因为Java对线程的使用非常方便,所以完成并行调用对于Java语言来说是相对简单,根据依赖外部接口分别创建一个线程来调用就可以完成。

第二:那么问题来:如果我的接口在完成其他接口调用后,还需要完成额外的功能而且需要依赖其他接口调用结果,该怎么处理呢?Thread类通过join调用,可以让主线程等待子线程处理结果。

第三:那么问题又来了:子线程内部的异常无法在外部获取,而需要依赖外部接口的调用结果的情况下,如果RPC接口抛出异常,必须在主线程中获取并作出相应处理,这个工作可以通过FutureTask来完成。

第四:那么问题又来了:如果一个接口依赖十个外部系统,那么每次请求就需要创建十个线程,随着接口TPS增加,系统创建线程和销毁的线程耗费的资源越来越高,这个时候需要考虑采用线程池方案了。

第五:那么问题又来了:以上实例只是一个单应用的测试Demo,真实应用情况下如上这样在代码中创建线程池并没太大意义,应该创建全局的线程池,所有请求共用线程池才能达到线程资源共用。但是Executors中线程池都默认采用AbortPolicy 的拒绝策略,在高并发情况下,就会频繁出现的线程池拒绝服务异常。此时可以考虑自定义线程池,采用CallerRunsPolicy拒绝策略,在高并发量,当线程池无法提供服务的情况下,采用主线程自己创建线程,达到并发量和计算资源的最优协调。
第六:完成以上操作就可以完美了吗?然而情况并非如此,如果细心测试发现,如果其中一个接口抛出异常时,主线程就结束了,而其他还没有执行结束的子线程将继续执行,一开始我们通过Thread.join()来协调主子线程的先后顺序,而现在采用线程池,无法在获取线程并且调用join方法,而是采用FutureTask.get()来协调先后顺序,那么还可以采用哪些方式保证主线程最后结束呢?此时可以采用一些特有的并发工具,如:闭锁,栅栏,信号量。如下为网络摘抄的三个工具对比:
闭锁(CountDownLatch) | 类似于门。门初始是关闭的,试图进门的线程挂起等待开门。当负责开门进程将门打开后,所有等待线程被唤醒。 门一旦打开就不能再关闭了。 | CountDownLatch(int n):指定闭锁计数器 await() :挂起等待闭锁计数器为0 countDown():闭锁计数器减1 |
栅栏(CyclicBarrier) | 和闭锁有类似之处。闭锁是等待“开门”事件;栅栏是等待其他线程。例如有N个线程视图通过栅栏,此时先到的要等待,直到所有线程到到达后,栅栏开启,所有等待线程被唤醒通过栅栏。 | CyclicBarrier(int n):需要等待的线程数量 await():挂起等待达到线程数量 |
信号量(Semaphore) | 和锁的作用类似。区别是锁只允许被一个线程获取,但是信号量可以设置资源数量。当没有可用资源时,才被挂起等待。 | Semaphore(int n):指定初始的资源数量 acquire():试图获取资源。当没有可用资源时挂起 release():释放一个资源 |
本文采用栅栏完成实例代码如下:
packagecom.halfworlders.test.domo;
importjava.util.concurrent.Callable;
importjava.util.concurrent.CyclicBarrier;
importjava.util.concurrent.ExecutionException;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.FutureTask;
importjava.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importcom.halfworlders.test.exp.AppException;
importcom.halfworlders.test.impl.ServiceImpl;
importcom.halfworlders.test.intf.ServiceInterface;
publicclassApp {
/**
* 外接口总数
*/
privatestaticfinalintINTERFACE_COUNT= 10;
ExecutorService executorService=newThreadPoolExecutor(INTERFACE_COUNT,INTERFACE_COUNT*3, 10L,
TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>(),newThreadPoolExecutor.CallerRunsPolicy());
publicstaticvoidmain(String[]args) {
longstart= System.currentTimeMillis();
App test=newApp();
test.test();
longend= System.currentTimeMillis();
System.out.println("总耗时:"+(end-start)+"ms");
}
@SuppressWarnings("unchecked")
publicvoidtest() {
finalCyclicBarriercb=newCyclicBarrier(INTERFACE_COUNT+ 1);
finalServiceInterface[]services= assembles();
finalFutureTask<Integer>[]futureTasks=newFutureTask[INTERFACE_COUNT];
for(inti= 0;i<INTERFACE_COUNT;i++) {
finalIntegerfi=i;
futureTasks[i] =newFutureTask<Integer>(newCallable<Integer>() {
@Override
publicInteger call()throwsException {
try{
returnservices[fi].service();
} finally{
cb.await();
}
}
});
executorService.submit(futureTasks[i]);
}
String serviceName=null;
try{
//打开栅栏
cb.await();
//如果有其他系统调用异常,则将该异常向外层抛出
for(inti= 0;i<INTERFACE_COUNT;i++) {
serviceName=services[i].getName();
futureTasks[i].get();
}
} catch(Exceptione) {
if((einstanceofExecutionException) && (e.getCause()instanceofAppException)) {
throw(AppException)e.getCause();
} else{
thrownewRuntimeException(serviceName+"系统异常",e);
}
}
}
privateServiceInterface[] assembles(){
ServiceInterface[] service=newServiceInterface[INTERFACE_COUNT];
for(inti= 0;i<INTERFACE_COUNT;i++) {
service[i] =newServiceImpl("接口"+i);
}
returnservice;
}
}