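Preface
The previous article analyzed how Http11Processor#process handles a request. The key step there is the call into CoyoteAdapter, so this article looks at the two CoyoteAdapter methods involved: asyncDispatch and service.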
1. CoyoteAdapter#asyncDispatch
@Override
public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
        SocketEvent status) throws Exception {

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        throw new IllegalStateException(sm.getString("coyoteAdapter.nullRequest"));
    }

    boolean success = true;
    AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        if (!request.isAsync()) {
            // Error or timeout
            // Lift any suspension (e.g. if sendError() was used by an async
            // request) to allow the response to be written to the client
            response.setSuspended(false);
        }

        if (status==SocketEvent.TIMEOUT) {
            if (!asyncConImpl.timeout()) {
                asyncConImpl.setErrorState(null, false);
            }
        } else if (status==SocketEvent.ERROR) {
            // An I/O error occurred on a non-container thread which means
            // that the socket needs to be closed so set success to false to
            // trigger a close
            success = false;
            Throwable t = (Throwable)req.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            req.getAttributes().remove(RequestDispatcher.ERROR_EXCEPTION);
            ClassLoader oldCL = null;
            try {
                oldCL = request.getContext().bind(false, null);
                if (req.getReadListener() != null) {
                    req.getReadListener().onError(t);
                }
                if (res.getWriteListener() != null) {
                    res.getWriteListener().onError(t);
                }
            } finally {
                request.getContext().unbind(false, oldCL);
            }
            if (t != null) {
                asyncConImpl.setErrorState(t, true);
            }
        }

        // Check to see if non-blocking writes or reads are being used
        if (!request.isAsyncDispatching() && request.isAsync()) {
            WriteListener writeListener = res.getWriteListener();
            ReadListener readListener = req.getReadListener();
            if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    res.onWritePossible();
                    if (request.isFinished() && req.sendAllDataReadEvent() &&
                            readListener != null) {
                        readListener.onAllDataRead();
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    writeListener.onError(t);
                    success = false;
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            } else if (readListener != null && status == SocketEvent.OPEN_READ) {
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    // If data is being read on a non-container thread a
                    // dispatch with status OPEN_READ will be used to get
                    // execution back on a container thread for the
                    // onAllDataRead() event. Therefore, make sure
                    // onDataAvailable() is not called in this case.
                    if (!request.isFinished()) {
                        readListener.onDataAvailable();
                    }
                    if (request.isFinished() && req.sendAllDataReadEvent()) {
                        readListener.onAllDataRead();
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    readListener.onError(t);
                    success = false;
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
        }

        // Has an error occurred during async processing that needs to be
        // processed by the application's error page mechanism (or Tomcat's
        // if the application doesn't define one)?
        if (!request.isAsyncDispatching() && request.isAsync() &&
                response.isErrorReportRequired()) {
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }

        if (request.isAsyncDispatching()) {
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
            Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            if (t != null) {
                asyncConImpl.setErrorState(t, true);
            }
        }

        if (!request.isAsync()) {
            request.finishRequest();
            response.finishResponse();
        }

        // Check to see if the processor is in an error state. If it is,
        // bail out now.
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);
        if (error.get()) {
            if (request.isAsyncCompleting()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS, null);
            }
            success = false;
        }
    } catch (IOException e) {
        success = false;
        // Ignore
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        success = false;
        log.error(sm.getString("coyoteAdapter.asyncDispatch"), t);
    } finally {
        if (!success) {
            res.setStatus(500);
        }

        // Access logging
        if (!success || !request.isAsync()) {
            long time = 0;
            if (req.getStartTime() != -1) {
                time = System.currentTimeMillis() - req.getStartTime();
            }
            Context context = request.getContext();
            if (context != null) {
                context.logAccess(request, response, time, false);
            } else {
                log(req, res, time);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);
        // Recycle the wrapper request and response
        if (!success || !request.isAsync()) {
            updateWrapperErrorCount(request, response);
            request.recycle();
            response.recycle();
        }
    }
    return success;
}
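CoyoteAdapter#asyncDispatch is called from the dispatch method of AbstractProcessor, the parent class of Http11Processor, to process a request asynchronously.
Note that the arguments passed in are objects of type org.apache.coyote.Request and org.apache.coyote.Response.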
public static final int ADAPTER_NOTES = 1;

Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
The org.apache.catalina.connector.Request and org.apache.catalina.connector.Response objects are obtained by calling org.apache.coyote.Request#getNote(ADAPTER_NOTES) and org.apache.coyote.Response#getNote(ADAPTER_NOTES):
// org.apache.coyote.Request
public final Object getNote(int pos) {
    return notes[pos];
}

// org.apache.coyote.Response
public final Object getNote(int pos) {
    return notes[pos];
}
Both getNote methods simply return the object stored at position pos of the notes array, which here means notes[1]. The value of notes[1] is set in CoyoteAdapter#service.
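The notes mechanism can be illustrated with a small standalone sketch. The class names LowLevelRequest and WrapperRequest and the helper wrapperFor are hypothetical stand-ins for org.apache.coyote.Request, org.apache.catalina.connector.Request and the lookup logic; the array size is arbitrary. It only shows the idea of caching a wrapper object in a fixed slot of an array, not Tomcat's actual classes.

```java
public class NotesSketch {
    // Same slot index the article shows for ADAPTER_NOTES.
    static final int ADAPTER_NOTES = 1;

    /** Hypothetical stand-in for org.apache.coyote.Request: only the notes array. */
    static final class LowLevelRequest {
        private final Object[] notes = new Object[8]; // size chosen arbitrarily for the sketch

        void setNote(int pos, Object value) {
            notes[pos] = value;
        }

        Object getNote(int pos) {
            return notes[pos];
        }
    }

    /** Hypothetical stand-in for org.apache.catalina.connector.Request. */
    static final class WrapperRequest {
        final LowLevelRequest coyoteRequest;

        WrapperRequest(LowLevelRequest coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    /** Looks up the wrapper in the notes slot, creating and caching it on first use. */
    static WrapperRequest wrapperFor(LowLevelRequest req) {
        WrapperRequest wrapper = (WrapperRequest) req.getNote(ADAPTER_NOTES);
        if (wrapper == null) {
            wrapper = new WrapperRequest(req);
            req.setNote(ADAPTER_NOTES, wrapper);
        }
        return wrapper;
    }

    public static void main(String[] args) {
        LowLevelRequest req = new LowLevelRequest();
        WrapperRequest first = wrapperFor(req);
        WrapperRequest second = wrapperFor(req);
        // The second lookup reuses the object cached in notes[1].
        System.out.println(first == second);
    }
}
```

The point of the slot is that the low-level request object is reused across requests on the same connection, so the wrapper created on the first request can be found again without any map lookup.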
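The body of asyncDispatch sits in a try-catch-finally block. The try part is a sequence of if-else checks.
The first is if (!request.isAsync()). It contains a single line, which sets the suspended flag of the OutputBuffer object held by org.apache.catalina.connector.Response.
Next comes if (status==SocketEvent.TIMEOUT) followed by else if (status==SocketEvent.ERROR). These two branches handle the SocketEvent.TIMEOUT and SocketEvent.ERROR cases. The logic is simple: on a timeout, asyncConImpl.timeout() is called and an error state is set if it returns false; on an error, the read and write listeners are notified through onError and the error state is set. The listener calls are wrapped in Context#bind and Context#unbind.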
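The bind/unbind pair around the listener calls follows a common try/finally pattern. As far as I understand it, Context#bind switches the thread context class loader to the web application's class loader and returns the previous one, and Context#unbind restores it. The sketch below reproduces that pattern with plain JDK calls; bind, unbind and runBound are hypothetical helpers written for this illustration, not Tomcat's API.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class BindSketch {
    /** Hypothetical helper: install the given loader and return the one it replaced. */
    static ClassLoader bind(ClassLoader webappLoader) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(webappLoader);
        return original;
    }

    /** Hypothetical helper: put the original loader back. */
    static void unbind(ClassLoader original) {
        Thread.currentThread().setContextClassLoader(original);
    }

    /** Runs the callback with the web application loader installed, restoring it afterwards. */
    static void runBound(ClassLoader webappLoader, Runnable callback) {
        ClassLoader oldCL = bind(webappLoader);
        try {
            callback.run();
        } finally {
            // Restore even if the callback throws, as the finally blocks in asyncDispatch do.
            unbind(oldCL);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader webapp = new URLClassLoader(new URL[0]);
        runBound(webapp, () -> System.out.println(
                Thread.currentThread().getContextClassLoader() == webapp));
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```

Listener code belongs to the web application, so it has to run with the application's class loader visible; the finally block makes sure the container thread is handed back in its original state.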
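After that comes if (!request.isAsyncDispatching() && request.isAsync()). This block handles non-blocking I/O. If a WriteListener is registered and the event is OPEN_WRITE, it calls res.onWritePossible(); if a ReadListener is registered and the event is OPEN_READ, it calls ReadListener#onDataAvailable as long as the request body has not been fully read. In both cases, once reading has finished and a ReadListener is present, ReadListener#onAllDataRead is called. In short, this block just notifies the listeners.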
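The branching in this block can be summarized as a small decision function. This is a simplified sketch under stated assumptions: callbacksFor and the Event enum are hypothetical, the finished flag is treated as a fixed input (the real code checks request.isFinished() again after onDataAvailable()), and req.sendAllDataReadEvent() is assumed to return true.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerDispatchSketch {
    /** Hypothetical subset of the socket events relevant to this block. */
    enum Event { OPEN_READ, OPEN_WRITE }

    /** Returns the names of the listener callbacks that would fire, in order. */
    static List<String> callbacksFor(Event status, boolean hasWriteListener,
            boolean hasReadListener, boolean finished) {
        List<String> fired = new ArrayList<>();
        if (hasWriteListener && status == Event.OPEN_WRITE) {
            fired.add("onWritePossible");
            if (finished && hasReadListener) {
                fired.add("onAllDataRead");
            }
        } else if (hasReadListener && status == Event.OPEN_READ) {
            if (!finished) {
                fired.add("onDataAvailable");
            }
            if (finished) {
                fired.add("onAllDataRead");
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Body not fully read yet: only onDataAvailable fires.
        System.out.println(callbacksFor(Event.OPEN_READ, false, true, false));
        // Body already read: onDataAvailable is skipped, onAllDataRead fires.
        System.out.println(callbacksFor(Event.OPEN_READ, false, true, true));
        // Write event with both listeners and a finished body.
        System.out.println(callbacksFor(Event.OPEN_WRITE, true, true, true));
    }
}
```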
Then come if (!request.isAsyncDispatching() && request.isAsync() && response.isErrorReportRequired()) and if (request.isAsyncDispatching()). The bodies of these two if statements are very similar and only the conditions differ. Both call
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
This line is the most important step in asyncDispatch. The connector is passed in when the CoyoteAdapter is constructed:
public CoyoteAdapter(Connector connector) {
    super();
    this.connector = connector;
}
The CoyoteAdapter is created in Connector#initInternal.
connector.getService() returns the Service associated with the Connector, which is a StandardService object.
connector.getService().getContainer() returns the Engine container held by the Service, which is a StandardEngine object.
connector.getService().getContainer().getPipeline() returns the Pipeline held by the StandardEngine, which is a StandardPipeline object.
connector.getService().getContainer().getPipeline().getFirst()
@Override
public Valve getFirst() {
    if (first != null) {
        return first;
    }

    return basic;
}
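returns the Valve stored in the first field of StandardPipeline, or the one in the basic field if first is null. For the StandardPipeline that belongs to StandardEngine, the basic valve is StandardEngineValve.
So this line eventually calls the invoke method of StandardEngineValve. Valve#invoke will be covered in a later article, so it is not discussed further here.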
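To make the first/basic relationship concrete, here is a minimal standalone sketch of a pipeline whose getFirst() falls back to the basic valve. NamedValve and SimplePipeline are hypothetical classes written for this illustration, and the valve name "AccessLogValve" is only an example; the sketch does not reproduce StandardPipeline, just the idea that extra valves are chained in front of the basic valve, which always runs last.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {
    /** Hypothetical valve that records its name and then calls the next valve. */
    static final class NamedValve {
        final String name;
        NamedValve next;

        NamedValve(String name) {
            this.name = name;
        }

        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    /** Hypothetical pipeline: optional extra valves followed by one basic valve. */
    static final class SimplePipeline {
        private NamedValve first;
        private final NamedValve basic;

        SimplePipeline(NamedValve basic) {
            this.basic = basic;
        }

        /** Appends a valve at the end of the chain, just before the basic valve. */
        void addValve(NamedValve valve) {
            if (first == null) {
                first = valve;
            } else {
                NamedValve current = first;
                while (current.next != basic) {
                    current = current.next;
                }
                current.next = valve;
            }
            valve.next = basic;
        }

        /** Same shape as the getFirst() shown above: first if set, otherwise basic. */
        NamedValve getFirst() {
            return first != null ? first : basic;
        }
    }

    public static void main(String[] args) {
        SimplePipeline pipeline = new SimplePipeline(new NamedValve("StandardEngineValve"));

        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        System.out.println(trace); // only the basic valve runs

        pipeline.addValve(new NamedValve("AccessLogValve"));
        trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        System.out.println(trace); // the added valve runs first, then the basic valve
    }
}
```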
After that comes

if (!request.isAsync()) {
    request.finishRequest();
    response.finishResponse();
}

response.finishResponse() writes the remaining response data back to the client.
2. CoyoteAdapter#service
As the previous article showed, the service method handles standard HTTP requests.
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS, null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            Host host = request.getHost();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            // The other possibility is that an error occurred early in
            // processing and the request could not be mapped to a Context.
            // Log via the host or engine in that case.
            long time = System.currentTimeMillis() - req.getStartTime();
            if (context != null) {
                context.logAccess(request, response, time, false);
            } else if (response.isError()) {
                if (host != null) {
                    host.logAccess(request, response, time, false);
                } else {
                    connector.getService().getContainer().logAccess(
                            request, response, time, false);
                }
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            updateWrapperErrorCount(request, response);
            request.recycle();
            response.recycle();
        }
    }
}
First, just as in asyncDispatch,

Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);

the org.apache.catalina.connector.Request and org.apache.catalina.connector.Response objects are looked up through org.apache.coyote.Request#getNote(ADAPTER_NOTES) and org.apache.coyote.Response#getNote(ADAPTER_NOTES).
If no org.apache.catalina.connector.Request object is found, a new org.apache.catalina.connector.Request and a new org.apache.catalina.connector.Response are created, linked to each other, and stored in notes[1] of the corresponding notes arrays.
/**
 * Create (or allocate) and return a Request object suitable for
 * specifying the contents of a Request to the responsible Container.
 *
 * @return a new Servlet request object
 */
public Request createRequest() {
    return new Request(this);
}

/**
 * Create (or allocate) and return a Response object suitable for
 * receiving the contents of a Response from the responsible Container.
 *
 * @return a new Servlet response object
 */
public Response createResponse() {
    if (protocolHandler instanceof AbstractAjpProtocol<?>) {
        int packetSize = ((AbstractAjpProtocol<?>) protocolHandler).getPacketSize();
        return new Response(packetSize - org.apache.coyote.ajp.Constants.SEND_HEAD_LEN);
    } else {
        return new Response();
    }
}
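Connector's createRequest and createResponse are also simple: each just constructs a new object, with createResponse sizing the Response from the packet size when the protocol is AJP.
Execution then enters the try-catch-finally block.
It first calls postParseRequest. This method runs after the HTTP headers have been parsed and finishes setting up the Request and Response. It includes important steps such as URI parameter parsing and Host mapping.
After postParseRequest returns, the key code is inside if (postParseSuccess). Inside this if, request.setAsyncSupported is called first, followed by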
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
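This is the same line as in asyncDispatch and is also the key step of the service method, so it is not explained again here.
After if (postParseSuccess) comes if (request.isAsync()). For an async request, this block calls ReadListener#onAllDataRead when a ReadListener is registered and the body has already been fully read, and triggers the async error process if an error occurred. Otherwise the else branch finishes the request and the response.
The code in the finally block is fairly simple (error check, access logging, recycling the request and response), so it is not covered in detail.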
Summary
This article analyzed CoyoteAdapter#asyncDispatch and CoyoteAdapter#service. The most important step in both methods is the call to
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
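This line hands the request over to the Valves. The Pipeline's getFirst() returns the first Valve of the Engine's Pipeline, and when no other Valve has been added that is the basic Valve, StandardEngineValve. As the Adapter in its name suggests, CoyoteAdapter is the adapter between the Processor and the Valves.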