转载

Tomcat源码解析系列(十七)CoyoteAdapter

前言

上篇文章中分析了 Http11Processor#process 方法是怎么处理请求的,其中关键的地方就是调用 CoyoteAdapter 的相关方法。

1. CoyoteAdapter#asyncDispatch

@Override
public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
        SocketEvent status) throws Exception {

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        throw new IllegalStateException(sm.getString("coyoteAdapter.nullRequest"));
    }

    boolean success = true;
    AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        if (!request.isAsync()) {
            // Error or timeout
            // Lift any suspension (e.g. if sendError() was used by an async
            // request) to allow the response to be written to the client
            response.setSuspended(false);
        }

        if (status==SocketEvent.TIMEOUT) {
            if (!asyncConImpl.timeout()) {
                asyncConImpl.setErrorState(null, false);
            }
        } else if (status==SocketEvent.ERROR) {
            // An I/O error occurred on a non-container thread which means
            // that the socket needs to be closed so set success to false to
            // trigger a close
            success = false;
            Throwable t = (Throwable)req.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            req.getAttributes().remove(RequestDispatcher.ERROR_EXCEPTION);
            ClassLoader oldCL = null;
            try {
                oldCL = request.getContext().bind(false, null);
                if (req.getReadListener() != null) {
                    req.getReadListener().onError(t);
                }
                if (res.getWriteListener() != null) {
                    res.getWriteListener().onError(t);
                }
            } finally {
                request.getContext().unbind(false, oldCL);
            }
            if (t != null) {
                asyncConImpl.setErrorState(t, true);
            }
        }

        // Check to see if non-blocking writes or reads are being used
        if (!request.isAsyncDispatching() && request.isAsync()) {
            WriteListener writeListener = res.getWriteListener();
            ReadListener readListener = req.getReadListener();
            if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    res.onWritePossible();
                    if (request.isFinished() && req.sendAllDataReadEvent() &&
                            readListener != null) {
                        readListener.onAllDataRead();
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    writeListener.onError(t);
                    success = false;
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            } else if (readListener != null && status == SocketEvent.OPEN_READ) {
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    // If data is being read on a non-container thread a
                    // dispatch with status OPEN_READ will be used to get
                    // execution back on a container thread for the
                    // onAllDataRead() event. Therefore, make sure
                    // onDataAvailable() is not called in this case.
                    if (!request.isFinished()) {
                        readListener.onDataAvailable();
                    }
                    if (request.isFinished() && req.sendAllDataReadEvent()) {
                        readListener.onAllDataRead();
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    readListener.onError(t);
                    success = false;
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
        }

        // Has an error occurred during async processing that needs to be
        // processed by the application's error page mechanism (or Tomcat's
        // if the application doesn't define one)?
        if (!request.isAsyncDispatching() && request.isAsync() &&
                response.isErrorReportRequired()) {
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }

        if (request.isAsyncDispatching()) {
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
            Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            if (t != null) {
                asyncConImpl.setErrorState(t, true);
            }
        }

        if (!request.isAsync()) {
            request.finishRequest();
            response.finishResponse();
        }

        // Check to see if the processor is in an error state. If it is,
        // bail out now.
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);
        if (error.get()) {
            if (request.isAsyncCompleting()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS,  null);
            }
            success = false;
        }
    } catch (IOException e) {
        success = false;
        // Ignore
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        success = false;
        log.error(sm.getString("coyoteAdapter.asyncDispatch"), t);
    } finally {
        if (!success) {
            res.setStatus(500);
        }

        // Access logging
        if (!success || !request.isAsync()) {
            long time = 0;
            if (req.getStartTime() != -1) {
                time = System.currentTimeMillis() - req.getStartTime();
            }
            Context context = request.getContext();
            if (context != null) {
                context.logAccess(request, response, time, false);
            } else {
                log(req, res, time);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);
        // Recycle the wrapper request and response
        if (!success || !request.isAsync()) {
            updateWrapperErrorCount(request, response);
            request.recycle();
            response.recycle();
        }
    }
    return success;
}

在 Http11Processor 的父类 AbstractProcessor 的 dispatch 方法里调用 CoyoteAdapter#asyncDispatch 来异步处理请求。

注意传入的参数是 org.apache.coyote.Request 和 org.apache.coyote.Response 类型的对象

public static final int ADAPTER_NOTES = 1;

Request request = (Request)  req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);

通过调用 org.apache.coyote.Request#getNote(ADAPTER_NOTES) 和 org.apache.coyote.Response#getNote(ADAPTER_NOTES) 来获取 org.apache.catalina.connector.Request 和 org.apache.catalina.connector.Response 对象,

public final Object getNote(int pos) {
    return notes[pos];
}
public final Object getNote(int pos) {
    return notes[pos];
}

这两个 getNote 方法都是取 notes 数组里指定 pos 的对象,也就是去 notes[1] 的这个对象,note[1] 的的值是在 CoyoteAdapter#service 里设值的。

asyncDispatch 在一个 try-catch 语句块里。try 里面是一些 if-else 的处理。

首先是 if (!request.isAsync()),这个 if 里的内容就一行,里面就是设值 org.apache.catalina.connector.Response 里的 OutputBuffer 对象的 suspend 属性。

