`
nod0620
  • 浏览: 19698 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

tomcat的Http11Protocol,Http11Processor

阅读更多

  上文说到在JIoEndpoint类中处理请求最终是调用到内部接口Handler的process()方法,而Handler的实现类是Http11Protocol的内部类Http11ConnectionHandler,Http11ConnectionHandler又委托Http11Processor进行处理,tomcat在这个地方我觉得类的设计不是很优雅啊,内部类用的太多,导致阅读的时候有困难(好处是内部类持有外部类的的引用,不用使用显示的new)。

先把Http11ConnectionHandler的类附上:

    protected static class Http11ConnectionHandler implements Handler {

        protected Http11Protocol proto;
        protected AtomicLong registerCount = new AtomicLong(0);
        protected RequestGroupInfo global = new RequestGroupInfo();

        //无界线程安全队列,不会阻塞,自己加工的原因是processorCache的数量是有限的,为了线程和性能考虑
        protected ConcurrentLinkedQueue<Http11Processor> recycledProcessors = 
            new ConcurrentLinkedQueue<Http11Processor>() {
            protected AtomicInteger size = new AtomicInteger(0);
            public boolean offer(Http11Processor processor) {
                boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache);
                //avoid over growing our cache or add after we have stopped
                boolean result = false;
                if ( offer ) {
                    result = super.offer(processor);
                    if ( result ) {
                        size.incrementAndGet();
                    }
                }
                if (!result) unregister(processor);
                return result;
            }
            
            public Http11Processor poll() {
                Http11Processor result = super.poll();
                if ( result != null ) {
                    size.decrementAndGet();
                }
                return result;
            }
            
            public void clear() {
                Http11Processor next = poll();
                while ( next != null ) {
                    unregister(next);
                    next = poll();
                }
                super.clear();
                size.set(0);
            }
        };

        Http11ConnectionHandler(Http11Protocol proto) {
            this.proto = proto;
        }

        public boolean process(Socket socket) {
        	//processorCache里面没有processor
            Http11Processor processor = recycledProcessors.poll();
            try {

                if (processor == null) {
                    processor = createProcessor();
                }

                if (processor instanceof ActionHook) {
                    ((ActionHook) processor).action(ActionCode.ACTION_START, null);
                }

                if (proto.isSSLEnabled() && (proto.sslImplementation != null)) {
                    processor.setSSLSupport
                        (proto.sslImplementation.getSSLSupport(socket));
                } else {
                    processor.setSSLSupport(null);
                }
                
                processor.process(socket);
                return false;

            } catch(java.net.SocketException e) {
                // SocketExceptions are normal
                Http11Protocol.log.debug
                    (sm.getString
                     ("http11protocol.proto.socketexception.debug"), e);
            } catch (java.io.IOException e) {
                // IOExceptions are normal
                Http11Protocol.log.debug
                    (sm.getString
                     ("http11protocol.proto.ioexception.debug"), e);
            }
            // Future developers: if you discover any other
            // rare-but-nonfatal exceptions, catch them here, and log as
            // above.
            catch (Throwable e) {
                // any other exception or error is odd. Here we log it
                // with "ERROR" level, so it will show up even on
                // less-than-verbose logs.
                Http11Protocol.log.error
                    (sm.getString("http11protocol.proto.error"), e);
            } finally {
                //       if(proto.adapter != null) proto.adapter.recycle();
                //                processor.recycle();

                if (processor instanceof ActionHook) {
                    ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
                }
                recycledProcessors.offer(processor);
            }
            return false;
        }
}

  看process()方法前,我们看另外一个有意思的东西,ConcurrentLinkedQueue的子类,自己实现的原因是ConcurrentLinkedQueue是无界的,非阻塞的,而这个需要的是有界的,查了下jdk的concurrent包,没有现有的有界的,非阻塞的实现类,所有这里自己实现个,还有实现这个主要还和jmx的注册联系在一起,这里由于processorCache==-1,长度限制这个原因不存在。主要看process()这个方法

    首先,从ConcurrentLinkedQueue中取得一个Http11Processor,没有的话就创建一个。

    由于Http11Processor是ActionHook的实现,ActionHook的action方法被执行,这个以后再说,这里是   ActionCode.ACTION_START的事件的执行

    接着是SSL的一些的设置,在https的时候有用

    接着重点,调用Http11Processor的process的方法

    后面就是收拾残局了,什么错误啊,Http11Processor的回收了,ACTION_STOP的事件执行了

 

    转到Http11Processor的process()的方法:

 

   public void process(Socket theSocket)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Set the remote address
        remoteAddr = null;
        remoteHost = null;
        localAddr = null;
        localName = null;
        remotePort = -1;
        localPort = -1;

        // Setting up the I/O
        this.socket = theSocket;
        inputBuffer.setInputStream(socket.getInputStream());
        outputBuffer.setOutputStream(socket.getOutputStream());

        // Error flag
        error = false;
        keepAlive = true;

        int keepAliveLeft = maxKeepAliveRequests;
        int soTimeout = endpoint.getSoTimeout();

        // When using an executor, these values may return non-positive values
        int curThreads = endpoint.getCurrentThreadsBusy();
        int maxThreads = endpoint.getMaxThreads();
        if (curThreads > 0 && maxThreads > 0) {
            // Only auto-disable keep-alive if the current thread usage % can be
            // calculated correctly
            if ((curThreads*100)/maxThreads > 75) {
                keepAliveLeft = 1;
            }
        }
        
        try {
            socket.setSoTimeout(soTimeout);
        } catch (Throwable t) {
            log.debug(sm.getString("http11processor.socket.timeout"), t);
            error = true;
        }

        boolean keptAlive = false;

        //keepAlive:当处理完用户发起的 HTTP 请求后是否立即关闭 TCP 连接
        while (started && !error && keepAlive) {

            // Parsing the request header
            try {
                if (keptAlive) {
                    if (keepAliveTimeout > 0) {
                        socket.setSoTimeout(keepAliveTimeout);
                    }
                    else if (soTimeout > 0) {
                        socket.setSoTimeout(soTimeout);
                    }
                }
                
                //解析http header 的第一行数据
                inputBuffer.parseRequestLine();
                request.setStartTime(System.currentTimeMillis());
                keptAlive = true;
                if (disableUploadTimeout) {
                    socket.setSoTimeout(soTimeout);
                } else {
                    socket.setSoTimeout(timeout);
                }
              //解析http header 的除第一行数据以外的数据
                inputBuffer.parseHeaders();
            } catch (IOException e) {
                error = true;
                break;
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), t);
                }
                // 400 - Bad Request
                response.setStatus(400);
                error = true;
            }

            if (!error) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 400 - Internal Server Error
                    response.setStatus(400);
                    error = true;
                }
            }

            if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0)
                keepAlive = false;

            // Process the request in the adapter
            if (!error) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                statusDropsConnection(response.getStatus());
                    }

                } catch (InterruptedIOException e) {
                    error = true;
                } catch (Throwable t) {
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    error = true;
                }
            }

            // Finish the handling of the request
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
                // If we know we are closing the connection, don't drain input.
                // This way uploading a 100GB file doesn't tie up the thread 
                // if the servlet has rejected it.
                if(error)
                    inputBuffer.setSwallowInput(false);
                inputBuffer.endRequest();
            } catch (IOException e) {
                error = true;
            } catch (Throwable t) {
                log.error(sm.getString("http11processor.request.finish"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                error = true;
            }
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
                outputBuffer.endRequest();
            } catch (IOException e) {
                error = true;
            } catch (Throwable t) {
                log.error(sm.getString("http11processor.response.finish"), t);
                error = true;
            }

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (error) {
                response.setStatus(500);
            }
            request.updateCounters();

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            // Don't reset the param - we'll see it as ended. Next request
            // will reset it
            // thrA.setParam(null);
            // Next request
            inputBuffer.nextRequest();
            outputBuffer.nextRequest();

        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        // Recycle
        inputBuffer.recycle();
        outputBuffer.recycle();
        this.socket = null;
        // Recycle ssl info
        sslSupport = null;
    }

    Http11Processor很夸张的又引用到JIoEndpoint(感觉很乱,是否有循环有用的感觉?)

    这里有一个很有意思的变量keepAliveLeft,一开始的时候等于maxKeepAliveRequests,也就是创建Processor的时候传入的Http11Protocol的maxKeepAliveRequests,默认为100,当JIoEndpoint的curThreads(当前已经用的线程数)/maxThreads>0.75,maxKeepAliveRequests马上可怜的变成1,这里可以认为是过载保护。

   当开始标志,没有错误标志等设置后,就开始真正解析这个传入的socket了.

   在这之前实例化InternalInputBuffer,InternalOutputBuffer两个类,对应socket的输入和输出,是可以重复使用的高性能的类。

   首先调用InternalInputBuffer的parseRequestLine()方法解析http的请求行,即'请求方法 请求uri http版本'那一行

  接着解析请求头InternalInputBuffer的parseHeaders()方法干这个活

 

    public void parseHeaders()
        throws IOException {

        while (parseHeader()) {
        }

        parsingHeader = false;
        end = pos;

    }

   while循环解析每一行,请求头有个专门的类MimeHeaders存放,里面一般存放的是MessageBytes,简单的说一行就是MessageBytes,这个类有char,byte,String等表示,好处是很方便,任何三类都可以处理,对于涉及编码的来说

,char和byte可以非常方便的进行原生态的表示

   接着是调用prepareRequest()方法,其实在parseHeaders()中进行了http headers的解析,这里只是拿出来用而已,只是把org.apache.coyote.Request的属性设置好而已

 

    Http11Processor的process方法就到这里

    回到Http11ConnectionHandler的process()方法,最后面有代码:

        finally {
                //       if(proto.adapter != null) proto.adapter.recycle();
                //                processor.recycle();

                if (processor instanceof ActionHook) {
                    ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
                }
                recycledProcessors.offer(processor);
            }

    这里调用了Http11Processor的action的方法,我们看下这个方法:

 

    	
        if (actionCode == ActionCode.ACTION_COMMIT) {
        	
            // Commit current response  只有在response中触发这个分支
        	
        	
        	//一般来说现在response.isCommitted()是false的,但tomcat中很多地方都有这样的判断,主要是为了稳定性考虑(不存在多线程问题?)
            if (response.isCommitted())
                return;

            // Validate and write response headers
            prepareResponse();
            try {
                outputBuffer.commit();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } 
 

    第一个分支就是,注意  prepareResponse(); 这里是准备org.apache.coyote.Response方法了,这个和request的处理差不多,是设置http协议响应头的一些内容

 

  准备好的org.apache.coyote.Request和org.apache.coyote.Response在CoyoteAdapter里面会被用到,返回给tomcat的Connector组件,这个下回再说。

 

 

  哇,真长!

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics