一、处理请求时序图(以tomcat7 nio为列)

线程池处理SocketProcessor中请求过程时序图
二、源码剖析
1、NioEndpointer启动时(启动流程上一篇有时序图)会开启Acceptor、Poller线程
默认Acceptor数量是1 ,Poller数量如果是多核就是两个线程,如果单核就是一个线程
当然都可以配置如下: acceptorThreadCount、pollerThreadCount配置
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" acceptorThreadCount="1" pollerThreadCount="5"
connectionTimeout="20000"
redirectPort="8443" />

2、进入Acceptor线程中
run()方法中主要就是接受socket请求、
删减多余代码
public void run() {
while (running) {//一直运行
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
socket = serverSock.accept(); //
//处理socket
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
}
进入setSocketOptions()中,主要作用就是把socket封装成NioChannel注册到poller队列中
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
// 从该channel上读取数据不阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
// 每接收到一个socket连接就获取一个NioChannel来封装这个socket,NioChannel是可重用的对象
NioChannel channel = nioChannels.poll(); // 拿出对头的NioChannel
/*
此处省去多余代码,就是创建NioChannel封装socket,从缓存中获取NioChannel没有就创建一个
*/
// 每接收到一个新socket连接,就会生成一个
getPoller0().register(channel);
} catch (Throwable t) {
}
return true;
}
3、Poller线程
进入Poller的register()方法,创建PollerEvent加入到poller队列中,此时并没有真正注册到select上
public void register(final NioChannel socket) {
socket.setPoller(this);
// 获取一个KeyAttachment对象,将当前socket的相关信息设置进去
KeyAttachment key = keyCache.poll();
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
ka.reset(this,socket,getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
// 获取一个PollerEvent对象,本事件为一个注册事件,对读事件感兴趣(这里暂时还没有真正的向select去注册事件)
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// 把PollerEvent添加到事件列表中去
addEvent(r);
}
在进入poller线程run方法,tomcat启动时这个线程就已经在运行了,主要作用就是,调用events()处理队列中的pollerevnt,真正把socket注册到select上、然后去select上获取就绪事件调用processKey处理事件
public void run() {
// Loop until destroy() is called
while (true) {
try {
// Loop if endpoint is paused
while (paused && (!close) ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
}
boolean hasEvents = false;
// Time to terminate?
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
} else {
// 执行PollerEvent事件,向Selector注册读写事件
hasEvents = events(); // 真正的向selector注册
}
try {
if ( !close ) {
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
// 上面的events()会去注册事件,而这里是去查询是否有事件就绪
// 不阻塞
keyCount = selector.selectNow();
} else {
// 阻塞,超时会继续执行下面的代码,不会报错
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch ( CancelledKeyException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
// 如果存在就绪事件,那么则遍历并处理事件
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
// 循环处理当前就绪的事件
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
attachment.access();
iterator.remove();
// 处理事件
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
}
}//while
stopLatch.countDown();
}
进入 processKey(sk, attachment);主要是处理请求、所以会进入processSocket()
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
boolean result = true;
try {
if ( close ) {
cancelledKey(sk, SocketStatus.STOP, attachment.comet);
} else if ( sk.isValid() && attachment != null ) {
attachment.access();//make sure we don't time out valid sockets
sk.attach(attachment);//cant remember why this is here
// 当前就绪事件对应的channel
NioChannel channel = attachment.getChannel();
// 读就绪或写就绪
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
if ( isWorkerAvailable() ) {
unreg(sk, attachment, sk.readyOps()); //
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 从channel中读取数据
if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
closeSocket = true;
}
}
// 读完数据之后可能就要写数据
if (!closeSocket && sk.isWritable()) {
// 将数据写入到channel中
if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk,SocketStatus.DISCONNECT,false);
}
} else {
result = false;
}
}
}
} else {
//invalid key
cancelledKey(sk, SocketStatus.ERROR,false);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk, SocketStatus.ERROR,false);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
return result;
}
再次进入processSocket()方法中,可以看到会生成SocketProcessor交给线程池处理、下面逻辑就同tomcat处理bio类似
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
// 该方法是用来从socket中读数据或写数据的,dispatch表示是不是要把这个任务派发给线程池,也就是要不要异步
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
if (attachment == null) {
return false;
}
attachment.setCometNotify(false); //will get reset upon next reg
// 获取一个SocketProcessor对象
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
// 派发给线程池
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
4、进入SocketProcessor线程(真正意义处理请求、解析请求行、请求头)\最终会交给Http11NioProcessor的父类的process()处理
public SocketState process(SocketWrapper<S> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // 设置请求状态为解析状态
// Setting up the I/O
setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint); // 将socket的InputStream与InternalInputBuffer进行绑定
getOutputBuffer().init(socketWrapper, endpoint); // 将socket的OutputStream与InternalOutputBuffer进行绑定
// Flags
keepAlive = true;
comet = false;
openSocket = false;
sendfileInProgress = false;
readComplete = true;
// NioEndpoint返回true, Bio返回false
if (endpoint.getUsePolling()) {
keptAlive = false;
} else {
keptAlive = socketWrapper.isKeptAlive();
}
// 如果当前活跃的线程数占线程池最大线程数的比例大于75%,那么则关闭KeepAlive,不再支持长连接
if (disableKeepAlive()) {
socketWrapper.setKeepAliveLeft(0);
}
// keepAlive默认为true,它的值会从请求中读取
while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
upgradeInbound == null &&
httpUpgradeHandler == null && !endpoint.isPaused()) {
// keepAlive如果为true,接下来需要从socket中不停的获取http请求
// Parsing the request header
try {
// 第一次从socket中读取数据,并设置socket的读取数据的超时时间
// 对于BIO,一个socket连接建立好后,不一定马上就被Tomcat处理了,其中需要线程池的调度,所以这段等待的时间要算在socket读取数据的时间内
// 而对于NIO而言,没有阻塞
setRequestLineReadTimeout();
// 解析请求行
if (!getInputBuffer().parseRequestLine(keptAlive)) {
// 下面这个方法在NIO时有用,比如在解析请求行时,如果没有从操作系统读到数据,则上面的方法会返回false
// 而下面这个方法会返回true,从而退出while,表示此处read事件处理结束
// 到下一次read事件发生了,就会从小进入到while中
if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
// 503 - Service unavailable
// 如果Endpoint被暂停了,则返回503
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
// 每次处理一个请求就重新获取一下请求头和cookies的最大限制
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
request.getCookies().setLimit(getMaxCookieCount());
// Currently only NIO will ever return false here
// 解析请求头
if (!getInputBuffer().parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
setSocketTimeout(connectionUploadTimeout);
}
}
} catch (IOException e) {
if (getLog().isDebugEnabled()) {
getLog().debug(
sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_NOW, e);
break;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString(
"http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString(
"http11processor.fallToDebug");
//$FALL-THROUGH$
case INFO:
getLog().info(message, t);
break;
case DEBUG:
getLog().debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); // 设置请求状态为预处理状态
try {
prepareRequest(); // 预处理, 主要从请求中处理处keepAlive属性,以及进行一些验证,以及根据请求分析得到ActiveInputFilter
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
if (maxKeepAliveRequests == 1) {
// 如果最大的活跃http请求数量仅仅只能为1的话,那么设置keepAlive为false,则不会继续从socket中获取Http请求了
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
// 如果已经达到了keepAlive的最大限制,也设置为false,则不会继续从socket中获取Http请求了
keepAlive = false;
}
// Process the request in the adapter
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); // 设置请求的状态为服务状态,表示正在处理请求
adapter.service(request, response); // 交给容器处理请求
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && (
response.getErrorException() != null ||
(!isAsync() &&
statusDropsConnection(response.getStatus())))) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
setCometTimeouts(socketWrapper);
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_NOW, e);
} catch (HeadersTooLargeException e) {
getLog().error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); // 设置请求的状态为处理请求结束
if (!isAsync() && !comet) {
if (getErrorState().isError()) {
// If we know we are closing the connection, don't drain
// input. This way uploading a 100GB file doesn't tie up the
// thread if the servlet has rejected it.
getInputBuffer().setSwallowInput(false);
} else {
// Need to check this again here in case the response was
// committed before the error that requires the connection
// to be closed occurred.
checkExpectationAndResponseStatus();
}
// 当前http请求已经处理完了,做一些收尾工作
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); // 请求状态为输出结束
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
request.updateCounters();
if (!isAsync() && !comet || getErrorState().isError()) {
if (getErrorState().isIoAllowed()) {
// 准备处理下一个请求
getInputBuffer().nextRequest();
getOutputBuffer().nextRequest();
}
}
if (!disableUploadTimeout) {
if(endpoint.getSoTimeout() > 0) {
setSocketTimeout(endpoint.getSoTimeout());
} else {
setSocketTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
// 如果处理完当前这个Http请求之后,发现socket里没有下一个请求了,那么就退出当前循环
// 如果是keepalive,就不会关闭socket, 如果是close就会关闭socket
// 对于keepalive的情况,因为是一个线程处理一个socket,当退出这个while后,当前线程就会介绍,
// 当时对于socket来说,它仍然要继续介绍连接,所以又会新开一个线程继续来处理这个socket
if (breakKeepAliveLoop(socketWrapper)) {
break;
}
}
// 至此,循环结束
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
// 主要流程就是将socket的状态设置为CLOSED
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync() || comet) {
// 异步servlet
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else if (getUpgradeInbound() != null) {
return SocketState.UPGRADING_TOMCAT;
} else {
if (sendfileInProgress) {
return SocketState.SENDFILE;
} else {
// openSocket为true,表示不要关闭socket
if (openSocket) {
// readComplete表示本次读数据是否完成,比如nio中可能就没有读完数据,还需要从socket中读数据
if (readComplete) {
return SocketState.OPEN;
} else {
// nio可能会走到这里
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
版权声明:本文为qq_33590654原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。