Tomcat处理请求过程源码剖析

一、处理请求时序图(以tomcat7 nio为列)

在这里插入图片描述
线程池处理SocketProcessor中请求过程时序图在这里插入图片描述

二、源码剖析

1、NioEndpointer启动时(启动流程上一篇有时序图)会开启Acceptor、Poller线程
默认Acceptor数量是1 ,Poller数量如果是多核就是两个线程,如果单核就是一个线程
当然都可以配置如下: acceptorThreadCount、pollerThreadCount配置

    <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"    acceptorThreadCount="1"  pollerThreadCount="5"
               connectionTimeout="20000"
               redirectPort="8443" />

在这里插入图片描述
2、进入Acceptor线程中
run()方法中主要就是接受socket请求、

删减多余代码
 public void run() {

            while (running) {//一直运行

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        socket = serverSock.accept(); //
                   
      					//处理socket
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
        }

进入setSocketOptions()中,主要作用就是把socket封装成NioChannel注册到poller队列中

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            // 从该channel上读取数据不阻塞
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            // 每接收到一个socket连接就获取一个NioChannel来封装这个socket,NioChannel是可重用的对象
            NioChannel channel = nioChannels.poll(); // 拿出对头的NioChannel
            /*
			  此处省去多余代码,就是创建NioChannel封装socket,从缓存中获取NioChannel没有就创建一个
            */  
            // 每接收到一个新socket连接,就会生成一个
            getPoller0().register(channel);
        } catch (Throwable t) {
        }
        return true;
    }

3、Poller线程
进入Poller的register()方法,创建PollerEvent加入到poller队列中,此时并没有真正注册到select上

        public void register(final NioChannel socket) {
            socket.setPoller(this);

            // 获取一个KeyAttachment对象,将当前socket的相关信息设置进去
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());

            // 获取一个PollerEvent对象,本事件为一个注册事件,对读事件感兴趣(这里暂时还没有真正的向select去注册事件)
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            // 把PollerEvent添加到事件列表中去
            addEvent(r);
        }

