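Preface
The previous article analyzed Poller and PollerEvent. Poller#processSocket obtains a SocketProcessorBase to handle the read and write events of a SocketChannel, and SocketProcessor#doRun (SocketProcessor is a subclass of SocketProcessorBase) handles them by calling getHandler().process(socketWrapper, event). The object returned by getHandler() is a ConnectionHandler.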
1. ConnectionHandler#process
ConnectionHandler is a static nested class of AbstractProtocol, declared as
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S>
Its process method is shown below.
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                wrapper.getSocket(), status));
    }
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();

    Processor processor = connections.get(socket);
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                processor, socket));
    }

    // Async timeouts are calculated on a dedicated thread and then
    // dispatched. Because of delays in the dispatch process, the
    // timeout may no longer be required. Check here and avoid
    // unnecessary processing.
    if (SocketEvent.TIMEOUT == status &&
            (processor == null || !processor.isAsync() ||
                    !processor.checkAsyncTimeoutGeneration())) {
        // This is effectively a NO-OP
        return SocketState.OPEN;
    }

    if (processor != null) {
        // Make sure an async timeout doesn't fire
        getProtocol().removeWaitingProcessor(processor);
    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    ContainerThreadMarker.set();

    try {
        if (processor == null) {
            String negotiatedProtocol = wrapper.getNegotiatedProtocol();
            if (negotiatedProtocol != null) {
                UpgradeProtocol upgradeProtocol =
                        getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                if (upgradeProtocol != null) {
                    processor = upgradeProtocol.getProcessor(
                            wrapper, getProtocol().getAdapter());
                } else if (negotiatedProtocol.equals("http/1.1")) {
                    // Explicitly negotiated the default protocol.
                    // Obtain a processor below.
                } else {
                    // TODO:
                    // OpenSSL 1.0.2's ALPN callback doesn't support
                    // failing the handshake with an error if no
                    // protocol can be negotiated. Therefore, we need to
                    // fail the connection here. Once this is fixed,
                    // replace the code below with the commented out
                    // block.
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                negotiatedProtocol));
                    }
                    return SocketState.CLOSED;
                    /*
                     * To replace the code above once OpenSSL 1.1.0 is
                     * used.
                    // Failed to create processor. This is a bug.
                    throw new IllegalStateException(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    */
                }
            }
        }
        if (processor == null) {
            processor = recycledProcessors.pop();
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                        processor));
            }
        }
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            state = processor.process(wrapper, status);

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                // Retrieve leftover input
                ByteBuffer leftOverInput = processor.getLeftoverInput();
                if (upgradeToken == null) {
                    // Assume direct HTTP/2 connection
                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                    if (upgradeProtocol != null) {
                        processor = upgradeProtocol.getProcessor(
                                wrapper, getProtocol().getAdapter());
                        wrapper.unRead(leftOverInput);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                    } else {
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                    "abstractConnectionHandler.negotiatedProcessor.fail",
                                    "h2c"));
                        }
                        return SocketState.CLOSED;
                    }
                } else {
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    // Release the Http11 processor to be re-used
                    release(processor);
                    // Create the upgrade processor
                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                processor, wrapper));
                    }
                    wrapper.unRead(leftOverInput);
                    // Mark the connection as upgraded
                    wrapper.setUpgraded(true);
                    // Associate with the processor with the connection
                    connections.put(socket, processor);
                    // Initialise the upgrade handler (which may trigger
                    // some IO using the new protocol which is why the lines
                    // above are necessary)
                    // This cast should be safe. If it fails the error
                    // handling for the surrounding try/catch will deal with
                    // it.
                    if (upgradeToken.getInstanceManager() == null) {
                        httpUpgradeHandler.init((WebConnection) processor);
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } finally {
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                }
            }
        } while ( state == SocketState.UPGRADING);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            longPoll(wrapper, processor);
            if (processor.isAsync()) {
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(processor);
            wrapper.registerReadInterest();
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket either be added to the
            // poller (or equivalent) to await more data or processed
            // if there are any pipe-lined requests remaining.
        } else if (state == SocketState.UPGRADED) {
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketEvent.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else if (state == SocketState.SUSPENDED) {
            // Don't add sockets back to the poller.
            // The resumeProcessing() method will add this socket
            // to the poller.
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                InstanceManager instanceManager = upgradeToken.getInstanceManager();
                if (instanceManager == null) {
                    httpUpgradeHandler.destroy();
                } else {
                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                    try {
                        httpUpgradeHandler.destroy();
                    } finally {
                        try {
                            instanceManager.destroyInstance(httpUpgradeHandler);
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                        }
                        upgradeToken.getContextBind().unbind(false, oldCL);
                    }
                }
            } else {
                release(processor);
            }
        }
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    } catch (ProtocolException e) {
        // Protocol exceptions normally mean the client sent invalid or
        // incomplete data.
        getLog().debug(sm.getString(
                "abstractConnectionHandler.protocolexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}
ConnectionHandler#process is fairly long, so let's go through it piece by piece.
private final Map<S,Processor> connections = new ConcurrentHashMap<>();

S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
First, the method looks up the Processor bound to this socket in the connections map. If the Processor is not null, it then calls
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);
For a brand-new connection, connections.get(socket) returns null.
getProtocol() returns the ProtocolHandler that was passed in when the ConnectionHandler was constructed, which here is the Http11NioProtocol object.
public void removeWaitingProcessor(Processor processor) {
    waitingProcessors.remove(processor);
}
removeWaitingProcessor is defined in AbstractProtocol, an ancestor class of Http11NioProtocol. It simply removes the given processor from waitingProcessors.
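The lookup-and-deregister step described above can be sketched in isolation. This is only an illustration, not Tomcat's code: the class ConnectionRegistry and its methods bind, park, isWaiting and lookup are hypothetical names, and the waiting collection is assumed here to be a concurrent set.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the handler's bookkeeping; not Tomcat's actual class.
class ConnectionRegistry<S, P> {
    // Socket -> processor currently bound to it.
    private final Map<S, P> connections = new ConcurrentHashMap<>();
    // Processors parked while an async timeout may still fire (assumed to be a concurrent set).
    private final Set<P> waitingProcessors = ConcurrentHashMap.newKeySet();

    void bind(S socket, P processor) {
        connections.put(socket, processor);
    }

    void park(P processor) {
        waitingProcessors.add(processor);
    }

    boolean isWaiting(P processor) {
        return waitingProcessors.contains(processor);
    }

    // Returns the bound processor, or null for a socket seen for the first time.
    // A non-null processor is taken off the waiting set so its timeout cannot fire.
    P lookup(S socket) {
        P processor = connections.get(socket);
        if (processor != null) {
            waitingProcessors.remove(processor);
        }
        return processor;
    }
}
```

A first lookup for an unknown socket yields null, which is the case the following null checks deal with.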
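Next come the three if (processor == null) checks inside the try block. Together they make sure that processor ends up non-null.
The first if (processor == null) handles a negotiated protocol. If wrapper.getNegotiatedProtocol() returns a non-null value (the ALPN comments in the code show that this comes from the TLS handshake, so it only applies to HTTPS connections), the method looks up the matching UpgradeProtocol and, if one is found, obtains a Processor from it. HTTPS is not discussed in detail here.
The second if (processor == null) takes a Processor from the recycledProcessors pool. recycledProcessors is a field of ConnectionHandler, declared as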
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
protected static class RecycledProcessors extends SynchronizedStack<Processor>
The third if (processor == null) creates a new Processor. getProtocol() returns the Http11NioProtocol object, and createProcessor is implemented in AbstractHttp11Protocol, the parent of Http11NioProtocol's parent class.
@Override
protected Processor createProcessor() {
    Http11Processor processor = new Http11Processor(this, adapter);
    return processor;
}
createProcessor() simply creates an Http11Processor. The this argument is the Http11NioProtocol object and adapter is the CoyoteAdapter object. The adapter field is assigned in Connector's initInternal method, which creates the CoyoteAdapter and then calls protocolHandler.setAdapter(adapter).
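The "reuse a recycled instance, otherwise create one" pattern behind the second and third checks can be sketched as follows. This is a simplified illustration under stated assumptions, not Tomcat's RecycledProcessors or SynchronizedStack: ProcessorPool, acquire, release and createdCount are hypothetical names, and a plain synchronized ArrayDeque stands in for the stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical pool illustrating "reuse if possible, otherwise create".
class ProcessorPool<P> {
    private final Deque<P> recycled = new ArrayDeque<>();
    private final Supplier<P> factory;
    private int created = 0;

    ProcessorPool(Supplier<P> factory) {
        this.factory = factory;
    }

    // Prefer a recycled instance; fall back to the factory when the pool is empty.
    synchronized P acquire() {
        P processor = recycled.poll(); // null when the pool is empty
        if (processor == null) {
            processor = factory.get();
            created++;
        }
        return processor;
    }

    // Return an instance to the pool so a later acquire() can reuse it.
    synchronized void release(P processor) {
        recycled.push(processor);
    }

    synchronized int createdCount() {
        return created;
    }
}
```

With this shape, a processor released after one request is handed out again on the next acquire() instead of being constructed anew.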
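Http11Processor is an important part of request processing. It will be covered later, so it is not discussed further here.
Once the Http11Processor has been obtained, the method sets its sslSupport property, puts it into connections, and then uses it to handle the event.
That handling happens in the do-while loop.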
SocketState state = SocketState.CLOSED;
do {
    state = processor.process(wrapper, status);
    ...
} while ( state == SocketState.UPGRADING);
The loop first calls Processor#process, passing along both parameters of ConnectionHandler#process, and gets back a SocketState value.
/**
 * Different types of socket states to react upon.
 */
public enum SocketState {
    // TODO Add a new state to the AsyncStateMachine and remove
    //      ASYNC_END (if possible)
    OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED
}
SocketState is an enum nested in Handler<S>, and Handler is an interface nested in AbstractEndpoint.
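The control flow of the do-while loop, in which a processor that reports UPGRADING is replaced and the new one runs immediately, can be modeled with a small sketch. All names here (UpgradeLoopSketch, State, Proc, run) are hypothetical, and the real loop does considerably more work when swapping processors.

```java
import java.util.List;

// Simplified, hypothetical model of the upgrade loop; not Tomcat's types.
class UpgradeLoopSketch {
    enum State { OPEN, CLOSED, UPGRADING, UPGRADED }

    interface Proc {
        State process();
    }

    // Runs `first`; while a processor reports UPGRADING, swaps in `upgraded` and runs again.
    // Each observed state is appended to `trace` for demonstration purposes.
    static State run(Proc first, Proc upgraded, List<String> trace) {
        Proc current = first;
        State state = State.CLOSED;
        do {
            state = current.process();
            trace.add(state.name());
            if (state == State.UPGRADING) {
                current = upgraded; // the new protocol's processor takes over
            }
        } while (state == State.UPGRADING);
        return state;
    }
}
```

The loop exits as soon as a processor returns anything other than UPGRADING, and that final state is what the branches after the loop act on.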
After the loop, the method branches on the returned SocketState value.
First, if (state == SocketState.LONG), it calls longPoll(wrapper, processor).
protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
    if (!processor.isAsync()) {
        // This is currently only used with HTTP
        // Either:
        //  - this is an upgraded connection
        //  - the request line/headers have not been completely
        //    read
        socket.registerReadInterest();
    }
}
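longPoll simply calls socket.registerReadInterest(), and only when the processor is not async. socket.registerReadInterest() was covered in the previous article, so it is not repeated here.
If processor.isAsync() is true, getProtocol().addWaitingProcessor(processor) then adds the processor to the waitingProcessors collection mentioned above.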
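The if (state == SocketState.OPEN) branch first removes the <NioChannel, Processor> pair from connections and calls release(processor), which releases resources or returns the Processor to RecycledProcessors.
It then calls wrapper.registerReadInterest(), the same call that longPoll makes inside its if statement.
The if (state == SocketState.SENDFILE) and if (state == SocketState.SUSPENDED) branches are alike: neither does anything here.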
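The if (state == SocketState.UPGRADED) branch checks whether the status argument passed to ConnectionHandler#process is SocketEvent.OPEN_WRITE. If it is not, longPoll is called. For a non-blocking write, the socket is deliberately not added back to the poller.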
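The final else handles the remaining case, state == SocketState.CLOSED, and performs cleanup. The socket is removed from connections. An upgrade processor has its HttpUpgradeHandler destroyed and is not recycled, while any other processor is passed to release(processor).
Finally, the method returns state. If one of the catch blocks was taken instead, the method removes the socket from connections, releases the processor, and returns SocketState.CLOSED.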
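The branches walked through above can be condensed into a small lookup. This is a hypothetical summary for illustration only: StateDispatchSketch and actionFor are invented names, the returned strings paraphrase what each branch does, and the enum is a local copy of the SocketState constants.

```java
// Hypothetical summary of the branch taken for each returned state; not Tomcat's code.
class StateDispatchSketch {
    enum State { OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED }

    // openWriteEvent mirrors the check "status == SocketEvent.OPEN_WRITE".
    static String actionFor(State state, boolean openWriteEvent) {
        switch (state) {
            case LONG:
                return "keep processor, long poll";
            case OPEN:
                return "recycle processor, register read interest";
            case SENDFILE:
            case SUSPENDED:
                return "nothing to do here";
            case UPGRADED:
                // Only non-write events put the socket back via longPoll.
                return openWriteEvent ? "nothing to do here" : "long poll";
            default:
                // The final else branch: treated as a closed connection.
                return "remove connection, clean up processor";
        }
    }
}
```

For example, actionFor(State.UPGRADED, true) falls into the "nothing to do here" case, matching the guard around longPoll in the UPGRADED branch.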
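Summary
This article walked through ConnectionHandler#process. Its main job is to find a Processor to handle the read or write event, and then to decide from the returned SocketState what to do with the socket and the Processor.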