Preface
The previous article covered the ConnectionHandler#process method. Its key step is to obtain an org.apache.coyote.Processor object and call that object's process method, passing along the same arguments it received itself: the NioSocketWrapper object and the SocketEvent object. In Tomcat, the Processor implementations used for HTTP requests are Http11Processor and StreamProcessor. Both extend AbstractProcessor, whose parent class is AbstractProcessorLight, which in turn implements Processor directly. StreamProcessor handles HTTP/2; this article uses Http11Processor as the example. The overall processing logic of StreamProcessor and Http11Processor is broadly the same.
1. The Http11Processor constructor
public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
    super(adapter);
    this.protocol = protocol;

    httpParser = new HttpParser(protocol.getRelaxedPathChars(),
            protocol.getRelaxedQueryChars());

    inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
            protocol.getRejectIllegalHeaderName(), httpParser);
    request.setInputBuffer(inputBuffer);

    outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
    response.setOutputBuffer(outputBuffer);

    // Create and add the identity filters.
    inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new IdentityOutputFilter());

    // Create and add the chunked filters.
    inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
            protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
            protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new ChunkedOutputFilter());

    // Create and add the void filters.
    inputBuffer.addFilter(new VoidInputFilter());
    outputBuffer.addFilter(new VoidOutputFilter());

    // Create and add buffered input filter
    inputBuffer.addFilter(new BufferedInputFilter());

    // Create and add the chunked filters.
    //inputBuffer.addFilter(new GzipInputFilter());
    outputBuffer.addFilter(new GzipOutputFilter());

    pluggableFilterIndex = inputBuffer.getFilters().length;
}
public AbstractProcessor(Adapter adapter) {
    this(adapter, new Request(), new Response());
}

protected AbstractProcessor(Adapter adapter, Request coyoteRequest, Response coyoteResponse) {
    this.adapter = adapter;
    asyncStateMachine = new AsyncStateMachine(this);
    request = coyoteRequest;
    response = coyoteResponse;
    response.setHook(this);
    request.setResponse(response);
    request.setHook(this);
    userDataHelper = new UserDataHelper(getLog());
}
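The Http11Processor constructor, together with the AbstractProcessor constructor it invokes through super(adapter), initializes request (org.apache.coyote.Request), response (org.apache.coyote.Response), httpParser (HttpParser), inputBuffer (Http11InputBuffer) and outputBuffer (Http11OutputBuffer), along with several InputFilter and OutputFilter instances. All of these are required for handling the HTTP protocol.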
2. AbstractProcessorLight#process
The Processor#process method is implemented in AbstractProcessorLight.
@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {

    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }

        if (getLog().isDebugEnabled()) {
            getLog().debug("Socket: [" + socketWrapper +
                    "], Status in: [" + status +
                    "], State out: [" + state + "]");
        }

        if (state != SocketState.CLOSED && isAsync()) {
            state = asyncPostProcess();
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], State after async post processing: [" + state + "]");
            }
        }

        if (dispatches == null || !dispatches.hasNext()) {
            // Only returns non-null iterator if there are
            // dispatches to process.
            dispatches = getIteratorAndClearDispatches();
        }
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

    return state;
}
The process method runs a do-while loop and handles each case according to the current conditions. The important work in each branch is a call to either dispatch or service.
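To make the branch selection easier to follow, here is a minimal sketch of the first pass through that loop. It is a simplification and not Tomcat code: EventSketch, BranchSketch and ProcessBranchSketch are hypothetical names, and the sketch ignores queued dispatches and the ASYNC_END state, which only matter on later iterations.

```java
// Hypothetical, simplified model of the first loop iteration in
// AbstractProcessorLight#process. None of these types exist in Tomcat.
enum EventSketch { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }

enum BranchSketch { NOTHING, DISPATCH, IGNORE_WRITE, SERVICE, CLOSE }

final class ProcessBranchSketch {

    // Returns which branch would be taken for the given event and processor flags.
    static BranchSketch chooseBranch(EventSketch status, boolean async, boolean upgrade) {
        if (status == EventSketch.DISCONNECT) {
            // Nothing to do, the processor is simply recycled later.
            return BranchSketch.NOTHING;
        }
        if (async || upgrade) {
            // Request already left standard HTTP mode: go through dispatch.
            return BranchSketch.DISPATCH;
        }
        if (status == EventSketch.OPEN_WRITE) {
            // Extra write event, ignored (the real code returns SocketState.LONG).
            return BranchSketch.IGNORE_WRITE;
        }
        if (status == EventSketch.OPEN_READ) {
            // Standard HTTP request: go through service.
            return BranchSketch.SERVICE;
        }
        // Event is inconsistent with the processor state: close the socket.
        return BranchSketch.CLOSE;
    }
}
```

For example, ProcessBranchSketch.chooseBranch(EventSketch.OPEN_READ, false, false) selects the service branch, while the same event on an async request selects dispatch.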
/**
 * Process an in-progress request that is not longer in standard HTTP mode.
 * Uses currently include Servlet 3.0 Async and HTTP upgrade connections.
 * Further uses may be added in the future. These will typically start as
 * HTTP requests.
 * @param status The event to process
 * @return the socket state
 */
protected abstract SocketState dispatch(SocketEvent status);
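As the comment indicates, dispatch handles in-progress requests that are no longer in standard HTTP mode. It is currently used for Servlet 3.0 async processing and HTTP upgrade connections.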
@Override
public final SocketState dispatch(SocketEvent status) {

    if (status == SocketEvent.OPEN_WRITE && response.getWriteListener() != null) {
        asyncStateMachine.asyncOperation();
        try {
            if (flushBufferedWrite()) {
                return SocketState.LONG;
            }
        } catch (IOException ioe) {
            if (getLog().isDebugEnabled()) {
                getLog().debug("Unable to write async data.", ioe);
            }
            status = SocketEvent.ERROR;
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
        }
    } else if (status == SocketEvent.OPEN_READ && request.getReadListener() != null) {
        dispatchNonBlockingRead();
    } else if (status == SocketEvent.ERROR) {
        // An I/O error occurred on a non-container thread. This includes:
        // - read/write timeouts fired by the Poller (NIO & APR)
        // - completion handler failures in NIO2

        if (request.getAttribute(RequestDispatcher.ERROR_EXCEPTION) == null) {
            // Because the error did not occur on a container thread the
            // request's error attribute has not been set. If an exception
            // is available from the socketWrapper, use it to set the
            // request's error attribute here so it is visible to the error
            // handling.
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, socketWrapper.getError());
        }

        if (request.getReadListener() != null || response.getWriteListener() != null) {
            // The error occurred during non-blocking I/O. Set the correct
            // state else the error handling will trigger an ISE.
            asyncStateMachine.asyncOperation();
        }
    }

    RequestInfo rp = request.getRequestProcessor();
    try {
        rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
        if (!getAdapter().asyncDispatch(request, response, status)) {
            setErrorState(ErrorState.CLOSE_NOW, null);
        }
    } catch (InterruptedIOException e) {
        setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        setErrorState(ErrorState.CLOSE_NOW, t);
        getLog().error(sm.getString("http11processor.request.process"), t);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError()) {
        request.updateCounters();
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else {
        request.updateCounters();
        return dispatchEndRequest();
    }
}
The dispatch method first branches on the value of the SocketEvent argument. The flushBufferedWrite() call in the first branch writes the data still held in Http11OutputBuffer back to the client.
@Override
protected boolean flushBufferedWrite() throws IOException {
    if (outputBuffer.hasDataToWrite()) {
        if (outputBuffer.flushBuffer(false)) {
            // The buffer wasn't fully flushed so re-register the
            // socket for write. Note this does not go via the
            // Response since the write registration state at
            // that level should remain unchanged. Once the buffer
            // has been emptied then the code below will call
            // Adaptor.asyncDispatch() which will enable the
            // Response to respond to this event.
            outputBuffer.registerWriteInterest();
            return true;
        }
    }
    return false;
}
Http11OutputBuffer#flushBuffer
protected boolean flushBuffer(boolean block) throws IOException {
    return socketWrapper.flush(block);
}
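The contract these two methods rely on is that a non-blocking flush returns true while data is still left in the buffer, in which case write interest is registered again and the caller returns early. Below is a minimal sketch of that pattern. FlushSketch and its fields are hypothetical; the socket is modeled as a counter that accepts a fixed number of bytes per flush.

```java
// Hypothetical model of the "flush, and re-register for write if data remains" pattern.
// This is not Tomcat code: the socket is reduced to a byte counter.
final class FlushSketch {

    private int pendingBytes;           // bytes still waiting in the output buffer
    private final int bytesPerFlush;    // how much the pretend socket accepts per flush
    boolean writeInterestRegistered = false;

    FlushSketch(int pendingBytes, int bytesPerFlush) {
        this.pendingBytes = pendingBytes;
        this.bytesPerFlush = bytesPerFlush;
    }

    // Non-blocking flush: writes what it can and returns true if data remains.
    boolean flushBuffer() {
        pendingBytes -= Math.min(pendingBytes, bytesPerFlush);
        return pendingBytes > 0;
    }

    // Same shape as flushBufferedWrite() above: true means "not done yet, wait for
    // the next write event"; false means the buffer is empty and processing can go on.
    boolean flushBufferedWrite() {
        if (pendingBytes > 0) {
            if (flushBuffer()) {
                writeInterestRegistered = true;
                return true;
            }
        }
        return false;
    }
}
```

With new FlushSketch(10, 4), the first two calls to flushBufferedWrite() leave data behind and return true; the third call empties the buffer and returns false, which is the point where dispatch would continue to the adapter.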
asyncStateMachine is an object of type AsyncStateMachine.
synchronized void asyncOperation() {
    if (state==AsyncState.STARTED) {
        state = AsyncState.READ_WRITE_OP;
    } else {
        throw new IllegalStateException(
                sm.getString("asyncStateMachine.invalidAsyncState",
                        "asyncOperation()", state));
    }
}
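asyncStateMachine.asyncOperation() changes the state field of AsyncStateMachine from AsyncState.STARTED to AsyncState.READ_WRITE_OP; in any other state it throws an IllegalStateException. READ_WRITE_OP indicates that a non-blocking read or write is being performed for this request.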
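The guarded transition can be illustrated with a much smaller state machine. This is only a sketch: AsyncStateSketch is a hypothetical class with three states, whereas the real AsyncStateMachine has many more states and transitions, and the start() method here is an assumed shortcut for reaching STARTED.

```java
// Hypothetical, heavily reduced version of the STARTED -> READ_WRITE_OP transition.
final class AsyncStateSketch {

    enum State { DISPATCHED, STARTED, READ_WRITE_OP }

    private State state = State.DISPATCHED;

    // Assumed shortcut for this sketch: jump straight into STARTED.
    synchronized void start() {
        state = State.STARTED;
    }

    // Only legal from STARTED; any other state is reported as a programming error.
    synchronized void asyncOperation() {
        if (state == State.STARTED) {
            state = State.READ_WRITE_OP;
        } else {
            throw new IllegalStateException(
                    "asyncOperation() is not allowed in state " + state);
        }
    }

    synchronized State state() {
        return state;
    }
}
```

Calling asyncOperation() before start() throws IllegalStateException, which is the situation the comment in dispatch refers to when it says the error handling would otherwise "trigger an ISE".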
After the if-else block comes the key part of dispatch.
First, request.getRequestProcessor() is called to obtain a RequestInfo object.
private final RequestInfo reqProcessorMX=new RequestInfo(this);

public RequestInfo getRequestProcessor() {
    return reqProcessorMX;
}
This request is the org.apache.coyote.Request object initialized in the AbstractProcessor constructor.
Then, inside the try-catch block, getAdapter().asyncDispatch(request, response, status) is called.
getAdapter() is a method of AbstractProcessor that returns its adapter field.
protected final Adapter adapter;

/**
 * Get the associated adapter.
 *
 * @return the associated adapter
 */
public Adapter getAdapter() {
    return adapter;
}
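The Adapter field in AbstractProcessor is assigned when the Http11Processor object is created. The value passed in is a CoyoteAdapter object,
which is created in Connector#initInternal and assigned to the adapter field of AbstractProtocol.
Therefore getAdapter().asyncDispatch(...) invokes CoyoteAdapter#asyncDispatch.
CoyoteAdapter is an important link in request processing. It will be covered in a later article, so it is skipped here.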
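The way the adapter travels from the connector to the processor can be sketched with three tiny types. All names below (AdapterSketch, ProtocolSketch, ProcessorSketch, createProcessor) are hypothetical and only mirror the roles described above; the real classes carry far more state.

```java
// Hypothetical stand-ins for Adapter, AbstractProtocol and AbstractProcessor.
interface AdapterSketch {
    String service(String request);
}

// Plays the role of the protocol handler that holds the adapter field.
final class ProtocolSketch {

    private AdapterSketch adapter;

    // In the article, the connector's init step assigns the adapter here.
    void setAdapter(AdapterSketch adapter) {
        this.adapter = adapter;
    }

    // Every processor created by the protocol receives the same adapter.
    ProcessorSketch createProcessor() {
        return new ProcessorSketch(adapter);
    }
}

// Plays the role of the processor, which only stores and exposes the adapter.
final class ProcessorSketch {

    private final AdapterSketch adapter;

    ProcessorSketch(AdapterSketch adapter) {
        this.adapter = adapter;
    }

    AdapterSketch getAdapter() {
        return adapter;
    }
}
```

Because the processor only stores a reference, whatever object was handed to the protocol is the one that ends up receiving the getAdapter() calls made in dispatch and service.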
/**
 * Service a 'standard' HTTP request. This method is called for both new
 * requests and for requests that have partially read the HTTP request line
 * or HTTP headers. Once the headers have been fully read this method is not
 * called again until there is a new HTTP request to process. Note that the
 * request type may change during processing which may result in one or more
 * calls to {@link #dispatch(SocketEvent)}. Requests may be pipe-lined.
 *
 * @param socketWrapper The connection to process
 *
 * @return The state the caller should put the socket in when this method
 *         returns
 *
 * @throws IOException If an I/O error occurs during the processing of the
 *         request
 */
protected abstract SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException;
As the comment indicates, in contrast to dispatch, the service method handles standard HTTP requests. It is implemented in Http11Processor.
@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    inputBuffer.init(socketWrapper);
    outputBuffer.init(socketWrapper);

    // Flags
    keepAlive = true;
    openSocket = false;
    readComplete = true;
    boolean keptAlive = false;
    SendfileState sendfileState = SendfileState.DONE;

    while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
            sendfileState == SendfileState.DONE && !protocol.isPaused()) {

        // Parsing the request header
        try {
            if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                    protocol.getKeepAliveTimeout())) {
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (protocol.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                if (!inputBuffer.parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!protocol.getDisableUploadTimeout()) {
                    socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                }
            }
        } catch (IOException e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("http11processor.header.parse"), e);
            }
            setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            break;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            UserDataHelper.Mode logMode = userDataHelper.getNextMode();
            if (logMode != null) {
                String message = sm.getString("http11processor.header.parse");
                switch (logMode) {
                    case INFO_THEN_DEBUG:
                        message += sm.getString("http11processor.fallToDebug");
                        //$FALL-THROUGH$
                    case INFO:
                        log.info(message, t);
                        break;
                    case DEBUG:
                        log.debug(message, t);
                }
            }
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
        }

        // Has an upgrade been requested?
        Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
        boolean foundUpgrade = false;
        while (connectionValues.hasMoreElements() && !foundUpgrade) {
            foundUpgrade = connectionValues.nextElement().toLowerCase(
                    Locale.ENGLISH).contains("upgrade");
        }

        if (foundUpgrade) {
            // Check the protocol
            String requestedProtocol = request.getHeader("Upgrade");

            UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
            if (upgradeProtocol != null) {
                if (upgradeProtocol.accept(request)) {
                    response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                    response.setHeader("Connection", "Upgrade");
                    response.setHeader("Upgrade", requestedProtocol);
                    action(ActionCode.CLOSE, null);
                    getAdapter().log(request, response, 0);

                    InternalHttpUpgradeHandler upgradeHandler =
                            upgradeProtocol.getInternalUpgradeHandler(
                                    socketWrapper, getAdapter(), cloneRequest(request));
                    UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                    action(ActionCode.UPGRADE, upgradeToken);
                    return SocketState.UPGRADING;
                }
            }
        }

        if (getErrorState().isIoAllowed()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }
        }

        int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        // Process the request in the adapter
        if (getErrorState().isIoAllowed()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response);
                // Handle when the response was committed before a serious
                // error occurred. Throwing a ServletException should both
                // set the status to 500 and set the errorException.
                // If we fail here, then the response is likely already
                // committed, so we can't try and set headers.
                if(keepAlive && !getErrorState().isError() && !isAsync() &&
                        statusDropsConnection(response.getStatus())) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            } catch (HeadersTooLargeException e) {
                log.error(sm.getString("http11processor.request.process"), e);
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
        if (!isAsync()) {
            // If this is an async request then the request ends when it has
            // been completed. The AsyncContext is responsible for calling
            // endRequest() in that case.
            endRequest();
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        // If there was an error, make sure the request is counted as
        // and error, and update the statistics counter
        if (getErrorState().isError()) {
            response.setStatus(500);
        }

        if (!isAsync() || getErrorState().isError()) {
            request.updateCounters();
            if (getErrorState().isIoAllowed()) {
                inputBuffer.nextRequest();
                outputBuffer.nextRequest();
            }
        }

        if (!protocol.getDisableUploadTimeout()) {
            int connectionTimeout = protocol.getConnectionTimeout();
            if(connectionTimeout > 0) {
                socketWrapper.setReadTimeout(connectionTimeout);
            } else {
                socketWrapper.setReadTimeout(0);
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        sendfileState = processSendfile(socketWrapper);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || protocol.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else {
        if (sendfileState == SendfileState.PENDING) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}
The logic of service is fairly involved. It first performs some setup, such as inputBuffer.init(socketWrapper) and outputBuffer.init(socketWrapper), and then enters a while loop.
Inside the while loop, a try-catch block executes
inputBuffer.parseRequestLine(...)
inputBuffer.parseHeaders()
and sets a number of flags and properties.
Http11InputBuffer#parseRequestLine parses the request line, and Http11InputBuffer#parseHeaders parses the request headers.
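Next, it looks up the Connection header among the request headers and checks whether any of its values contains upgrade. If so, and a matching UpgradeProtocol accepts the request, it proceeds with the HTTP upgrade steps.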
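The header check itself is a case-insensitive substring test over every Connection value. A standalone sketch of just that test follows; UpgradeHeaderSketch and requestsUpgrade are hypothetical names, and the header values are passed in as plain strings instead of coming from MimeHeaders.

```java
import java.util.Locale;

// Hypothetical helper that mirrors the "has an upgrade been requested?" loop.
final class UpgradeHeaderSketch {

    // Returns true as soon as one Connection header value contains "upgrade",
    // compared in lower case with a fixed locale, as in the loop shown above.
    static boolean requestsUpgrade(String... connectionValues) {
        for (String value : connectionValues) {
            if (value.toLowerCase(Locale.ENGLISH).contains("upgrade")) {
                return true;
            }
        }
        return false;
    }
}
```

A request carrying "Connection: keep-alive, Upgrade" passes this test, while one carrying only "Connection: keep-alive" does not. Passing the test is only the first step: the Upgrade header must also name a protocol that the server has registered.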
It then calls prepareRequest() to do preliminary processing of the request: based on certain request headers, such as host, transfer-encoding and content-length, it adds the appropriate InputFilter instances to the Http11InputBuffer.
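As a rough illustration of the idea, the sketch below picks one body filter from the transfer-encoding and content-length headers. It is an assumption-laden simplification and not the actual prepareRequest() logic: BodyFilterSketch and choose are hypothetical, only a single "chunked" encoding is recognized, and a negative content length stands for "header absent".

```java
// Hypothetical, simplified selection of a request-body filter from two headers.
final class BodyFilterSketch {

    // Named after the filter kinds registered in the Http11Processor constructor.
    enum Filter { CHUNKED, IDENTITY, VOID }

    // transferEncoding may be null when the header is absent;
    // contentLength is negative when no Content-Length header was sent.
    static Filter choose(String transferEncoding, long contentLength) {
        if (transferEncoding != null
                && transferEncoding.trim().equalsIgnoreCase("chunked")) {
            // Body is delimited by chunk sizes.
            return Filter.CHUNKED;
        }
        if (contentLength >= 0) {
            // Body is delimited by a fixed length.
            return Filter.IDENTITY;
        }
        // No way to delimit a body, so treat the request as having none.
        return Filter.VOID;
    }
}
```

For instance, choose("chunked", -1) yields CHUNKED, choose(null, 42) yields IDENTITY, and choose(null, -1) yields VOID.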
Finally, the request is handed to the Adapter for processing, that is:
getAdapter().service(request, response);
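getAdapter() returns the CoyoteAdapter mentioned above. The CoyoteAdapter#service method will be analyzed separately in a later article, so it is not described further here.
What follows that call is cleanup work. The service method is long and its logic is fairly complex, so this article leaves out many of the less important details.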
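One part of the cleanup that is worth spelling out is how service decides which socket state to return. The sketch below restates the final if-else chain of the method as a pure function. ServiceResultSketch and its boolean parameters are hypothetical; in the real method these values come from the error state, the protocol, and the flags set inside the loop.

```java
// Hypothetical restatement of the return-value decision at the end of service().
final class ServiceResultSketch {

    enum State { CLOSED, LONG, UPGRADING, SENDFILE, OPEN }

    static State result(boolean error, boolean paused, boolean async, boolean upgrade,
            boolean sendfilePending, boolean openSocket, boolean readComplete) {
        if (error || paused) {
            return State.CLOSED;        // failed or endpoint paused: close the socket
        }
        if (async) {
            return State.LONG;          // async request still in progress
        }
        if (upgrade) {
            return State.UPGRADING;     // connection switches protocol
        }
        if (sendfilePending) {
            return State.SENDFILE;      // sendfile still writing the response
        }
        if (openSocket) {
            // Keep the connection: OPEN if the request was fully read,
            // LONG if only part of the request line or headers has arrived.
            return readComplete ? State.OPEN : State.LONG;
        }
        return State.CLOSED;
    }
}
```

The order of the checks matters: an error or a paused protocol wins over everything else, and the keep-alive outcome (OPEN or LONG) is only considered when none of the special modes applies.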
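Summary
This article described how the process method of Http11Processor (implemented in AbstractProcessorLight) handles a request: it delegates to dispatch or service depending on the kind of HTTP request. In both dispatch and service, the key step is the call into the corresponding Adapter method.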