在进入poller线程run方法,tomcat启动时这个线程就已经在运行了,主要作用就是,调用events()处理队列中的pollerevnt,真正把socket注册到select上、然后去select上获取就绪事件调用processKey处理事件

 public void run() {
            // Loop until destroy() is called
            while (true) {
                try {
                    // Loop if endpoint is paused
                    while (paused && (!close) ) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }

                    boolean hasEvents = false;

                    // Time to terminate?
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString(
                                    "endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    } else {
                        // 执行PollerEvent事件,向Selector注册读写事件
                        hasEvents = events(); // 真正的向selector注册
                    }
                    try {
                        if ( !close ) {
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                // 上面的events()会去注册事件,而这里是去查询是否有事件就绪
                                // 不阻塞
                                keyCount = selector.selectNow();
                            } else {
                                // 阻塞,超时会继续执行下面的代码,不会报错
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString(
                                        "endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch ( NullPointerException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch ( CancelledKeyException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                    // 如果存在就绪事件,那么则遍历并处理事件
                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    // 循环处理当前就绪的事件
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            attachment.access();
                            iterator.remove();
                            // 处理事件
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                }
            }//while

            stopLatch.countDown();
        }

进入 processKey(sk, attachment);主要是处理请求、所以会进入processSocket()

 protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
            boolean result = true;
            try {
                if ( close ) {
                    cancelledKey(sk, SocketStatus.STOP, attachment.comet);
                } else if ( sk.isValid() && attachment != null ) {
                    attachment.access();//make sure we don't time out valid sockets
                    sk.attach(attachment);//cant remember why this is here

                    // 当前就绪事件对应的channel
                    NioChannel channel = attachment.getChannel();
                    // 读就绪或写就绪
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            if ( isWorkerAvailable() ) {
                                unreg(sk, attachment, sk.readyOps()); //
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    // 从channel中读取数据
                                    if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                // 读完数据之后可能就要写数据
                                if (!closeSocket && sk.isWritable()) {
                                    // 将数据写入到channel中
                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                }
                            } else {
                                result = false;
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk, SocketStatus.ERROR,false);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk, SocketStatus.ERROR,false);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
            return result;
        }

再次进入processSocket()方法中,可以看到会生成SocketProcessor交给线程池处理、下面逻辑就同tomcat处理bio类似

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        // 该方法是用来从socket中读数据或写数据的,dispatch表示是不是要把这个任务派发给线程池,也就是要不要异步

        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg

            // 获取一个SocketProcessor对象
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);

            // 派发给线程池
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

4、进入SocketProcessor线程(真正意义处理请求、解析请求行、请求头)\最终会交给Http11NioProcessor的父类的process()处理

 public SocketState process(SocketWrapper<S> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);   // 设置请求状态为解析状态

        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint);     // 将socket的InputStream与InternalInputBuffer进行绑定
        getOutputBuffer().init(socketWrapper, endpoint);    // 将socket的OutputStream与InternalOutputBuffer进行绑定

        // Flags
        keepAlive = true;
        comet = false;
        openSocket = false;
        sendfileInProgress = false;
        readComplete = true;
        // NioEndpoint返回true, Bio返回false
        if (endpoint.getUsePolling()) {
            keptAlive = false;
        } else {
            keptAlive = socketWrapper.isKeptAlive();
        }

        // 如果当前活跃的线程数占线程池最大线程数的比例大于75%,那么则关闭KeepAlive,不再支持长连接
        if (disableKeepAlive()) {
            socketWrapper.setKeepAliveLeft(0);
        }

        // keepAlive默认为true,它的值会从请求中读取
        while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
                upgradeInbound == null &&
                httpUpgradeHandler == null && !endpoint.isPaused()) {
            // keepAlive如果为true,接下来需要从socket中不停的获取http请求

            // Parsing the request header
            try {
                // 第一次从socket中读取数据,并设置socket的读取数据的超时时间
                // 对于BIO,一个socket连接建立好后,不一定马上就被Tomcat处理了,其中需要线程池的调度,所以这段等待的时间要算在socket读取数据的时间内
                // 而对于NIO而言,没有阻塞
                setRequestLineReadTimeout();

                // 解析请求行
                if (!getInputBuffer().parseRequestLine(keptAlive)) {
                    // 下面这个方法在NIO时有用,比如在解析请求行时,如果没有从操作系统读到数据,则上面的方法会返回false
                    // 而下面这个方法会返回true,从而退出while,表示此处read事件处理结束
                    // 到下一次read事件发生了,就会从小进入到while中
                    if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }

                if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    // 如果Endpoint被暂停了,则返回503
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    // 每次处理一个请求就重新获取一下请求头和cookies的最大限制
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    request.getCookies().setLimit(getMaxCookieCount());
                    // Currently only NIO will ever return false here
                    // 解析请求头
                    if (!getInputBuffer().parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        setSocketTimeout(connectionUploadTimeout);
                    }
                }
            } catch (IOException e) {
                if (getLog().isDebugEnabled()) {
                    getLog().debug(
                            sm.getString("http11processor.header.parse"), e);
                }
                setErrorState(ErrorState.CLOSE_NOW, e);
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                if (logMode != null) {
                    String message = sm.getString(
                            "http11processor.header.parse");
                    switch (logMode) {
                        case INFO_THEN_DEBUG:
                            message += sm.getString(
                                    "http11processor.fallToDebug");
                            //$FALL-THROUGH$
                        case INFO:
                            getLog().info(message, t);
                            break;
                        case DEBUG:
                            getLog().debug(message, t);
                    }
                }
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }

            if (!getErrorState().isError()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);  // 设置请求状态为预处理状态
                try {
                    prepareRequest();   // 预处理, 主要从请求中处理处keepAlive属性,以及进行一些验证,以及根据请求分析得到ActiveInputFilter
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString(
                                "http11processor.request.prepare"), t);
                    }
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }

            if (maxKeepAliveRequests == 1) {
                // 如果最大的活跃http请求数量仅仅只能为1的话,那么设置keepAlive为false,则不会继续从socket中获取Http请求了
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                // 如果已经达到了keepAlive的最大限制,也设置为false,则不会继续从socket中获取Http请求了
                keepAlive = false;
            }

            // Process the request in the adapter
            if (!getErrorState().isError()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); // 设置请求的状态为服务状态,表示正在处理请求
                    adapter.service(request, response); // 交给容器处理请求
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !getErrorState().isError() && (
                            response.getErrorException() != null ||
                                    (!isAsync() &&
                                    statusDropsConnection(response.getStatus())))) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                    setCometTimeouts(socketWrapper);
                } catch (InterruptedIOException e) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } catch (HeadersTooLargeException e) {
                    getLog().error(sm.getString("http11processor.request.process"), e);
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (response.isCommitted()) {
                        setErrorState(ErrorState.CLOSE_NOW, e);
                    } else {
                        response.reset();
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, e);
                        response.setHeader("Connection", "close"); // TODO: Remove
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    getLog().error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }

            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);  // 设置请求的状态为处理请求结束

            if (!isAsync() && !comet) {
                if (getErrorState().isError()) {
                    // If we know we are closing the connection, don't drain
                    // input. This way uploading a 100GB file doesn't tie up the
                    // thread if the servlet has rejected it.
                    getInputBuffer().setSwallowInput(false);
                } else {
                    // Need to check this again here in case the response was
                    // committed before the error that requires the connection
                    // to be closed occurred.
                    checkExpectationAndResponseStatus();
                }
                // 当前http请求已经处理完了,做一些收尾工作
                endRequest();
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); // 请求状态为输出结束

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (getErrorState().isError()) {
                response.setStatus(500);
            }
            request.updateCounters();

            if (!isAsync() && !comet || getErrorState().isError()) {
                if (getErrorState().isIoAllowed()) {
                    // 准备处理下一个请求
                    getInputBuffer().nextRequest();
                    getOutputBuffer().nextRequest();
                }
            }

            if (!disableUploadTimeout) {
                if(endpoint.getSoTimeout() > 0) {
                    setSocketTimeout(endpoint.getSoTimeout());
                } else {
                    setSocketTimeout(0);
                }
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            // 如果处理完当前这个Http请求之后,发现socket里没有下一个请求了,那么就退出当前循环
            // 如果是keepalive,就不会关闭socket, 如果是close就会关闭socket
            // 对于keepalive的情况,因为是一个线程处理一个socket,当退出这个while后,当前线程就会介绍,
            // 当时对于socket来说,它仍然要继续介绍连接,所以又会新开一个线程继续来处理这个socket
            if (breakKeepAliveLoop(socketWrapper)) {
                break;
            }
        }
        // 至此,循环结束

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        // 主要流程就是将socket的状态设置为CLOSED
        if (getErrorState().isError() || endpoint.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync() || comet) {
            // 异步servlet
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else if (getUpgradeInbound() != null) {
            return SocketState.UPGRADING_TOMCAT;
        } else {
            if (sendfileInProgress) {
                return SocketState.SENDFILE;
            } else {
                // openSocket为true,表示不要关闭socket
                if (openSocket) {
                    // readComplete表示本次读数据是否完成,比如nio中可能就没有读完数据,还需要从socket中读数据
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        // nio可能会走到这里
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }

版权声明:本文为qq_33590654原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。