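This write-up is partly the result of my own earlier carelessness. While writing Java编程方法论-Reactor与Webflux (Java Programming Methodology: Reactor and WebFlux), I assumed there were already decent blog posts on the Tomcat connector, skimmed them for reference, and did not analyze the source code in depth. As a result, while recording the accompanying video I found that what my article described did not match the source. I then searched for Chinese-language blog posts on the topic, but none were satisfactory, so I decided to work through the Tomcat connector again using the latest source code. This also serves as a reminder to myself: be careful, careful, and careful again!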
Source code reference: https://github.com/apache/tomcat
cd /java/tomcat/bin
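After running Tomcat's ./shutdown.sh, the Tomcat service is no longer reachable, yet ps -ef | grep tomcat shows that the corresponding java process was not destroyed when the web container shut down, leaving a lingering ("zombie") java process behind. From what I found online, a likely cause is a surviving non-daemon thread (a user thread): the JVM exits only once every remaining thread is a daemon thread, and it stays alive as long as one or more non-daemon threads exist. Use the following command to check whether the Tomcat process has ended: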
ps -ef|grep tomcat
If user threads are keeping the process alive, simply kill it with kill -9 pid
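To see which threads would keep a JVM alive, one can list the live non-daemon threads from inside the process. The following is only a minimal sketch using the standard Thread API; the class name NonDaemonThreads and the thread name are made up for illustration and have nothing to do with Tomcat's own code.

```java
import java.util.ArrayList;
import java.util.List;

final class NonDaemonThreads {

    /** Returns the names of all live threads that are NOT daemon threads. */
    static List<String> liveNonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // A user (non-daemon) thread: the JVM cannot exit until it finishes.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // nothing to clean up in this demo
            }
        }, "demo-user-thread");
        worker.setDaemon(false);
        worker.start();

        System.out.println(liveNonDaemonThreadNames());
        worker.join();
    }
}
```

From outside the process, a thread dump (for example with jstack) gives the same information for a Tomcat instance that refuses to exit.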
Looking at the startup.sh shell script, we find that it ultimately calls catalina.sh start. In catalina.sh, following the branch at elif [ "$1" = "start" ] ; we see that it ends up invoking the start() method of the class org.apache.catalina.startup.Bootstrap:
/** * org.apache.catalina.startup.Bootstrap * Start the Catalina daemon. * @throws Exception Fatal start error */ public void start() throws Exception { if( catalinaDaemon==null ) init(); Method method = catalinaDaemon.getClass().getMethod("start", (Class [] )null); method.invoke(catalinaDaemon, (Object [])null); }
When the server starts for the first time, start() first calls init(), whose main job is to create an instance of org.apache.catalina.startup.Catalina:
/** * org.apache.catalina.startup.Bootstrap * Initialize daemon. * @throws Exception Fatal initialization error */ public void init() throws Exception { initClassLoaders(); Thread.currentThread().setContextClassLoader(catalinaLoader); SecurityClassLoad.securityClassLoad(catalinaLoader); // Load our startup class and call its process() method if (log.isDebugEnabled()) log.debug("Loading startup class"); Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina"); Object startupInstance = startupClass.getConstructor().newInstance(); // Set the shared extensions class loader if (log.isDebugEnabled()) log.debug("Setting startup class properties"); String methodName = "setParentClassLoader"; Class<?> paramTypes[] = new Class[1]; paramTypes[0] = Class.forName("java.lang.ClassLoader"); Object paramValues[] = new Object[1]; paramValues[0] = sharedLoader; Method method = startupInstance.getClass().getMethod(methodName, paramTypes); method.invoke(startupInstance, paramValues); catalinaDaemon = startupInstance; }
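The reflective pattern used by Bootstrap (load a class by name, instantiate it, then call methods on it through java.lang.reflect.Method) can be tried in isolation. The sketch below is an assumption-laden illustration: FakeDaemon is a hypothetical stand-in for Catalina, and createAndStart is a made-up helper, not a Tomcat method.

```java
import java.lang.reflect.Method;

final class ReflectiveStartDemo {

    /** Hypothetical stand-in for the startup class; NOT a Tomcat class. */
    public static final class FakeDaemon {
        private ClassLoader parent;
        private boolean started;

        public void setParentClassLoader(ClassLoader parent) { this.parent = parent; }
        public void start() { started = true; }
        public boolean isStarted() { return started; }
        public ClassLoader getParent() { return parent; }
    }

    /** Loads className via loader, injects the loader through a setter, then calls start(). */
    static Object createAndStart(ClassLoader loader, String className) {
        try {
            Class<?> startupClass = loader.loadClass(className);
            Object instance = startupClass.getConstructor().newInstance();

            // Same idea as Bootstrap.init(): look the setter up by name and parameter type.
            Method setter = instance.getClass().getMethod("setParentClassLoader", ClassLoader.class);
            setter.invoke(instance, loader);

            // Same idea as Bootstrap.start(): invoke a no-argument method by name.
            Method start = instance.getClass().getMethod("start");
            start.invoke(instance);
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective start failed", e);
        }
    }

    public static void main(String[] args) {
        Object daemon = createAndStart(ReflectiveStartDemo.class.getClassLoader(),
                FakeDaemon.class.getName());
        System.out.println(((FakeDaemon) daemon).isStarted());
    }
}
```

Going through reflection lets the bootstrap code avoid a compile-time dependency on the class it starts, which is what allows it to be loaded by a different class loader.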
/** * org.apache.catalina.startup.Catalina * Start a new server instance. */ public void start() { if (getServer() == null) { load(); } if (getServer() == null) { log.fatal(sm.getString("catalina.noServer")); return; } long t1 = System.nanoTime(); // Start the new server try { getServer().start(); } catch (LifecycleException e) { log.fatal(sm.getString("catalina.serverStartFail"), e); try { getServer().destroy(); } catch (LifecycleException e1) { log.debug("destroy() failed for failed Server ", e1); } return; } long t2 = System.nanoTime(); if(log.isInfoEnabled()) { log.info(sm.getString("catalina.startup", Long.valueOf((t2 - t1) / 1000000))); } // Register shutdown hook if (useShutdownHook) { if (shutdownHook == null) { shutdownHook = new CatalinaShutdownHook(); } Runtime.getRuntime().addShutdownHook(shutdownHook); // If JULI is being used, disable JULI's shutdown hook since // shutdown hooks run in parallel and log messages may be lost // if JULI's hook completes before the CatalinaShutdownHook() LogManager logManager = LogManager.getLogManager(); if (logManager instanceof ClassLoaderLogManager) { ((ClassLoaderLogManager) logManager).setUseShutdownHook( false); } } if (await) { await(); stop(); } }
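Here we mainly care about load() and getServer().start(). Let's start with load(). It calls createStartDigester() to create and configure the Digester used for startup, then obtains the configured server.xml file, applies the attributes defined in it one by one, and finally calls getServer().init():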
/** * org.apache.catalina.startup.Catalina * Start a new server instance. */ public void load() { if (loaded) { return; } loaded = true; long t1 = System.nanoTime(); initDirs(); // Before digester - it may be needed initNaming(); // Set configuration source ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(Bootstrap.getCatalinaBaseFile(), getConfigFile())); File file = configFile(); // Create and execute our Digester Digester digester = createStartDigester(); try (ConfigurationSource.Resource resource = ConfigFileLoader.getSource().getServerXml()) { InputStream inputStream = resource.getInputStream(); InputSource inputSource = new InputSource(resource.getURI().toURL().toString()); inputSource.setByteStream(inputStream); digester.push(this); digester.parse(inputSource); } catch (Exception e) { if (file == null) { log.warn(sm.getString("catalina.configFail", getConfigFile() + "] or [server-embed.xml"), e); } else { log.warn(sm.getString("catalina.configFail", file.getAbsolutePath()), e); if (file.exists() && !file.canRead()) { log.warn(sm.getString("catalina.incorrectPermissions")); } } return; } getServer().setCatalina(this); getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile()); getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile()); // Stream redirection initStreams(); // Start the new server try { getServer().init(); } catch (LifecycleException e) { if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) { throw new java.lang.Error(e); } else { log.error(sm.getString("catalina.initError"), e); } } long t2 = System.nanoTime(); if(log.isInfoEnabled()) { log.info(sm.getString("catalina.init", Long.valueOf((t2 - t1) / 1000000))); } }
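Where does this server come from? From digester.addObjectCreate("Server", "org.apache.catalina.core.StandardServer", "className"); we know that an instance of that class is used. Now return to the two statements digester.push(this); digester.parse(inputSource);. Before parsing starts, Digester.push(this) is called, so the element on top of the stack is the Catalina instance; this is what allows the server to be set on Catalina. As for how the digester parses: when it reaches <Server>, it creates an instance of StandardServer and reflectively calls a setter method on the object at the top of the Digester's stack. The method to call is determined by the name passed in; property values are applied through IntrospectionUtils.setProperty(top, name, value), where top is the object on top of the stack, name is the property to set on that object, and value is the value to set. At the start the top element is Catalina, so Catalina.setServer(Server object) is called to set the Server in preparation for the later call to Server.start(), and the StandardServer instance is then pushed onto the Digester's stack.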
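The stack-driven wiring described above can be illustrated with a toy version. This is a minimal sketch under stated assumptions: MiniDigesterDemo, FakeCatalina and FakeServer are hypothetical names, and the ordering is simplified (the real Digester splits object creation and setter invocation across separate rules).

```java
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Deque;

final class MiniDigesterDemo {

    /** Hypothetical stand-ins; NOT Tomcat classes. */
    public static final class FakeServer { }

    public static final class FakeCatalina {
        private FakeServer server;
        public void setServer(FakeServer server) { this.server = server; }
        public FakeServer getServer() { return server; }
    }

    private final Deque<Object> stack = new ArrayDeque<>();

    /** Pushes the root object before "parsing" starts, like digester.push(this). */
    void push(Object root) { stack.push(root); }

    Object top() { return stack.peek(); }

    /** Simulates an opening element: create the child, hand it to the stack top, push it. */
    void begin(Class<?> type, String setterName) {
        try {
            Object child = type.getConstructor().newInstance();
            Object parent = stack.peek();
            Method setter = parent.getClass().getMethod(setterName, type);
            setter.invoke(parent, child);
            stack.push(child);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Simulates a closing element: the child leaves the stack. */
    void end() { stack.pop(); }

    public static void main(String[] args) {
        MiniDigesterDemo digester = new MiniDigesterDemo();
        FakeCatalina catalina = new FakeCatalina();
        digester.push(catalina);
        digester.begin(FakeServer.class, "setServer");
        System.out.println(catalina.getServer() != null);
        digester.end();
    }
}
```

Because the parent is always whatever sits on top of the stack, nested XML elements map naturally onto nested object graphs.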
Next, getServer().init(). From the above, we go to the class org.apache.catalina.core.StandardServer, which extends LifecycleMBeanBase and implements Server. Extending LifecycleMBeanBase means that the lifecycle of StandardServer is managed, through the init() implemented by LifecycleBase, the parent class of LifecycleMBeanBase:
//org.apache.catalina.util.LifecycleBase.java @Override public final synchronized void init() throws LifecycleException { if (!state.equals(LifecycleState.NEW)) { invalidTransition(Lifecycle.BEFORE_INIT_EVENT); } try { setStateInternal(LifecycleState.INITIALIZING, null, false); initInternal(); setStateInternal(LifecycleState.INITIALIZED, null, false); } catch (Throwable t) { handleSubClassException(t, "lifecycleBase.initFail", toString()); } }
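The init() above is a template method: the final base method guards the state transitions and delegates the real work to initInternal(). A stripped-down sketch of that shape follows; LifecycleSketch, Base and DemoServer are hypothetical names, and the three-value State enum is a simplification of Tomcat's LifecycleState.

```java
final class LifecycleSketch {

    enum State { NEW, INITIALIZING, INITIALIZED }

    abstract static class Base {
        private State state = State.NEW;

        /** Template method: subclasses cannot override the state handling. */
        final synchronized void init() {
            if (state != State.NEW) {
                throw new IllegalStateException("init() not allowed in state " + state);
            }
            state = State.INITIALIZING;
            initInternal();
            state = State.INITIALIZED;
        }

        /** Hook that each component fills in with its own initialization. */
        protected abstract void initInternal();

        synchronized State getState() { return state; }
    }

    static final class DemoServer extends Base {
        boolean initialized;

        @Override
        protected void initInternal() {
            initialized = true;
        }
    }

    public static void main(String[] args) {
        DemoServer server = new DemoServer();
        server.init();
        System.out.println(server.getState());
    }
}
```

Keeping init() final means every component, from Server down to Connector, goes through the same state checks regardless of what its initInternal() does.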
So we turn to initInternal() in StandardServer:
3. NamingResources initialization: globalNamingResources.init();
4. Starting from the common ClassLoader and walking up to the SystemClassLoader, iterate over the lookup paths of each class loader, find the files ending in jar, read their Manifest information, and add it to the ExtensionValidator#containerManifestResources property.
i) Call super.initInternal()
ii) Initialize the container; the container instance here is StandardEngine.
iii) Initialize the Executor
iv) Initialize the Connectors:
a) org.apache.catalina.connector.Connector Connector[HTTP/1.1-8080]
b) org.apache.catalina.connector.Connector Connector[AJP/1.3-8009]
Here we can see the start() method that StandardServer inherits from org.apache.catalina.util.LifecycleBase (via LifecycleMBeanBase):
@Override public final synchronized void start() throws LifecycleException { if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) || LifecycleState.STARTED.equals(state)) { if (log.isDebugEnabled()) { Exception e = new LifecycleException(); log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e); } else if (log.isInfoEnabled()) { log.info(sm.getString("lifecycleBase.alreadyStarted", toString())); } return; } if (state.equals(LifecycleState.NEW)) { init(); } else if (state.equals(LifecycleState.FAILED)) { stop(); } else if (!state.equals(LifecycleState.INITIALIZED) && !state.equals(LifecycleState.STOPPED)) { invalidTransition(Lifecycle.BEFORE_START_EVENT); } try { setStateInternal(LifecycleState.STARTING_PREP, null, false); startInternal(); if (state.equals(LifecycleState.FAILED)) { // This is a 'controlled' failure. The component put itself into the // FAILED state so call stop() to complete the clean-up. stop(); } else if (!state.equals(LifecycleState.STARTING)) { // Shouldn't be necessary but acts as a check that sub-classes are // doing what they are supposed to. invalidTransition(Lifecycle.AFTER_START_EVENT); } else { setStateInternal(LifecycleState.STARTED, null, false); } } catch (Throwable t) { // This is an 'uncontrolled' failure so put the component into the // FAILED state and throw an exception. handleSubClassException(t, "lifecycleBase.startFail", toString()); } }
For StandardServer, what matters is its implementation of startInternal();:
i) Set the state to STARTING
ii) Start the container, i.e. start StandardEngine
iii) Start the Executor
iv) Start the Connectors:
a) org.apache.catalina.connector.Connector Connector[HTTP/1.1-8080]
b) org.apache.catalina.connector.Connector Connector[AJP/1.3-8009]
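At last we have reached the protagonist of this article, the Connector.
From the Connector configuration in server.xml under the apache-tomcat-9.0.14/conf directory (download the matching Tomcat version yourself), we can see that the protocol configured for the default port 8080 is HTTP/1.1: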
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" /> <!-- Define an AJP 1.3 Connector on port 8009 --> <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
public Connector() { this("org.apache.coyote.http11.Http11NioProtocol"); } public Connector(String protocol) { boolean aprConnector = AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseAprConnector(); if ("HTTP/1.1".equals(protocol) || protocol == null) { if (aprConnector) { protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol"; } else { protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol"; } } else if ("AJP/1.3".equals(protocol)) { if (aprConnector) { protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol"; } else { protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol"; } } else { protocolHandlerClassName = protocol; } // Instantiate protocol handler ProtocolHandler p = null; try { Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; } // Default for Connector depends on this system property setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")); }
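For Tomcat 8.5 and later the default is Http11NioProtocol. Here we set the protocol to HTTP/1.1, which matches the first branch of the if statement above, so (as long as the APR connector is not in use) Http11NioProtocol is still what gets selected.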
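The selection logic in the Connector constructor can be pulled out into a pure function to make the mapping easy to check. This is only a sketch: ProtocolResolver and resolve are hypothetical names, and the aprConnector flag stands in for the AprLifecycleListener checks in the real constructor. The class names returned are the ones that appear in the constructor above.

```java
final class ProtocolResolver {

    /**
     * Maps the "protocol" attribute of a Connector element to a handler class name,
     * mirroring the if/else chain in the Connector constructor.
     */
    static String resolve(String protocol, boolean aprConnector) {
        if (protocol == null || "HTTP/1.1".equals(protocol)) {
            return aprConnector
                    ? "org.apache.coyote.http11.Http11AprProtocol"
                    : "org.apache.coyote.http11.Http11NioProtocol";
        } else if ("AJP/1.3".equals(protocol)) {
            return aprConnector
                    ? "org.apache.coyote.ajp.AjpAprProtocol"
                    : "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Anything else is treated as a fully qualified handler class name.
        return protocol;
    }

    public static void main(String[] args) {
        System.out.println(resolve("HTTP/1.1", false));
        System.out.println(resolve("AJP/1.3", false));
        System.out.println(resolve("org.apache.coyote.http11.Http11Nio2Protocol", false));
    }
}
```

So writing protocol="HTTP/1.1" and writing the NIO handler's class name explicitly lead to the same handler when APR is not in play.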
Likewise, as the previous section showed, the Connector gets initialized too, which means it also extends LifecycleMBeanBase. So let's look at its initInternal():
@Override protected void initInternal() throws LifecycleException { super.initInternal(); if (protocolHandler == null) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerInstantiationFailed")); } // Initialize adapter adapter = new CoyoteAdapter(this); protocolHandler.setAdapter(adapter); if (service != null) { protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor()); } // Make sure parseBodyMethodsSet has a default if (null == parseBodyMethodsSet) { setParseBodyMethods(getParseBodyMethods()); } if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) { throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr", getProtocolHandlerClassName())); } if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() && protocolHandler instanceof AbstractHttp11JsseProtocol) { AbstractHttp11JsseProtocol<?> jsseProtocolHandler = (AbstractHttp11JsseProtocol<?>) protocolHandler; if (jsseProtocolHandler.isSSLEnabled() && jsseProtocolHandler.getSslImplementationName() == null) { // OpenSSL is compatible with the JSSE configuration, so use it if APR is available jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName()); } } try { protocolHandler.init(); } catch (Exception e) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e); } }
3. Initialize the protocolHandler (org.apache.coyote.http11.Http11NioProtocol)
Startup needs little comment: it sets this object's state to STARTING and calls protocolHandler.start();:
@Override protected void startInternal() throws LifecycleException { // Validate settings before starting if (getPortWithOffset() < 0) { throw new LifecycleException(sm.getString( "coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset()))); } setState(LifecycleState.STARTING); try { protocolHandler.start(); } catch (Exception e) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerStartFailed"), e); } }
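Here we go straight to the abstract implementation, org.apache.coyote.AbstractProtocol. It follows a lifecycle as well, although it does not extend LifecycleMBeanBase: it implements the ProtocolHandler interface and provides its own init(), start() and other lifecycle methods, which internally delegate to the corresponding endpoint implementation: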
//org.apache.coyote.AbstractProtocol.java @Override public void init() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.init", getName())); logPortOffset(); } if (oname == null) { // Component not pre-registered so register it oname = createObjectName(); if (oname != null) { Registry.getRegistry(null, null).registerComponent(this, oname, null); } } if (this.domain != null) { rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName()); Registry.getRegistry(null, null).registerComponent( getHandler().getGlobal(), rgOname, null); } String endpointName = getName(); endpoint.setName(endpointName.substring(1, endpointName.length()-1)); endpoint.setDomain(domain); endpoint.init(); } @Override public void start() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.start", getName())); logPortOffset(); } endpoint.start(); monitorFuture = getUtilityExecutor().scheduleWithFixedDelay( new Runnable() { @Override public void run() { if (!isPaused()) { startAsyncTimeout(); } } }, 0, 60, TimeUnit.SECONDS); }
Take org.apache.coyote.http11.Http11NioProtocol as an example: its constructor passes a NioEndpoint to the superclass constructor, and its methods are implemented by delegating to that NioEndpoint:
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> { private static final Log log = LogFactory.getLog(Http11NioProtocol.class); public Http11NioProtocol() { super(new NioEndpoint()); } @Override protected Log getLog() { return log; } // -------------------- Pool setup -------------------- public void setPollerThreadCount(int count) { ((NioEndpoint)getEndpoint()).setPollerThreadCount(count); } public int getPollerThreadCount() { return ((NioEndpoint)getEndpoint()).getPollerThreadCount(); } public void setSelectorTimeout(long timeout) { ((NioEndpoint)getEndpoint()).setSelectorTimeout(timeout); } public long getSelectorTimeout() { return ((NioEndpoint)getEndpoint()).getSelectorTimeout(); } public void setPollerThreadPriority(int threadPriority) { ((NioEndpoint)getEndpoint()).setPollerThreadPriority(threadPriority); } public int getPollerThreadPriority() { return ((NioEndpoint)getEndpoint()).getPollerThreadPriority(); } // ----------------------------------------------------- JMX related methods @Override protected String getNamePrefix() { if (isSSLEnabled()) { return "https-" + getSslImplementationShortName()+ "-nio"; } else { return "http-nio"; } } }
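Here the Endpoint handles the actual connections and data transfer; that is, it implements network connection and control and is the entry point for the server's external I/O operations. Its main job is to manage external socket connections and to hand established socket connections over for processing. The main classes involved are Acceptor, Poller and SocketProcessor.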
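We take NioEndpoint as the example. Following on from the end of the previous section, we focus on the init() and start() methods that the protocol handler calls on it: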
//org.apache.tomcat.util.net.AbstractEndpoint.java
public final void init() throws Exception {
    if (bindOnInit) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}

public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

//org.apache.tomcat.util.net.AbstractEndpoint.java
private void bindWithCleanup() throws Exception {
    try {
        bind();
    } catch (Throwable t) {
        // Ensure open sockets etc. are cleaned up if something goes
        // wrong during bind
        ExceptionUtils.handleThrowable(t);
        unbind();
        throw t;
    }
}
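These two methods mainly call bind (see the implementation of bindWithCleanup()) and startInternal. Both are template methods that each endpoint implements according to its own needs. Here we follow the implementation in NioEndpoint; its bind method looks like this: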
//org.apache.tomcat.util.net.NioEndpoint.java @Override public void bind() throws Exception { initServerSocket(); // Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; } setStopLatch(new CountDownLatch(pollerThreadCount)); // Initialize SSL if needed initialiseSsl(); selectorPool.open(); }
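The bind method first initializes the ServerSocket (familiar from JDK network programming, so no more on it here; it is wrapped in a helper method, shown below), then checks the acceptorThreadCount and pollerThreadCount properties, which give the number of threads to start for the Acceptor and the Poller, and makes sure each is at least 1: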
// Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } serverSock.configureBlocking(true); //mimic APR behavior }
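For readers who want to try the JDK calls in initServerSocket() on their own, here is a small stand-alone sketch. BlockingServerSocketDemo and its methods are hypothetical names; binding to the loopback address and to port 0 (let the OS choose a free port) are choices made for the demo, not something Tomcat does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BlockingServerSocketDemo {

    /** Opens a server channel, binds it with the given backlog and keeps it in blocking mode. */
    static ServerSocketChannel open(int port, int backlog) {
        try {
            ServerSocketChannel serverSock = ServerSocketChannel.open();
            InetSocketAddress addr = new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
            // The second argument is the accept backlog (what acceptCount feeds in Tomcat).
            serverSock.socket().bind(addr, backlog);
            // accept() will block until a connection arrives.
            serverSock.configureBlocking(true);
            return serverSock;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void close(ServerSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ServerSocketChannel channel = open(0, 100);
        System.out.println("bound to port " + channel.socket().getLocalPort()
                + ", blocking=" + channel.isBlocking());
        close(channel);
    }
}
```

The server channel stays blocking because a dedicated Acceptor thread sits in accept(); only the accepted client channels are later switched to non-blocking mode.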
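The Acceptor accepts incoming requests and hands them to the Poller for processing; both run on their own threads. bind also initializes SSL and so on. The startInternal method of NioEndpoint is as follows: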
/** * The socket pollers. */ private Poller[] pollers = null; /** * Start the NIO endpoint, creating acceptor, poller threads. */ @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if ( getExecutor() == null ) { createExecutor(); } initializeConnectionLatch(); // Start poller threads pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } startAcceptorThreads(); } }
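This first initializes a few properties. Among them, processorCache is of type SynchronizedStack<SocketProcessor>. SocketProcessor is an inner class of NioEndpoint: once the Poller receives a request it hands it to a SocketProcessor, which in turn passes the request on to the Handler.
It then starts the Poller and Acceptor threads to handle requests. Note that pollers is an array that manages a set of Runnable instances; as the code above shows, each one runs on its own daemon thread.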
To configure these values, we can set the relevant attributes in server.xml:
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" maxHeaderCount="64" maxParameterCount="64" maxHttpHeaderSize="8192" URIEncoding="UTF-8" useBodyEncodingForURI="false" maxThreads="128" minSpareThreads="12" acceptCount="1024" connectionLinger="-1" keepAliveTimeout="60" maxKeepAliveRequests="32" maxConnections="10000" acceptorThreadCount="1" pollerThreadCount="2" selectorTimeout="1000" useSendfile="true" selectorPool.maxSelectors="128" redirectPort="8443" />
The startAcceptorThreads method, which starts the Acceptor threads, lives in AbstractEndpoint:
protected void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
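The getAcceptorThreadCount method returns the acceptorThreadCount property that was checked in bind during init; the corresponding number of Acceptor threads is then started to accept requests. The default is 1, and the threads are created in the same way as for the Poller, so no more on that. The Tomcat documentation describes the two attributes as follows: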
<attribute name="acceptorThreadCount" required="false"> <p>The number of threads to be used to accept connections. Increase this value on a multi CPU machine, although you would never really need more than <code>2</code>. Also, with a lot of non keep alive connections, you might want to increase this value as well. Default value is <code>1</code>.</p> </attribute> <attribute name="pollerThreadCount" required="false"> <p>(int)The number of threads to be used to run for the polling events. Default value is <code>1</code> per processor but not more than 2.<br/> When accepting a socket, the operating system holds a global lock. So the benefit of going above 2 threads diminishes rapidly. Having more than one thread is for system that need to accept connections very rapidly. However usually just increasing <code>acceptCount</code> will solve that problem. Increasing this value may also be beneficial when a large amount of send file operations are going on. </p> </attribute>
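From this we can see that acceptorThreadCount sets the number of threads used to accept connections. Increase it on a multi-CPU machine, although you would rarely need more than 2. With a lot of non-keep-alive connections you may also want to increase it. Its default value is 1.
pollerThreadCount is the number of threads used to run the polling events. The default is 1 per processor but no more than 2 (the tuned configuration above sets it to 2). When accepting a socket, the operating system holds a global lock, so the benefit of going above 2 threads diminishes rapidly. Having more than one thread is for systems that need to accept connections very rapidly; usually, though, just increasing acceptCount solves that problem. Increasing this value may also help when a large number of sendfile operations are in progress.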
//org.apache.tomcat.util.net.NioEndpoint.java
@Override
protected SocketChannel serverSocketAccept() throws Exception {
    return serverSock.accept();
}

//org.apache.tomcat.util.net.Acceptor.java
public class Acceptor<U> implements Runnable {

    private static final Log log = LogFactory.getLog(Acceptor.class);
    private static final StringManager sm = StringManager.getManager(Acceptor.class);

    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;

    private final AbstractEndpoint<?,U> endpoint;
    private String threadName;
    protected volatile AcceptorState state = AcceptorState.NEW;

    public Acceptor(AbstractEndpoint<?,U> endpoint) {
        this.endpoint = endpoint;
    }

    public final AcceptorState getState() {
        return state;
    }

    final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }

    final String getThreadName() {
        return threadName;
    }

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {

            // Loop if endpoint is paused
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!endpoint.isRunning()) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // (this yields a SocketChannel for the new connection)
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                // If the endpoint is running and not paused
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                    log.error(msg, t);
                }
            }
        }
        state = AcceptorState.ENDED;
    }

    ...

    public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }
}
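As the run method above shows, the Acceptor listens on the port by blocking in serverSock.accept(). When a connection arrives and a socket is obtained, and the Endpoint is running normally, it calls NioEndpoint's setSocketOptions method. In short, setSocketOptions builds a NioChannel from the socket and registers that NioChannel in the Poller's event list, where it waits for the poller to process it: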
/**
 * org.apache.tomcat.util.net.NioEndpoint.java
 * Process the specified connection.
 * @param socket The socket channel
 * @return <code>true</code> if the socket was correctly configured
 *  and processing may continue, <code>false</code> if the socket needs to be
 *  close immediately
 */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        // Take a NioChannel from the cache, or create one if none is available,
        // and pass the socket in
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Pick a Poller from the pollers array and register this NioChannel with it
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
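The round-robin selection in getPoller0() can be tried on its own. The sketch below is a variant, not Tomcat's code: RoundRobin is a hypothetical class, and it uses Math.floorMod instead of Math.abs(...) % length so that the index stays non-negative even after the counter overflows (Math.abs(Integer.MIN_VALUE) is still negative).

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobin<T> {

    private final T[] items;
    private final AtomicInteger rotater = new AtomicInteger(0);

    RoundRobin(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("need at least one item");
        }
        this.items = items.clone();
    }

    /** Returns the next item in sequence; safe to call from several threads. */
    T next() {
        // floorMod keeps the result in [0, length) even when the counter wraps around.
        int idx = Math.floorMod(rotater.incrementAndGet(), items.length);
        return items[idx];
    }

    public static void main(String[] args) {
        RoundRobin<String> pollers = new RoundRobin<>(new String[] {"poller-0", "poller-1"});
        for (int i = 0; i < 4; i++) {
            System.out.println(pollers.next());
        }
    }
}
```

Because the counter is incremented before the modulo is taken, the first call returns the element at index 1 rather than index 0.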
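As for getPoller0(): it rotates through the pollers array. Going by the pollerThreadCount documentation quoted earlier, with default settings this array holds at most two elements (only one on a single-processor machine), which is worth keeping in mind. Next, look at the register method of the Poller in NioEndpoint, whose main job is to register the newly created socket with the Poller.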
/**
 * Registers a newly created socket with the poller.
 *
 * @param socket    The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    // Take a PollerEvent from the cache, or create one if none is available,
    // and set the socket and the NioSocketWrapper on it
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    // Add it to this Poller's event queue
    addEvent(r);
}
/**
 * Poller class.
 */
public class Poller implements Runnable {

    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

    private volatile boolean close = false;
    private long nextExpiration = 0;//optimize expiration handling

    private AtomicLong wakeupCounter = new AtomicLong(0);

    private volatile int keyCount = 0;

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public int getKeyCount() { return keyCount; }

    public Selector getSelector() { return selector;}

    /**
     * The background thread that adds sockets to the Poller, checks the
     * poller for triggered events and hands the associated socket off to an
     * appropriate processor as events occur.
     */
    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {

            boolean hasEvents = false;

            try {
                if (!close) {
                    // Walk the events queue and register the interestOps of
                    // each event's Channel with the Selector
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        //if we are here, means we have other stuff to do
                        //do a non blocking select
                        // (returns the number of ready Channels immediately)
                        keyCount = selector.selectNow();
                    } else {
                        // Block until the operating system reports Channels
                        // whose data is ready, or until woken up
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                    }
                    break;
                }
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                continue;
            }
            //either we timed out or we woke up, process events first
            // (i.e. register the Channels in the events queue with the Selector)
            if ( keyCount == 0 ) hasEvents = (hasEvents | events());

            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            // (processKey handles the I/O of each ready socket)
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // Creates a SocketProcessor and submits it to the Tomcat thread pool
                    processKey(sk, attachment);
                }
            }//while

            //process timeouts
            timeout(keyCount,hasEvents);
        }//while

        getStopLatch().countDown();
    }

    ...
}
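The part above that reads the ready Channels is a very common Java NIO idiom: the Selector's selectedKeys() returns the Channels whose I/O data is ready, and the loop iterates over them, calling processKey to handle each ready event. processKey in turn creates a SocketProcessor and hands it to the Tomcat thread pool for execution.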
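The selectedKeys() idiom can be exercised end to end over a loopback connection. The following is a self-contained sketch with plain JDK NIO; SelectorLoopDemo and readOneByteViaSelector are hypothetical names, and the single select pass with a 2-second timeout is a simplification of the Poller's endless loop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class SelectorLoopDemo {

    /** Registers one accepted channel for OP_READ, sends it a byte, and returns the bytes read. */
    static int readOneByteViaSelector() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                // Like setSocketOptions(): switch to non-blocking, then register for reads.
                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ, "demo-attachment");

                client.write(ByteBuffer.wrap(new byte[] {42}));

                int bytesRead = 0;
                int keyCount = selector.select(2000);
                Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk the ready keys; each key must be removed from the selected set by hand.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    if (sk.isValid() && sk.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(16);
                        int n = ((SocketChannel) sk.channel()).read(buf);
                        if (n > 0) {
                            bytesRead += n;
                        }
                    }
                }
                return bytesRead;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read: " + readOneByteViaSelector());
    }
}
```

Unlike this demo, the Poller does not read the data itself; it only detects readiness and dispatches the actual I/O to a worker thread.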
/**
 * Processes events in the event queue of the Poller.
 *
 * @return <code>true</code> if some events were processed,
 *   <code>false</code> if queue was empty
 */
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
            // Register the SocketChannel's interestOps with the Selector
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x);
        }
    }

    return result;
}
/**
 * PollerEvent, cacheable object for poller events to avoid GC
 */
public static class PollerEvent implements Runnable {

    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        // When the Acceptor calls Poller.register(), the PollerEvent is created
        // with interestOps == OP_REGISTER, so this branch is taken
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    // The key was cancelled (e.g. due to socket closure)
                    // and removed from the selector while it was being
                    // processed. Count down the connections at this point
                    // since it won't have been counted down when the socket
                    // closed.
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

    @Override
    public String toString() {
        return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper +
                "], interestOps [" + interestOps + "]";
    }
}
This is the processKey(sk, attachment); called at the end of the Poller's run method:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.nio.keyProcessingError"), t); } }
That is, the processSocket method is where SocketProcessor comes into play:
/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
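The pattern in processSocket (take a reusable task from a cache, or create one; then either hand it to an Executor or run it on the calling thread) can be sketched in plain Java. Everything named here, including ProcessorCacheDemo, Task and the string "events", is hypothetical; the task recycling itself after run() is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

final class ProcessorCacheDemo {

    private final Deque<Task> cache = new ArrayDeque<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private int created;

    /** Reusable unit of work; returns itself to the cache when it finishes. */
    final class Task implements Runnable {
        private String event;

        void reset(String event) { this.event = event; }

        @Override
        public void run() {
            try {
                handled.add(event);
            } finally {
                reset(null);
                recycle(this);
            }
        }
    }

    private synchronized Task acquire(String event) {
        Task task = cache.poll();          // null when the cache is empty
        if (task == null) {
            task = new Task();
            created++;
        }
        task.reset(event);
        return task;
    }

    private synchronized void recycle(Task task) { cache.push(task); }

    /** Mirrors the dispatch decision: use the executor if asked to, otherwise run inline. */
    boolean process(String event, boolean dispatch, Executor executor) {
        try {
            Task task = acquire(event);
            if (dispatch && executor != null) {
                executor.execute(task);
            } else {
                task.run();
            }
        } catch (RejectedExecutionException ree) {
            return false;
        }
        return true;
    }

    synchronized int createdCount() { return created; }

    List<String> handledEvents() { return new ArrayList<>(handled); }

    public static void main(String[] args) {
        ProcessorCacheDemo demo = new ProcessorCacheDemo();
        demo.process("read-1", false, null);
        demo.process("read-2", true, Runnable::run);
        System.out.println(demo.handledEvents() + ", tasks created: " + demo.createdCount());
    }
}
```

Reusing task objects this way trades a little bookkeeping for less garbage on a path that runs once per socket event.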
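A brief note on how SocketProcessor handles a request. As the diagram shows, SocketProcessor goes through Http11ConnectionHandler to obtain an Http11Processor, and the Http11Processor calls prepareRequest to prepare the request data. It then calls the service method of CoyoteAdapter to adapt the request and response, after which they are handed to the Tomcat container for processing.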
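Here the Service is first obtained from the Connector (when the Connector created the CoyoteAdapter in initInternal, it passed itself to the CoyoteAdapter), then the Container is obtained from the Service, then its pipeline, then the first Valve of the pipeline, and finally invoke is called to execute the request. The Service holds the top-level container; when invoke is called on the top-level container's pipeline, each pipeline in turn calls invoke on the Valves of the next container level down, until finally the basic Valve of the Wrapper's pipeline (StandardWrapperValve) is called to handle the Filters and the Servlet.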
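The layer-by-layer hand-off through container pipelines is a chain-of-responsibility arrangement, which the following toy model tries to capture. It is a deliberately reduced sketch: ValveChainDemo, its Valve interface and Container class are hypothetical, each container has a single valve, and a list of strings stands in for the request and response.

```java
import java.util.ArrayList;
import java.util.List;

final class ValveChainDemo {

    /** Hypothetical, much-reduced valve: it only appends to a trace. */
    interface Valve {
        void invoke(List<String> trace);
    }

    static final class Container {
        private final String name;
        private final Container child;

        Container(String name, Container child) {
            this.name = name;
            this.child = child;
        }

        /** The single valve of this container's pipeline: record, then pass downwards. */
        Valve firstValve() {
            return trace -> {
                trace.add(name);
                if (child != null) {
                    child.firstValve().invoke(trace);
                } else {
                    // The innermost container is where filters and the servlet would run.
                    trace.add("servlet");
                }
            };
        }
    }

    /** Builds Engine -> Host -> Context -> Wrapper and sends one "request" through. */
    static List<String> dispatch() {
        Container wrapper = new Container("Wrapper", null);
        Container context = new Container("Context", wrapper);
        Container host = new Container("Host", context);
        Container engine = new Container("Engine", host);

        List<String> trace = new ArrayList<>();
        engine.firstValve().invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(dispatch());
    }
}
```

In this model the caller only ever touches the outermost container, which matches the way the adapter calls invoke on the first Valve of the top-level pipeline and lets the rest follow.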