然后是 if (status==SocketEvent.TIMEOUT) - else if (status==SocketEvent.ERROR)。这个 if- else if 里分别处理 status 为 SocketEvent.TIMEOUT 和 SocketEvent.ERROR 的情况,处理逻辑也很简单,就是调用一下监听器,设值一下标志属性或状态属性。调用 Context#bind 和 Context#unbind 方法。

接下来就是 if (!request.isAsyncDispatching() && request.isAsync())。这个 if 语句里是在 org.apache.coyote.Request 对象里的 ReadListener 类型的属性不为 null 并且数据读取已完成时,调用 ReadListener#onAllDataRead,其实就是调用一下监听器。

再接着就是 if (!request.isAsyncDispatching() && request.isAsync() && response.isErrorReportRequired()) 了 和 if (request.isAsyncDispatching()) 了,这两个 if 里的内容很相似,只是判断条件不一样,就是调用

connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

这一行代码。这一行是 asyncDispatch 方法里最关键的步骤了,connector 是在 CoyoteAdapter 初始化时传入的

public CoyoteAdapter(Connector connector) {
    super();
    this.connector = connector;

}

CoyoteAdapter 的创建是在 Connector#initInternal 中。

connector.getService() 返回的是 Connector 关联的 Service 属性,也就是 StandardService 类型的对象

connector.getService().getContainer() 返回的是 Service 里的容器 Engine 属性,也就是 StandardEngine 对象,

connector.getService().getContainer().getPipeline() 返回的是 StandardEngine 里的 Pipeline 属性,也就是 StandardPipeline 对象。

connector.getService().getContainer().getPipeline().getFirst()

@Override
public Valve getFirst() {
    if (first != null) {
        return first;
    }

    return basic;
}

返回的是 StandardPipeline 的 Valve类型的数行 first 或者 basic。其中 StandardEngine 里的 StandardPipeline 的 basic 属性是 StandardEngineValve。

也就是这一行最终会调用 StandardEngineValve 里的 invoke 方法。关于 Valve#invoke 方法,后面的文章会介绍,这里就不多赘述了。

再接着就是

if (!request.isAsync()) {
    request.finishRequest();
    response.finishResponse();
}

response.finishResponse(),是将数据返回给客户端。

2. CoyoteAdapter#service

上篇文章分析了 service 方法是用来处理标准 HTTP 请求的。

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS,  null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            Host host = request.getHost();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            // The other possibility is that an error occurred early in
            // processing and the request could not be mapped to a Context.
            // Log via the host or engine in that case.
            long time = System.currentTimeMillis() - req.getStartTime();
            if (context != null) {
                context.logAccess(request, response, time, false);
            } else if (response.isError()) {
                if (host != null) {
                    host.logAccess(request, response, time, false);
                } else {
                    connector.getService().getContainer().logAccess(
                            request, response, time, false);
                }
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            updateWrapperErrorCount(request, response);
            request.recycle();
            response.recycle();
        }
    }
}

首先,跟 asyncDispatch 一样

Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);

根据 org.apache.coyote.Request#getNote(ADAPTER_NOTES) 和 org.apache.coyote.Response#getNote(ADAPTER_NOTES) 来获取 org.apache.catalina.connector.Request 和 org.apache.catalina.connector.Response 对象,

如果获取不到 org.apache.catalina.connector.Request 对象,就创建 org.apache.catalina.connector.Request 和 org.apache.catalina.connector.Response 对象,并分别设置到 notes 数组的 notes[1] 元素里。

/**
 * Create (or allocate) and return a Request object suitable for
 * specifying the contents of a Request to the responsible Container.
 *
 * @return a new Servlet request object
 */
public Request createRequest() {
    return new Request(this);
}


/**
 * Create (or allocate) and return a Response object suitable for
 * receiving the contents of a Response from the responsible Container.
 *
 * @return a new Servlet response object
 */
public Response createResponse() {
    if (protocolHandler instanceof AbstractAjpProtocol<?>) {
        int packetSize = ((AbstractAjpProtocol<?>) protocolHandler).getPacketSize();
        return new Response(packetSize - org.apache.coyote.ajp.Constants.SEND_HEAD_LEN);
    } else {
        return new Response();
    }
}

Connector 的 createRequest 和 createResponse 也很简单。

然后就进入 try-catch 语句块。

首先调用 postParseRequest 方法,这个方法是在 http 的 header 解析完之后,对 Request 和 Response 做一些设置的工作,里面包扩了uri参数解析,Host映射等重要步骤。

调用完 postParseRequest 后就进入关键代码 if (postParseRequest) 里,在这个 if 里,先调用一下 request.setAsyncSupported,然后调用

connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

这一句跟 asyncDispatch 里的一样,也是 service 方法里的关键步骤,这里就不多讲了。

if (postParseRequest) 执行完之后,就执行 if (request.isAsync()),这个 if 里就是当条件满足的时候调用一下 ReadListener 的 onAllDataRead。

最后 finaly 的代码也比较简单,就不细讲了。

小结

本文分析了 CoyoteAdapter#asyncDispatch 和 CoyoteAdapter#service 方法,在这两个方法里最重要的是调用了

connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

这一句把请求交给了 Valve,第一个处理请求的是 Engine 里的 Pipeline 的 Valve,也就是 StandardEngineValve。可以看出 CoyoteAdapter 如它的名字里的 Adapter 一样,是 Processor 和 Valve 之间的适配器。

原文  https://segmentfault.com/a/1190000022261740
正文到此结束
Loading...