tomcat 作为知名的web容器,很棒! 本文简单地从其启动命令开始拆解,让我们对它有清晰的了解,揭开神秘的面纱!(冗长的代码流水线,给你一目了然)
话分两头:
1. tomcat是如何启动的?
2. tomcat是如何接收请求的?
x. 应用程序是怎样接入tomcat的?
从何处开始?
/etc/init.d/tomcat8 start
# 简单的 tomcat8 脚本封装
. "$_" /usr/java/jdk1.8.0_101/bin/java org.apache.catalina.startup.Bootstrap "$@" start
# 参考eval命令
eval "\"$_RUNJAVA\"" "\"$LOGGING_CONFIG\"" $LOGGING_MANAGER $JAVA_OPTS $CATALINA_OPTS \
  -Djava.endorsed.dirs="\"$JAVA_ENDORSED_DIRS\"" -classpath "\"$CLASSPATH\"" \
  -Dcatalina.base="\"$CATALINA_BASE\"" \
  -Dcatalina.home="\"$CATALINA_HOME\"" \
  -Djava.io.tmpdir="\"$CATALINA_TMPDIR\"" \
  org.apache.catalina.startup.Bootstrap "$@" start "2>&1" | /usr/local/sbin/cronolog -S "$CATALINA_BASE"/logs/catalina_ln.out "$CATALINA_BASE"/logs/catalina.%Y-%m-%d-%H.out >> /dev/null &
# 运行后得到结果
/usr/java/jdk1.8.0_101/bin/java -server -Xmx6144M -Xms1024M -Dfile.encoding=UTF-8 -Xloggc:/opt/tomcat7/logs/tomcat_app_gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/tomcat7/logs/ -classpath /opt/tomcat8/bin/bootstrap.jar:/opt/tomcat8/bin/tomcat-juli.jar -Dcatalina.base="/opt/tomcat8" -Dcatalina.home="/opt/tomcat8" org.apache.catalina.startup.Bootstrap "$@" start "2>&1"
可以看到,tomcat最终是执行 org.apache.catalina.startup.Bootstrap 的 main() 方法,参数为 start ...
那么,我们就以此为起点,进行简单查看下 tomcat 的一些运行原理吧!
// org.apache.catalina.startup.Bootstrap main()入口,启动失败时,直接调用 System.exit(1); 退出jvm.
public static void main(String args[]) { // 开始执行,先进行各种初始化操作,然后再解析命令,进行相应方法调用 if (daemon == null) { // Don't set daemon until init() has completed Bootstrap bootstrap = new Bootstrap(); try { bootstrap.init(); } catch (Throwable t) { handleThrowable(t); t.printStackTrace(); return; } daemon = bootstrap; } else { // When running as a service the call to stop will be on a new // thread so make sure the correct class loader is used to prevent // a range of class not found exceptions. Thread.currentThread().setContextClassLoader(daemon.catalinaLoader); } try { // 解析命令行,以最后一个参数作为启动或停止指令,默认为启动操作,但是外部的tomcat脚本可以防止不传入该参数 String command = "start"; if (args.length > 0) { command = args[args.length - 1]; } if (command.equals("startd")) { args[args.length - 1] = "start"; daemon.load(args); daemon.start(); } else if (command.equals("stopd")) { args[args.length - 1] = "stop"; daemon.stop(); } else if (command.equals("start")) { // 重点以start为例 daemon.setAwait(true); daemon.load(args); daemon.start(); if (null == daemon.getServer()) { System.exit(1); } } else if (command.equals("stop")) { daemon.stopServer(args); } else if (command.equals("configtest")) { daemon.load(args); if (null == daemon.getServer()) { System.exit(1); } System.exit(0); } else { log.warn("Bootstrap: command /"" + command + "/" does not exist."); } } catch (Throwable t) { // Unwrap the Exception for clearer error reporting if (t instanceof InvocationTargetException && t.getCause() != null) { t = t.getCause(); } // 先处理特殊异常 handleThrowable(t); t.printStackTrace(); System.exit(1); } }
// 接下来我们先看看初始化过程: bootstrap.init(); 也就是自身的 init() 方法。
/** * Initialize daemon. * @throws Exception Fatal initialization error */ public void init() throws Exception { // 设置 classLoader, 因该classLoader可由外部传入,所以不能直接加载类,如: -c initClassLoaders(); // 设置classLoader到当前线程,以备后续调用 Thread.currentThread().setContextClassLoader(catalinaLoader); // 加载关键的各种系统类,使用自定义的classLoader SecurityClassLoad.securityClassLoad(catalinaLoader); // 由于加载默认应用时报错,找不到 JSP Compiler, 因此加上此句主动加载, 临时解决问题 catalinaLoader.loadClass(、"org.apache.jasper.servlet.JasperInitializer"); new JasperInitializer(); // Load our startup class and call its process() method if (log.isDebugEnabled()) log.debug("Loading startup class"); Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina"); Object startupInstance = startupClass.getConstructor().newInstance(); // Set the shared extensions class loader if (log.isDebugEnabled()) log.debug("Setting startup class properties"); String methodName = "setParentClassLoader"; Class<?> paramTypes[] = new Class[1]; paramTypes[0] = Class.forName("java.lang.ClassLoader"); Object paramValues[] = new Object[1]; paramValues[0] = sharedLoader; Method method = startupInstance.getClass().getMethod(methodName, paramTypes); method.invoke(startupInstance, paramValues); catalinaDaemon = startupInstance; } // step1. 加载 classLoader private void initClassLoaders() { try { // 创建 common commonLoader = createClassLoader("common", null); if( commonLoader == null ) { // no config file, default to this loader - we might be in a 'single' env. 
commonLoader=this.getClass().getClassLoader(); } catalinaLoader = createClassLoader("server", commonLoader); sharedLoader = createClassLoader("shared", commonLoader); } catch (Throwable t) { handleThrowable(t); log.error("Class loader creation threw exception", t); System.exit(1); } } // private ClassLoader createClassLoader(String name, ClassLoader parent) throws Exception { // "${catalina.base}/lib","${catalina.base}/lib/*.jar","${catalina.home}/lib","${catalina.home}/lib/*.jar" String value = CatalinaProperties.getProperty(name + ".loader"); if ((value == null) || (value.equals(""))) return parent; // 将 ${xx} 替换为具体的路径,转换为绝对路径 value = replace(value); List<Repository> repositories = new ArrayList<>(); // 解析每个路径,然后依次加载 String[] repositoryPaths = getPaths(value); for (String repository : repositoryPaths) { // Check for a JAR URL repository try { @SuppressWarnings("unused") URL url = new URL(repository); repositories.add( new Repository(repository, RepositoryType.URL)); continue; } catch (MalformedURLException e) { // Ignore } // Local repository if (repository.endsWith("*.jar")) { repository = repository.substring (0, repository.length() - "*.jar".length()); repositories.add( new Repository(repository, RepositoryType.GLOB)); } else if (repository.endsWith(".jar")) { repositories.add( new Repository(repository, RepositoryType.JAR)); } else { repositories.add( new Repository(repository, RepositoryType.DIR)); } } // 将上面加载的目录文件,进行加载到内存 return ClassLoaderFactory.createClassLoader(repositories, parent); }
// org.apache.catalina.startup.ClassLoaderFactory 工厂类
/** * Create and return a new class loader, based on the configuration * defaults and the specified directory paths: * * @param repositories List of class directories, jar files, jar directories * or URLS that should be added to the repositories of * the class loader. * @param parent Parent class loader for the new class loader, or * <code>null</code> for the system class loader. * @return the new class loader * * @exception Exception if an error occurs constructing the class loader */ public static ClassLoader createClassLoader(List<Repository> repositories, final ClassLoader parent) throws Exception { if (log.isDebugEnabled()) log.debug("Creating new class loader"); // Construct the "class path" for this class loader Set<URL> set = new LinkedHashSet<>(); if (repositories != null) { for (Repository repository : repositories) { if (repository.getType() == RepositoryType.URL) { URL url = buildClassLoaderUrl(repository.getLocation()); if (log.isDebugEnabled()) log.debug(" Including URL " + url); set.add(url); } else if (repository.getType() == RepositoryType.DIR) { File directory = new File(repository.getLocation()); directory = directory.getCanonicalFile(); if (!validateFile(directory, RepositoryType.DIR)) { continue; } URL url = buildClassLoaderUrl(directory); if (log.isDebugEnabled()) log.debug(" Including directory " + url); set.add(url); } else if (repository.getType() == RepositoryType.JAR) { File file=new File(repository.getLocation()); file = file.getCanonicalFile(); if (!validateFile(file, RepositoryType.JAR)) { continue; } URL url = buildClassLoaderUrl(file); if (log.isDebugEnabled()) log.debug(" Including jar file " + url); set.add(url); } else if (repository.getType() == RepositoryType.GLOB) { File directory=new File(repository.getLocation()); directory = directory.getCanonicalFile(); if (!validateFile(directory, RepositoryType.GLOB)) { continue; } if (log.isDebugEnabled()) log.debug(" Including directory glob " + directory.getAbsolutePath()); String 
filenames[] = directory.list(); if (filenames == null) { continue; } for (int j = 0; j < filenames.length; j++) { String filename = filenames[j].toLowerCase(Locale.ENGLISH); if (!filename.endsWith(".jar")) continue; File file = new File(directory, filenames[j]); file = file.getCanonicalFile(); if (!validateFile(file, RepositoryType.JAR)) { continue; } if (log.isDebugEnabled()) log.debug(" Including glob jar file " + file.getAbsolutePath()); URL url = buildClassLoaderUrl(file); set.add(url); } } } } // Construct the class loader itself final URL[] array = set.toArray(new URL[set.size()]); if (log.isDebugEnabled()) for (int i = 0; i < array.length; i++) { log.debug(" location " + i + " is " + array[i]); } return AccessController.doPrivileged( new PrivilegedAction<URLClassLoader>() { @Override public URLClassLoader run() { if (parent == null) return new URLClassLoader(array); else return new URLClassLoader(array, parent); } }); }
// org.apache.catalina.security.SecurityClassLoad 加载系统关键类
/**
 * Pre-loads a fixed list of container classes through a given class loader.
 * <p>
 * The public entry point does nothing unless a security manager is
 * installed; the package-private overload allows that check to be skipped.
 * Classes are requested in a fixed order, package by package.
 */
public final class SecurityClassLoad {

    /**
     * Pre-load the class list, but only when a security manager is present.
     *
     * @param loader class loader used for every {@code loadClass} call
     * @throws Exception if a class cannot be loaded or instantiated
     */
    public static void securityClassLoad(ClassLoader loader) throws Exception {
        securityClassLoad(loader, true);
    }

    /**
     * Pre-load the class list.
     *
     * @param loader                 class loader used for every request
     * @param requireSecurityManager when {@code true}, return immediately
     *                               if no security manager is installed
     * @throws Exception if a class cannot be loaded or instantiated
     */
    static void securityClassLoad(ClassLoader loader, boolean requireSecurityManager)
            throws Exception {

        if (requireSecurityManager && System.getSecurityManager() == null) {
            return;
        }

        // Fixed package order
        loadCorePackage(loader);
        loadCoyotePackage(loader);
        loadLoaderPackage(loader);
        loadRealmPackage(loader);
        loadServletsPackage(loader);
        loadSessionPackage(loader);
        loadUtilPackage(loader);
        loadValvesPackage(loader);
        loadJavaxPackage(loader);
        loadConnectorPackage(loader);
        loadTomcatPackage(loader);
    }

    /** Requests {@code prefix + name} from the loader for each name, in order. */
    private static void loadAll(ClassLoader loader, String prefix, String... names)
            throws ClassNotFoundException {
        for (String name : names) {
            loader.loadClass(prefix + name);
        }
    }

    private static void loadCorePackage(ClassLoader loader) throws Exception {
        final String pkg = "org.apache.catalina.core.";
        loadAll(loader, pkg,
                "AccessLogAdapter",
                "ApplicationContextFacade$PrivilegedExecuteMethod",
                "ApplicationDispatcher$PrivilegedForward",
                "ApplicationDispatcher$PrivilegedInclude",
                "ApplicationPushBuilder",
                "AsyncContextImpl",
                "AsyncContextImpl$AsyncRunnable",
                "AsyncContextImpl$DebugException",
                "AsyncListenerWrapper",
                "ContainerBase$PrivilegedAddChild");
        loadAnonymousInnerClasses(loader, pkg + "DefaultInstanceManager");
        loadAll(loader, pkg,
                "DefaultInstanceManager$AnnotationCacheEntry",
                "DefaultInstanceManager$AnnotationCacheEntryType",
                "ApplicationHttpRequest$AttributeNamesEnumerator");
    }

    private static void loadLoaderPackage(ClassLoader loader) throws Exception {
        loadAll(loader, "org.apache.catalina.loader.",
                "WebappClassLoaderBase$PrivilegedFindClassByName",
                "WebappClassLoaderBase$PrivilegedHasLoggingConfig");
    }

    private static void loadRealmPackage(ClassLoader loader) throws Exception {
        loadAll(loader, "org.apache.catalina.realm.", "LockOutRealm$LockRecord");
    }

    private static void loadServletsPackage(ClassLoader loader) throws Exception {
        // Avoid a possible memory leak in the DefaultServlet when running with
        // a security manager. The DefaultServlet needs to load an XML parser
        // when running under a security manager. We want this to be loaded by
        // the container rather than a web application to prevent a memory leak
        // via web application class loader.
        loadAll(loader, "org.apache.catalina.servlets.", "DefaultServlet");
    }

    private static void loadSessionPackage(ClassLoader loader) throws Exception {
        final String pkg = "org.apache.catalina.session.";
        loadAll(loader, pkg, "StandardSession");
        loadAnonymousInnerClasses(loader, pkg + "StandardSession");
        loadAll(loader, pkg, "StandardManager$PrivilegedDoUnload");
    }

    private static void loadUtilPackage(ClassLoader loader) throws Exception {
        loadAll(loader, "org.apache.catalina.util.",
                "ParameterMap",
                "RequestUtil",
                "TLSUtil");
    }

    private static void loadValvesPackage(ClassLoader loader) throws Exception {
        loadAnonymousInnerClasses(loader,
                "org.apache.catalina.valves." + "AbstractAccessLogValve");
    }

    private static void loadCoyotePackage(ClassLoader loader) throws Exception {
        final String pkg = "org.apache.coyote.";
        loadAll(loader, pkg, "http11.Constants");
        // Make sure system property is read at this point
        Class<?> constants = loader.loadClass(pkg + "Constants");
        constants.getConstructor().newInstance();
        loadAll(loader, pkg, "http2.Stream$PrivilegedPush");
    }

    private static void loadJavaxPackage(ClassLoader loader) throws Exception {
        loader.loadClass("javax.servlet.http.Cookie");
    }

    private static void loadConnectorPackage(ClassLoader loader) throws Exception {
        final String pkg = "org.apache.catalina.connector.";
        loadAll(loader, pkg,
                "RequestFacade$GetAttributePrivilegedAction",
                "RequestFacade$GetParameterMapPrivilegedAction",
                "RequestFacade$GetRequestDispatcherPrivilegedAction",
                "RequestFacade$GetParameterPrivilegedAction",
                "RequestFacade$GetParameterNamesPrivilegedAction",
                "RequestFacade$GetParameterValuePrivilegedAction",
                "RequestFacade$GetCharacterEncodingPrivilegedAction",
                "RequestFacade$GetHeadersPrivilegedAction",
                "RequestFacade$GetHeaderNamesPrivilegedAction",
                "RequestFacade$GetCookiesPrivilegedAction",
                "RequestFacade$GetLocalePrivilegedAction",
                "RequestFacade$GetLocalesPrivilegedAction",
                "ResponseFacade$SetContentTypePrivilegedAction",
                "ResponseFacade$DateHeaderPrivilegedAction",
                "RequestFacade$GetSessionPrivilegedAction");
        for (String outer : new String[] {
                "ResponseFacade", "OutputBuffer", "CoyoteInputStream", "InputBuffer", "Response" }) {
            loadAnonymousInnerClasses(loader, pkg + outer);
        }
    }

    private static void loadTomcatPackage(ClassLoader loader) throws Exception {
        final String pkg = "org.apache.tomcat.";

        // buf
        loadAll(loader, pkg,
                "util.buf.B2CConverter",
                "util.buf.ByteBufferUtils",
                "util.buf.C2BConverter",
                "util.buf.HexUtils",
                "util.buf.StringCache",
                "util.buf.StringCache$ByteEntry",
                "util.buf.StringCache$CharEntry",
                "util.buf.UriUtil");

        // collections
        Class<?> keyMap = loader.loadClass(pkg + "util.collections.CaseInsensitiveKeyMap");
        // Ensure StringManager is configured
        keyMap.getConstructor().newInstance();
        loadAll(loader, pkg,
                "util.collections.CaseInsensitiveKeyMap$EntryImpl",
                "util.collections.CaseInsensitiveKeyMap$EntryIterator",
                "util.collections.CaseInsensitiveKeyMap$EntrySet",
                "util.collections.CaseInsensitiveKeyMap$Key");

        // http
        loadAll(loader, pkg,
                "util.http.CookieProcessor",
                "util.http.NamesEnumerator");
        // Make sure system property is read at this point
        Class<?> dateFormat = loader.loadClass(pkg + "util.http.FastHttpDateFormat");
        dateFormat.getConstructor().newInstance();
        loadAll(loader, pkg,
                "util.http.parser.HttpParser",
                "util.http.parser.MediaType",
                "util.http.parser.MediaTypeCache",
                "util.http.parser.SkipResult");

        // net
        loadAll(loader, pkg,
                "util.net.Constants",
                "util.net.DispatchType",
                "util.net.NioBlockingSelector$BlockPoller$RunnableAdd",
                "util.net.NioBlockingSelector$BlockPoller$RunnableCancel",
                "util.net.NioBlockingSelector$BlockPoller$RunnableRemove");

        // security
        loadAll(loader, pkg,
                "util.security.PrivilegedGetTccl",
                "util.security.PrivilegedSetTccl");
    }

    /**
     * Requests {@code enclosingClass$1}, {@code enclosingClass$2}, ... until
     * the loader reports the first missing class, which ends the scan.
     */
    private static void loadAnonymousInnerClasses(ClassLoader loader, String enclosingClass) {
        int index = 1;
        try {
            while (true) {
                loader.loadClass(enclosingClass + '$' + index);
                index++;
            }
        } catch (ClassNotFoundException ignored) {
            // First missing index: no more anonymous inner classes to load
        }
    }
}
// 初始化完成后,准备启动tomcat了
// 启动时序 if (command.equals("start")) { // 设置标志位 daemon.setAwait(true); // 加载参数 daemon.load(args); // 启动监听 daemon.start(); if (null == daemon.getServer()) { System.exit(1); } }
/**
 * Forward the await flag to the wrapped daemon by reflectively invoking
 * its {@code setAwait(boolean)} method.
 *
 * @param await <code>true</code> if the daemon should block
 * @throws Exception Reflection error
 */
public void setAwait(boolean await)
    throws Exception {

    Class<?>[] signature = { Boolean.TYPE };
    Object[] values = { Boolean.valueOf(await) };

    Method setter = catalinaDaemon.getClass().getMethod("setAwait", signature);
    setter.invoke(catalinaDaemon, values);
}

/**
 * Reflectively invoke {@code load} on the wrapped daemon: the no-argument
 * variant when no arguments are supplied, otherwise the variant taking the
 * argument array.
 *
 * @param arguments command-line arguments, may be {@code null} or empty
 * @throws Exception Reflection error
 */
private void load(String[] arguments)
    throws Exception {

    final boolean withoutArgs = (arguments == null || arguments.length == 0);

    // null signature/values select and call the parameterless load()
    Class<?>[] signature = withoutArgs ? null : new Class<?>[] { arguments.getClass() };
    Object[] values = withoutArgs ? null : new Object[] { arguments };

    Method target = catalinaDaemon.getClass().getMethod("load", signature);
    if (log.isDebugEnabled()) {
        log.debug("Calling startup class " + target);
    }
    target.invoke(catalinaDaemon, values);
}
// 调用 org.apache.catalina.startup.Catalina.load()
/**
 * Start a new server instance.
 * <p>
 * Runs at most once (guarded by the {@code loaded} flag). The configuration
 * is looked up in three places, in order: the file returned by
 * {@link #configFile()}, the class-path resource named by
 * {@code getConfigFile()}, and finally the class-path resource
 * {@code server-embed.xml}. If none can be opened, or parsing fails, a
 * warning is logged and the method returns without initialising a server.
 * Otherwise the parsed server is wired to this instance and
 * {@code getServer().init()} is called.
 */
public void load() {

    // Load only once
    if (loaded) {
        return;
    }
    loaded = true;

    long t1 = System.nanoTime();

    // Per the original note: checks the temporary directory setting
    initDirs();

    // Before digester - it may be needed
    initNaming();

    // Create and execute our Digester
    // (it carries the rules for the elements of server.xml)
    Digester digester = createStartDigester();

    InputSource inputSource = null;
    InputStream inputStream = null;
    File file = null;
    try {
        try {
            // First choice: the configuration file on disk
            file = configFile();
            inputStream = new FileInputStream(file);
            inputSource = new InputSource(file.toURI().toURL().toString());
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("catalina.configFail", file), e);
            }
        }
        if (inputStream == null) {
            // Second choice: the same name as a class-path resource
            try {
                inputStream = getClass().getClassLoader()
                    .getResourceAsStream(getConfigFile());
                inputSource = new InputSource
                    (getClass().getClassLoader()
                     .getResource(getConfigFile()).toString());
            } catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("catalina.configFail",
                            getConfigFile()), e);
                }
            }
        }

        // This should be included in catalina.jar
        // Alternative: don't bother with xml, just create it manually.
        if (inputStream == null) {
            // Last choice: the embedded default configuration
            try {
                inputStream = getClass().getClassLoader()
                        .getResourceAsStream("server-embed.xml");
                inputSource = new InputSource
                (getClass().getClassLoader()
                        .getResource("server-embed.xml").toString());
            } catch (Exception e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("catalina.configFail",
                            "server-embed.xml"), e);
                }
            }
        }


        if (inputStream == null || inputSource == null) {
            // Nothing could be opened: warn and give up
            if  (file == null) {
                log.warn(sm.getString("catalina.configFail",
                        getConfigFile() + "] or [server-embed.xml]"));
            } else {
                log.warn(sm.getString("catalina.configFail",
                        file.getAbsolutePath()));
                if (file.exists() && !file.canRead()) {
                    log.warn("Permissions incorrect, read permission is not allowed on the file.");
                }
            }
            return;
        }

        try {
            inputSource.setByteStream(inputStream);
            // Push this Catalina instance onto the digester's object stack
            digester.push(this);
            digester.parse(inputSource);
        } catch (SAXParseException spe) {
            log.warn("Catalina.start using " + getConfigFile() + ": " +
                    spe.getMessage());
            return;
        } catch (Exception e) {
            log.warn("Catalina.start using " + getConfigFile() + ": " , e);
            return;
        }
    } finally {
        // Always release the configuration stream
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                // Ignore
            }
        }
    }

    getServer().setCatalina(this);
    getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
    getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());

    // Stream redirection
    initStreams();

    // Start the new server
    try {
        getServer().init();
    } catch (LifecycleException e) {
        // Escalate to an Error only when the system property asks for it
        if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
            throw new java.lang.Error(e);
        } else {
            log.error("Catalina.start", e);
        }
    }

    long t2 = System.nanoTime();
    if(log.isInfoEnabled()) {
        log.info("Initialization processed in " + ((t2 - t1) / 1000000) + " ms");
    }
}

/**
 * Create and configure the Digester we will be using for startup.
 * <p>
 * Registers, per element path of the configuration document, an
 * object-create rule, a set-properties rule and a set-next rule (or a
 * dedicated rule class), followed by the rule sets for nested elements.
 *
 * @return the main digester to parse server.xml
 */
protected Digester createStartDigester() {
    long t1=System.currentTimeMillis();
    // Initialize the digester
    Digester digester = new Digester();
    digester.setValidating(false);
    digester.setRulesValidation(true);
    // Register "className" as a fake attribute for every Object
    HashMap<Class<?>, List<String>> fakeAttributes = new HashMap<>();
    ArrayList<String> attrs = new ArrayList<>();
    attrs.add("className");
    fakeAttributes.put(Object.class, attrs);
    digester.setFakeAttributes(fakeAttributes);
    digester.setUseContextClassLoader(true);

    // Configure the actions we will be using
    digester.addObjectCreate("Server",
                             "org.apache.catalina.core.StandardServer",
                             "className");
    digester.addSetProperties("Server");
    digester.addSetNext("Server",
                        "setServer",
                        "org.apache.catalina.Server");

    digester.addObjectCreate("Server/GlobalNamingResources",
                             "org.apache.catalina.deploy.NamingResourcesImpl");
    digester.addSetProperties("Server/GlobalNamingResources");
    digester.addSetNext("Server/GlobalNamingResources",
                        "setGlobalNamingResources",
                        "org.apache.catalina.deploy.NamingResourcesImpl");

    digester.addObjectCreate("Server/Listener",
                             null, // MUST be specified in the element
                             "className");
    digester.addSetProperties("Server/Listener");
    digester.addSetNext("Server/Listener",
                        "addLifecycleListener",
                        "org.apache.catalina.LifecycleListener");

    digester.addObjectCreate("Server/Service",
                             "org.apache.catalina.core.StandardService",
                             "className");
    digester.addSetProperties("Server/Service");
    digester.addSetNext("Server/Service",
                        "addService",
                        "org.apache.catalina.Service");

    digester.addObjectCreate("Server/Service/Listener",
                             null, // MUST be specified in the element
                             "className");
    digester.addSetProperties("Server/Service/Listener");
    digester.addSetNext("Server/Service/Listener",
                        "addLifecycleListener",
                        "org.apache.catalina.LifecycleListener");

    //Executor
    digester.addObjectCreate("Server/Service/Executor",
                     "org.apache.catalina.core.StandardThreadExecutor",
                     "className");
    digester.addSetProperties("Server/Service/Executor");

    digester.addSetNext("Server/Service/Executor",
                        "addExecutor",
                        "org.apache.catalina.Executor");


    // Connector elements use dedicated rule classes
    digester.addRule("Server/Service/Connector",
                     new ConnectorCreateRule());
    digester.addRule("Server/Service/Connector",
                     new SetAllPropertiesRule(new String[]{"executor", "sslImplementationName"}));
    digester.addSetNext("Server/Service/Connector",
                        "addConnector",
                        "org.apache.catalina.connector.Connector");

    digester.addObjectCreate("Server/Service/Connector/SSLHostConfig",
                             "org.apache.tomcat.util.net.SSLHostConfig");
    digester.addSetProperties("Server/Service/Connector/SSLHostConfig");
    digester.addSetNext("Server/Service/Connector/SSLHostConfig",
            "addSslHostConfig",
            "org.apache.tomcat.util.net.SSLHostConfig");

    digester.addRule("Server/Service/Connector/SSLHostConfig/Certificate",
                     new CertificateCreateRule());
    digester.addRule("Server/Service/Connector/SSLHostConfig/Certificate",
                     new SetAllPropertiesRule(new String[]{"type"}));
    digester.addSetNext("Server/Service/Connector/SSLHostConfig/Certificate",
                        "addCertificate",
                        "org.apache.tomcat.util.net.SSLHostConfigCertificate");

    digester.addObjectCreate("Server/Service/Connector/SSLHostConfig/OpenSSLConf",
                             "org.apache.tomcat.util.net.openssl.OpenSSLConf");
    digester.addSetProperties("Server/Service/Connector/SSLHostConfig/OpenSSLConf");
    digester.addSetNext("Server/Service/Connector/SSLHostConfig/OpenSSLConf",
                        "setOpenSslConf",
                        "org.apache.tomcat.util.net.openssl.OpenSSLConf");

    digester.addObjectCreate("Server/Service/Connector/SSLHostConfig/OpenSSLConf/OpenSSLConfCmd",
                             "org.apache.tomcat.util.net.openssl.OpenSSLConfCmd");
    digester.addSetProperties("Server/Service/Connector/SSLHostConfig/OpenSSLConf/OpenSSLConfCmd");
    digester.addSetNext("Server/Service/Connector/SSLHostConfig/OpenSSLConf/OpenSSLConfCmd",
                        "addCmd",
                        "org.apache.tomcat.util.net.openssl.OpenSSLConfCmd");

    digester.addObjectCreate("Server/Service/Connector/Listener",
                             null, // MUST be specified in the element
                             "className");
    digester.addSetProperties("Server/Service/Connector/Listener");
    digester.addSetNext("Server/Service/Connector/Listener",
                        "addLifecycleListener",
                        "org.apache.catalina.LifecycleListener");

    digester.addObjectCreate("Server/Service/Connector/UpgradeProtocol",
                              null, // MUST be specified in the element
                              "className");
    digester.addSetProperties("Server/Service/Connector/UpgradeProtocol");
    digester.addSetNext("Server/Service/Connector/UpgradeProtocol",
                        "addUpgradeProtocol",
                        "org.apache.coyote.UpgradeProtocol");

    // Add RuleSets for nested elements
    digester.addRuleSet(new NamingRuleSet("Server/GlobalNamingResources/"));
    digester.addRuleSet(new EngineRuleSet("Server/Service/"));
    digester.addRuleSet(new HostRuleSet("Server/Service/Engine/"));
    digester.addRuleSet(new ContextRuleSet("Server/Service/Engine/Host/"));
    addClusterRuleSet(digester, "Server/Service/Engine/Host/Cluster/");
    digester.addRuleSet(new NamingRuleSet("Server/Service/Engine/Host/Context/"));

    // When the 'engine' is found, set the parentClassLoader.
    digester.addRule("Server/Service/Engine",
                     new SetParentClassLoaderRule(parentClassLoader));
    addClusterRuleSet(digester, "Server/Service/Engine/Cluster/");

    long t2=System.currentTimeMillis();
    if (log.isDebugEnabled()) {
        log.debug("Digester for server.xml created " + ( t2-t1 ));
    }
    return (digester);

}

/**
 * Resolve the configured {@code configFile} path to a {@link File}.
 * A relative path is resolved against {@code Bootstrap.getCatalinaBase()}.
 *
 * @return the configuration file location
 */
protected File configFile() {

    File file = new File(configFile);
    if (!file.isAbsolute()) {
        file = new File(Bootstrap.getCatalinaBase(), configFile);
    }
    return (file);

}
// org.apache.tomcat.util.digester.Digester.push() 入栈, parse() 解析配置文件
/** * Push a new object onto the top of the object stack. * * @param object The new object */ public void push(Object object) { if (stack.size() == 0) { root = object; } stack.push(object); } /** * Parse the content of the specified input source using this Digester. * Returns the root element from the object stack (if any). * * @param input Input source containing the XML data to be parsed * @return the root object * @exception IOException if an input/output error occurs * @exception SAXException if a parsing exception occurs */ public Object parse(InputSource input) throws IOException, SAXException { configure(); getXMLReader().parse(input); return (root); } View Code
// org.apache.catalina.util.LifecycleBase.init()
/**
 * Move this component from NEW through INITIALIZING to INITIALIZED,
 * delegating the actual work to {@code initInternal()}. A component that
 * is not in state NEW is reported through {@code invalidTransition}. Any
 * throwable raised during initialisation puts the component into FAILED
 * and is rethrown wrapped in a {@link LifecycleException}.
 *
 * @throws LifecycleException if initialisation fails or the transition is
 *                            not permitted
 */
@Override
public final synchronized void init() throws LifecycleException {
    if (!state.equals(LifecycleState.NEW)) {
        invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
    }

    try {
        setStateInternal(LifecycleState.INITIALIZING, null, false);
        initInternal();
        setStateInternal(LifecycleState.INITIALIZED, null, false);
    } catch (Throwable failure) {
        ExceptionUtils.handleThrowable(failure);
        setStateInternal(LifecycleState.FAILED, null, false);
        String message = sm.getString("lifecycleBase.initFail",toString());
        throw new LifecycleException(message, failure);
    }
}

/**
 * Record the new lifecycle state and fire the lifecycle event attached to
 * it, if there is one.
 *
 * @param state the state to switch to
 * @param data  data handed to the fired lifecycle event
 * @param check when {@code true}, only a restricted set of transitions is
 *              accepted; anything else goes to {@code invalidTransition}
 * @throws LifecycleException if the transition is not permitted
 */
private synchronized void setStateInternal(LifecycleState state,
        Object data, boolean check) throws LifecycleException {

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("lifecycleBase.setState", this, state));
    }

    if (check) {
        // Must have been triggered by one of the abstract methods (assume
        // code in this class is correct)
        // null is never a valid state
        if (state == null) {
            invalidTransition("null");
            // Unreachable code - here to stop eclipse complaining about
            // a possible NPE further down the method
            return;
        }

        final LifecycleState current = this.state;

        // Any method can transition to failed
        final boolean toFailed = (state == LifecycleState.FAILED);
        // startInternal() permits STARTING_PREP to STARTING
        final boolean prepToStarting = (current == LifecycleState.STARTING_PREP
                && state == LifecycleState.STARTING);
        // stopInternal() permits STOPPING_PREP to STOPPING and FAILED to
        // STOPPING
        final boolean prepToStopping = (current == LifecycleState.STOPPING_PREP
                && state == LifecycleState.STOPPING);
        final boolean failedToStopping = (current == LifecycleState.FAILED
                && state == LifecycleState.STOPPING);

        final boolean permitted =
                toFailed || prepToStarting || prepToStopping || failedToStopping;
        if (!permitted) {
            // No other transition permitted
            invalidTransition(state.name());
        }
    }

    this.state = state;

    final String eventType = state.getLifecycleEvent();
    if (eventType != null) {
        fireLifecycleEvent(eventType, data);
    }
}
// 进行事件监听通知 /** * Allow sub classes to fire {@link Lifecycle} events. * * @param type Event type * @param data Data associated with event. */ protected void fireLifecycleEvent(String type, Object data) { LifecycleEvent event = new LifecycleEvent(this, type, data); // NamingContextListener // VersionLoggerListener // AprLifecycleListener // JreMemoryLeakPreventionListener // GlobalResourcesLifecycleListener // ThreadLocalLeakPreventionListener for (LifecycleListener listener : lifecycleListeners) { listener.lifecycleEvent(event); } } // org.apache.catalina.core.StandardServer.initInternal() /** * Invoke a pre-startup initialization. This is used to allow connectors * to bind to restricted ports under Unix operating environments. */ @Override protected void initInternal() throws LifecycleException { super.initInternal(); // Register global String cache // Note although the cache is global, if there are multiple Servers // present in the JVM (may happen when embedding) then the same cache // will be registered under multiple names onameStringCache = register(new StringCache(), "type=StringCache"); // Register the MBeanFactory MBeanFactory factory = new MBeanFactory(); factory.setContainer(this); onameMBeanFactory = register(factory, "type=MBeanFactory"); // Register the naming resources globalNamingResources.init(); // Populate the extension validator with JARs from common and shared // class loaders if (getCatalina() != null) { ClassLoader cl = getCatalina().getParentClassLoader(); // Walk the class loader hierarchy. Stop at the system class loader. 
// This will add the shared (if present) and common class loaders while (cl != null && cl != ClassLoader.getSystemClassLoader()) { if (cl instanceof URLClassLoader) { URL[] urls = ((URLClassLoader) cl).getURLs(); for (URL url : urls) { if (url.getProtocol().equals("file")) { try { File f = new File (url.toURI()); if (f.isFile() && f.getName().endsWith(".jar")) { ExtensionValidator.addSystemResource(f); } } catch (URISyntaxException e) { // Ignore } catch (IOException e) { // Ignore } } } } cl = cl.getParent(); } } // Initialize our defined Services for (int i = 0; i < services.length; i++) { services[i].init(); } } // org.apache.catalina.util.LifecycleMBeanBase.initInternal() /** * Sub-classes wishing to perform additional initialization should override * this method, ensuring that super.initInternal() is the first call in the * overriding method. */ @Override protected void initInternal() throws LifecycleException { // If oname is not null then registration has already happened via // preRegister(). if (oname == null) { mserver = Registry.getRegistry(null, null).getMBeanServer(); oname = register(this, getObjectNameKeyProperties()); } } /** * Utility method to enable sub-classes to easily register additional * components that don't implement {@link JmxEnabled} with an MBean server. * <br> * Note: This method should only be used once {@link #initInternal()} has * been called and before {@link #destroyInternal()} has been called. 
* * @param obj The object the register * @param objectNameKeyProperties The key properties component of the * object name to use to register the * object * * @return The name used to register the object */ protected final ObjectName register(Object obj, String objectNameKeyProperties) { // Construct an object name with the right domain StringBuilder name = new StringBuilder(getDomain()); name.append(':'); name.append(objectNameKeyProperties); ObjectName on = null; try { on = new ObjectName(name.toString()); Registry.getRegistry(null, null).registerComponent(obj, on, null); } catch (MalformedObjectNameException e) { log.warn(sm.getString("lifecycleMBeanBase.registerFail", obj, name), e); } catch (Exception e) { log.warn(sm.getString("lifecycleMBeanBase.registerFail", obj, name), e); } return on; } View Code
// org.apache.catalina.util.LifecycleBase.init()
/**
 * Drives the NEW -> INITIALIZING -> INITIALIZED transition around
 * {@code initInternal()}. A component that is not NEW is reported through
 * {@code invalidTransition} before anything else happens.
 *
 * @throws LifecycleException wrapping whatever Throwable aborted the
 *         initialization; the state is switched to FAILED beforehand
 */
@Override
public final synchronized void init() throws LifecycleException {
    final boolean freshComponent = state.equals(LifecycleState.NEW);
    if (!freshComponent) {
        invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
    }

    try {
        setStateInternal(LifecycleState.INITIALIZING, null, false);
        initInternal();
        setStateInternal(LifecycleState.INITIALIZED, null, false);
    } catch (Throwable failure) {
        ExceptionUtils.handleThrowable(failure);
        setStateInternal(LifecycleState.FAILED, null, false);
        final String description = sm.getString("lifecycleBase.initFail", toString());
        throw new LifecycleException(description, failure);
    }
}
// StandardService.initInternal() 依次初始化 Engine、Executor、MapperListener 以及各个 Connector
/** * Invoke a pre-startup initialization. This is used to allow connectors * to bind to restricted ports under Unix operating environments. */ @Override protected void initInternal() throws LifecycleException { super.initInternal(); // engin init if (engine != null) { engine.init(); } // Initialize any Executors for (Executor executor : findExecutors()) { if (executor instanceof JmxEnabled) { ((JmxEnabled) executor).setDomain(getDomain()); } executor.init(); } // Initialize mapper listener mapperListener.init(); // Initialize our defined Connectors // 初始化连接器,如: Connector[HTTP/1.1-8080], Connector[AJP/1.3-8011] synchronized (connectorsLock) { for (Connector connector : connectors) { try { connector.init(); } catch (Exception e) { String message = sm.getString( "standardService.connector.initFailed", connector); log.error(message, e); if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) throw new LifecycleException(message); } } } }
// org.apache.catalina.core.StandardEngine.initInternal()
@Override protected void initInternal() throws LifecycleException { // Ensure that a Realm is present before any attempt is made to start // one. This will create the default NullRealm if necessary. getRealm(); super.initInternal(); } // org.apache.catalina.core.ContainerBase.initInternal() // Catalina-startStop-xx 线程池创建 @Override protected void initInternal() throws LifecycleException { BlockingQueue<Runnable> startStopQueue = new LinkedBlockingQueue<>(); startStopExecutor = new ThreadPoolExecutor( getStartStopThreadsInternal(), getStartStopThreadsInternal(), 10, TimeUnit.SECONDS, startStopQueue, new StartStopThreadFactory(getName() + "-startStop-")); startStopExecutor.allowCoreThreadTimeOut(true); super.initInternal(); } // org.apache.catalina.connector.Connector @Override protected void initInternal() throws LifecycleException { super.initInternal(); // Initialize adapter adapter = new CoyoteAdapter(this); protocolHandler.setAdapter(adapter); // Make sure parseBodyMethodsSet has a default if (null == parseBodyMethodsSet) { setParseBodyMethods(getParseBodyMethods()); } if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) { throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr", getProtocolHandlerClassName())); } if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() && protocolHandler instanceof AbstractHttp11JsseProtocol) { AbstractHttp11JsseProtocol<?> jsseProtocolHandler = (AbstractHttp11JsseProtocol<?>) protocolHandler; if (jsseProtocolHandler.isSSLEnabled() && jsseProtocolHandler.getSslImplementationName() == null) { // OpenSSL is compatible with the JSSE configuration, so use it if APR is available jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName()); } } try { protocolHandler.init(); } catch (Exception e) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e); } } // 
org.apache.coyote.AbstractProtocol.init() @Override public void init() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.init", getName())); } if (oname == null) { // Component not pre-registered so register it oname = createObjectName(); if (oname != null) { Registry.getRegistry(null, null).registerComponent(this, oname, null); } } if (this.domain != null) { rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName()); Registry.getRegistry(null, null).registerComponent( getHandler().getGlobal(), rgOname, null); } String endpointName = getName(); endpoint.setName(endpointName.substring(1, endpointName.length()-1)); endpoint.setDomain(domain); endpoint.init(); } // org.apache.tomcat.util.net.AbstractEndpoint.init() public void init() throws Exception { if (bindOnInit) { bind(); bindState = BindState.BOUND_ON_INIT; } if (this.domain != null) { // Register endpoint (as ThreadPool - historical name) oname = new ObjectName(domain + ":type=ThreadPool,name=/"" + getName() + "/""); Registry.getRegistry(null, null).registerComponent(this, oname, null); for (SSLHostConfig sslHostConfig : findSslHostConfigs()) { registerJmx(sslHostConfig); } } } View Code
// NioEndpoint.bind(), 打开socket连接
/**
 * Initialize the endpoint: obtain the server socket channel (newly opened
 * and bound, or inherited), switch it to blocking mode, normalize the
 * acceptor/poller thread counts, create the stop latch, initialize SSL and
 * open the selector pool.
 *
 * @throws Exception declared by the signature; an IllegalArgumentException
 *         is thrown when an inherited channel is requested but none of the
 *         expected type is available
 */
@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        // Bind to the configured address when there is one, otherwise to the port only
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    // Stop latch sized to the number of poller threads
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    // Open the selector pool
    selectorPool.open();
}

// org.apache.tomcat.util.net.NioSelectorPool.open()
/**
 * Enables the pool, obtains the shared selector and, when SHARED is set,
 * creates a NioBlockingSelector opened on that shared selector.
 *
 * @throws IOException declared by the signature
 */
public void open() throws IOException {
    enabled = true;
    getSharedSelector();
    if (SHARED) {
        blockingSelector = new NioBlockingSelector();
        blockingSelector.open(getSharedSelector());
    }

}
// load() 完成后,回到 Bootstrap
// daemon.start();
/**
 * Start the Catalina daemon. Initializes first when no daemon instance
 * exists yet, then invokes the daemon's no-argument {@code start} method
 * reflectively.
 * @throws Exception Fatal start error
 */
public void start()
    throws Exception {
    if( catalinaDaemon==null ) init();

    // Looked up by name and called via reflection
    Method method = catalinaDaemon.getClass().getMethod("start", (Class [] )null);
    method.invoke(catalinaDaemon, (Object [])null);

}

// org.apache.catalina.startup.Catalina.start()
/**
 * Start a new server instance. Loads the configuration when no server is
 * present, starts the server, optionally registers the shutdown hook and,
 * when {@code await} is set, blocks in await() and calls stop() afterwards.
 */
public void start() {

    if (getServer() == null) {
        load();
    }

    // Still no server after the load attempt above: give up
    if (getServer() == null) {
        log.fatal("Cannot start server. Server instance is not configured.");
        return;
    }

    long t1 = System.nanoTime();

    // Start the new server
    try {
        // Delegates to server.start()
        getServer().start();
    } catch (LifecycleException e) {
        log.fatal(sm.getString("catalina.serverStartFail"), e);
        try {
            getServer().destroy();
        } catch (LifecycleException e1) {
            log.debug("destroy() failed for failed Server ", e1);
        }
        return;
    }

    long t2 = System.nanoTime();
    if(log.isInfoEnabled()) {
        log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
    }

    // Register the shutdown hook
    if (useShutdownHook) {
        if (shutdownHook == null) {
            shutdownHook = new CatalinaShutdownHook();
        }
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        // If JULI is being used, disable JULI's shutdown hook since
        // shutdown hooks run in parallel and log messages may be lost
        // if JULI's hook completes before the CatalinaShutdownHook()
        LogManager logManager = LogManager.getLogManager();
        if (logManager instanceof ClassLoaderLogManager) {
            ((ClassLoaderLogManager) logManager).setUseShutdownHook(
                    false);
        }
    }

    // Block here until await() returns, then stop
    if (await) {
        await();
        stop();
    }
}
// LifeCycleBase.start() /** * {@inheritDoc} */ @Override public final synchronized void start() throws LifecycleException { if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) || LifecycleState.STARTED.equals(state)) { if (log.isDebugEnabled()) { Exception e = new LifecycleException(); log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e); } else if (log.isInfoEnabled()) { log.info(sm.getString("lifecycleBase.alreadyStarted", toString())); } return; } // 再次检测状态,确保初始化ok if (state.equals(LifecycleState.NEW)) { init(); } else if (state.equals(LifecycleState.FAILED)) { stop(); } else if (!state.equals(LifecycleState.INITIALIZED) && !state.equals(LifecycleState.STOPPED)) { invalidTransition(Lifecycle.BEFORE_START_EVENT); } try { setStateInternal(LifecycleState.STARTING_PREP, null, false); startInternal(); if (state.equals(LifecycleState.FAILED)) { // This is a 'controlled' failure. The component put itself into the // FAILED state so call stop() to complete the clean-up. stop(); } else if (!state.equals(LifecycleState.STARTING)) { // Shouldn't be necessary but acts as a check that sub-classes are // doing what they are supposed to. invalidTransition(Lifecycle.AFTER_START_EVENT); } else { setStateInternal(LifecycleState.STARTED, null, false); } } catch (Throwable t) { // This is an 'uncontrolled' failure so put the component into the // FAILED state and throw an exception. ExceptionUtils.handleThrowable(t); setStateInternal(LifecycleState.FAILED, null, false); throw new LifecycleException(sm.getString("lifecycleBase.startFail", toString()), t); } } // /** * Start nested components ({@link Service}s) and implement the requirements * of {@link org.apache.catalina.util.LifecycleBase#startInternal()}. 
* * @exception LifecycleException if this component detects a fatal error * that prevents this component from being used */ @Override protected void startInternal() throws LifecycleException { fireLifecycleEvent(CONFIGURE_START_EVENT, null); setState(LifecycleState.STARTING); globalNamingResources.start(); // Start our defined Services synchronized (servicesLock) { for (int i = 0; i < services.length; i++) { services[i].start(); } } } /** * Shutdown hook which will perform a clean shutdown of Catalina if needed. */ protected class CatalinaShutdownHook extends Thread { @Override public void run() { try { if (getServer() != null) { Catalina.this.stop(); } } catch (Throwable ex) { ExceptionUtils.handleThrowable(ex); log.error(sm.getString("catalina.shutdownHookFail"), ex); } finally { // If JULI is used, shut JULI down *after* the server shuts down // so log messages aren't lost LogManager logManager = LogManager.getLogManager(); if (logManager instanceof ClassLoaderLogManager) { ((ClassLoaderLogManager) logManager).shutdown(); } } } } View Code
// 阻塞等待(Catalina.start() 的末尾)
if (await) { await(); stop(); }
// org.apache.catalina.core.StandardServer /** * Wait until a proper shutdown command is received, then return. * This keeps the main thread alive - the thread pool listening for http * connections is daemon threads. */ @Override public void await() { // Negative values - don't wait on port - tomcat is embedded or we just don't like ports if( port == -2 ) { // undocumented yet - for embedding apps that are around, alive. return; } if( port==-1 ) { try { awaitThread = Thread.currentThread(); while(!stopAwait) { try { Thread.sleep( 10000 ); } catch( InterruptedException ex ) { // continue and check the flag } } } finally { awaitThread = null; } return; } // Set up a server socket to wait on try { awaitSocket = new ServerSocket(port, 1, InetAddress.getByName(address)); } catch (IOException e) { log.error("StandardServer.await: create[" + address + ":" + port + "]: ", e); return; } try { awaitThread = Thread.currentThread(); // Loop waiting for a connection and a valid command // 进入正题,等待连接 while (!stopAwait) { ServerSocket serverSocket = awaitSocket; if (serverSocket == null) { break; } // Wait for the next connection Socket socket = null; StringBuilder command = new StringBuilder(); try { InputStream stream; long acceptStartTime = System.currentTimeMillis(); try { // 此处阻塞等待连接控制,一般端口为 8005 socket = serverSocket.accept(); socket.setSoTimeout(10 * 1000); // Ten seconds stream = socket.getInputStream(); } catch (SocketTimeoutException ste) { // This should never happen but bug 56684 suggests that // it does. 
log.warn(sm.getString("standardServer.accept.timeout", Long.valueOf(System.currentTimeMillis() - acceptStartTime)), ste); continue; } catch (AccessControlException ace) { log.warn("StandardServer.accept security exception: " + ace.getMessage(), ace); continue; } catch (IOException e) { if (stopAwait) { // Wait was aborted with socket.close() break; } log.error("StandardServer.await: accept: ", e); break; } // Read a set of characters from the socket int expected = 1024; // Cut off to avoid DoS attack while (expected < shutdown.length()) { if (random == null) random = new Random(); expected += (random.nextInt() % 1024); } while (expected > 0) { int ch = -1; try { ch = stream.read(); } catch (IOException e) { log.warn("StandardServer.await: read: ", e); ch = -1; } // Control character or EOF (-1) terminates loop if (ch < 32 || ch == 127) { break; } command.append((char) ch); expected--; } } finally { // Close the socket now that we are done with it try { if (socket != null) { socket.close(); } } catch (IOException e) { // Ignore } } // Match against our command string boolean match = command.toString().equals(shutdown); if (match) { log.info(sm.getString("standardServer.shutdownViaPort")); break; } else log.warn("StandardServer.await: Invalid command '" + command.toString() + "' received"); } } finally { ServerSocket serverSocket = awaitSocket; awaitThread = null; awaitSocket = null; // Close the server socket and return if (serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // Ignore } } } } // AbstractEndpoint.start() public final void start() throws Exception { if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } startInternal(); } // NioEndpoint.startInternal(); /** * Start the NIO endpoint, creating acceptor, poller threads. 
*/ @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if ( getExecutor() == null ) { createExecutor(); } initializeConnectionLatch(); // Start poller threads pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } startAcceptorThreads(); } } // AbstractEndpoint.createExecutor(), 创建excutor, 备用 public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); } View Code
// NioEndpoint$Acceptor 进行请求监听
/**
 * The background thread that listens for incoming TCP/IP connections and
 * hands them off to an appropriate processor.
 */
protected class Acceptor extends AbstractEndpoint.Acceptor {

    /**
     * Accept loop: runs while the endpoint is {@code running}, idles in
     * 50 ms sleeps while it is paused, and passes every accepted channel to
     * {@code setSocketOptions}; channels that are not taken over are closed.
     */
    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // (this is where the connection is handed over for processing)
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }


    /**
     * Releases the connection count and closes both the underlying socket
     * and the channel; close failures are only logged at debug level.
     *
     * @param socket the accepted channel to discard
     */
    private void closeSocket(SocketChannel socket) {
        countDownConnection();
        try {
            socket.socket().close();
        } catch (IOException ioe)  {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.close"), ioe);
            }
        }
        try {
            socket.close();
        } catch (IOException ioe) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.close"), ioe);
            }
        }
    }
}

/**
 * Creates the NIO-specific SocketProcessor for a wrapper/event pair.
 */
@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}


/**
 * Cancels the key on the socket's poller and, when that call returns a
 * non-null value, returns the channel to the {@code nioChannels} cache
 * (or frees it when the cache rejects it) while the endpoint is running
 * and not paused. Any exception is logged, not rethrown.
 *
 * @param socket the channel being closed
 * @param key    the selection key registered for that channel
 */
private void close(NioChannel socket, SelectionKey key) {
    try {
        if (socket.getPoller().cancelledKey(key) != null) {
            // SocketWrapper (attachment) was removed from the
            // key - recycle the key. This can only happen once
            // per attempted closure so it is used to determine
            // whether or not to return the key to the cache.
            // We do NOT want to do this more than once - see BZ
            // 57340 / 57943.
            if (log.isDebugEnabled()) {
                log.debug("Socket: [" + socket + "] closed");
            }
            if (running && !paused) {
                if (!nioChannels.push(socket)) {
                    socket.free();
                }
            }
        }
    } catch (Exception x) {
        log.error("",x);
    }
}
// 接收请求是: org.apache.tomcat.util.net.NioEndpoint$Acceptor
// 接收到后, NioEndpoint.setSocketOptions(SocketChannel socket)
// 注册处理事件 NioEndpoint$Poller.register(final NioChannel socket)
// NioEndpoint$Poller.run() 进行事件处理
/**
 * Registers a newly created socket with the poller. A NioSocketWrapper is
 * created and configured for the socket, then an OP_REGISTER PollerEvent
 * (taken from the event cache when one is available) is queued.
 *
 * @param socket    The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    // These two calls overwrite the read/write timeouts set a few lines above
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    addEvent(r);
}

// Queues the event for the Poller and wakes the selector when the wakeup
// counter reaches 0 after the increment
private void addEvent(PollerEvent event) {
    events.offer(event);
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

// Background consumer of the event queue: org.apache.tomcat.util.net.NioEndpoint$Poller
/**
 * The background thread that adds sockets to the Poller, checks the
 * poller for triggered events and hands the associated socket off to an
 * appropriate processor as events occur.
 */
@Override
public void run() {
    // Loop until destroy() is called
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    // Signal the stop latch once this poller loop has exited
    getStopLatch().countDown();
}

// NioEndpoint$Poller.timeout()
/**
 * Scans all keys registered with the selector and, for wrappers whose read
 * or write timeout has elapsed, dispatches a SocketEvent.ERROR carrying a
 * SocketTimeoutException. The scan is skipped on busy iterations as
 * described in the comment at the top of the body.
 *
 * @param keyCount  number of keys selected in the current poller iteration
 * @param hasEvents whether the current poller iteration processed events
 */
protected void timeout(int keyCount, boolean hasEvents) {
    long now = System.currentTimeMillis();
    // This method is called on every loop of the Poller. Don't process
    // timeouts on every loop of the Poller since that would create too
    // much load and timeouts can afford to wait a few seconds.
    // However, do process timeouts if any of the following are true:
    // - the selector simply timed out (suggests there isn't much load)
    // - the nextExpiration time has passed
    // - the server socket is being closed
    if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
        return;
    }
    //timeout
    int keycount = 0;
    try {
        for (SelectionKey key : selector.keys()) {
            keycount++;
            try {
                NioSocketWrapper ka = (NioSocketWrapper) key.attachment();
                if ( ka == null ) {
                    cancelledKey(key); //we don't support any keys without attachments
                } else if (close) {
                    key.interestOps(0);
                    ka.interestOps(0); //avoid duplicate stop calls
                    processKey(key,ka);
                } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                          (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                    boolean isTimedOut = false;
                    // Check for read timeout
                    if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        long delta = now - ka.getLastRead();
                        long timeout = ka.getReadTimeout();
                        isTimedOut = timeout > 0 && delta > timeout;
                    }
                    // Check for write timeout
                    if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                        long delta = now - ka.getLastWrite();
                        long timeout = ka.getWriteTimeout();
                        isTimedOut = timeout > 0 && delta > timeout;
                    }
                    if (isTimedOut) {
                        key.interestOps(0);
                        ka.interestOps(0); //avoid duplicate timeout calls
                        ka.setError(new SocketTimeoutException());
                        // Dispatch the error event (dispatch=true); cancel the
                        // key when the dispatch is not accepted
                        if (!processSocket(ka, SocketEvent.ERROR, true)) {
                            cancelledKey(key);
                        }
                    }
                }
            }catch ( CancelledKeyException ckx ) {
                cancelledKey(key);
            }
        }//for
    } catch (ConcurrentModificationException cme) {
        // See https://bz.apache.org/bugzilla/show_bug.cgi?id=57943
        log.warn(sm.getString("endpoint.nio.timeoutCme"), cme);
    }
    long prevExp = nextExpiration; //for logging purposes only
    nextExpiration = System.currentTimeMillis() +
            socketProperties.getTimeoutInterval();
    if (log.isTraceEnabled()) {
        log.trace("timeout completed: keys processed=" + keycount +
                "; now=" + now + "; nextExpiration=" + prevExp +
                "; keyCount=" + keyCount + "; hasEvents=" + hasEvents +
                "; eval=" + ((now < prevExp) && (keyCount>0 || hasEvents) && (!close) ));
    }

}
} // closes a scope opened outside this excerpt (presumably the Poller class)
// 处理业务逻辑 org.apache.tomcat.util.net.AbstractEndpoint.processSocket()
/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } // 此处获取处理线程,即: org.apache.tomcat.util.net.NioEndpoint$SocketProcessor SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); // 如果有线程池,就异步处理,否则同步调用 if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
// org.apache.tomcat.util.net.SocketProcessorBase 处理 run(), 子类实现 doRun() 方法。
/**
 * Runs {@code doRun()} while holding the monitor of the socket wrapper;
 * returns without doing anything when the wrapper is already closed.
 */
@Override
public final void run() {
    synchronized (socketWrapper) {
        // It is possible that processing may be triggered for read and
        // write at the same time. The sync above makes sure that processing
        // does not occur in parallel. The test below ensures that if the
        // first event to be processed results in the socket being closed,
        // the subsequent events are not processed.
        if (socketWrapper.isClosed()) {
            return;
        }
        doRun();
    }
}

// NioEndpoint$SocketProcessor processing logic
/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    /**
     * Completes the TLS handshake if necessary and then passes the
     * socket/event pair to the handler. The {@code handshake} value drives
     * the outcome: 0 = process, -1 = close, OP_READ / OP_WRITE = register
     * the corresponding interest again.
     */
    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                // (control is handed over to the connection handler here)
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            // Clear the per-run references before the processor is cached
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
// org.apache.coyote.AbstractProtocol$ConnectionHandler -> 获取到 @Override public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } S socket = wrapper.getSocket(); Processor processor = connections.get(socket); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet", processor, socket)); } // Async timeouts are calculated on a dedicated thread and then // dispatched. Because of delays in the dispatch process, the // timeout may no longer be required. Check here and avoid // unnecessary processing. if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) { // This is effectively a NO-OP return SocketState.OPEN; } if (processor != null) { // Make sure an async timeout doesn't fire getProtocol().removeWaitingProcessor(processor); } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } ContainerThreadMarker.set(); try { if (processor == null) { String negotiatedProtocol = wrapper.getNegotiatedProtocol(); if (negotiatedProtocol != null) { UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); } else if (negotiatedProtocol.equals("http/1.1")) { // Explicitly negotiated the default protocol. // Obtain a processor below. } else { // TODO: // OpenSSL 1.0.2's ALPN callback doesn't support // failing the handshake with an error if no // protocol can be negotiated. Therefore, we need to // fail the connection here. 
Once this is fixed, // replace the code below with the commented out // block. if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); } return SocketState.CLOSED; /* * To replace the code above once OpenSSL 1.1.0 is * used. // Failed to create processor. This is a bug. throw new IllegalStateException(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); */ } } } if (processor == null) { processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor)); } } if (processor == null) { processor = getProtocol().createProcessor(); register(processor); } processor.setSslSupport( wrapper.getSslSupport(getProtocol().getClientCertProvider())); // Associate the processor with the connection connections.put(socket, processor); SocketState state = SocketState.CLOSED; do { state = processor.process(wrapper, status); if (state == SocketState.UPGRADING) { // Get the HTTP upgrade handler UpgradeToken upgradeToken = processor.getUpgradeToken(); // Retrieve leftover input ByteBuffer leftOverInput = processor.getLeftoverInput(); if (upgradeToken == null) { // Assume direct HTTP/2 connection UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c"); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); wrapper.unRead(leftOverInput); // Associate with the processor with the connection connections.put(socket, processor); } else { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", "h2c")); } return SocketState.CLOSED; } } else { HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); // Release the Http11 processor to be re-used release(processor); // Create the upgrade processor processor = 
getProtocol().createUpgradeProcessor(wrapper, upgradeToken); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate", processor, wrapper)); } wrapper.unRead(leftOverInput); // Mark the connection as upgraded wrapper.setUpgraded(true); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. if (upgradeToken.getInstanceManager() == null) { httpUpgradeHandler.init((WebConnection) processor); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.init((WebConnection) processor); } finally { upgradeToken.getContextBind().unbind(false, oldCL); } } } } } while ( state == SocketState.UPGRADING); if (state == SocketState.LONG) { // In the middle of processing a request/response. Keep the // socket associated with the processor. Exact requirements // depend on type of long poll longPoll(wrapper, processor); if (processor.isAsync()) { getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.OPEN) { // In keep-alive but between requests. OK to recycle // processor. Continue to poll for the next request. connections.remove(socket); release(processor); wrapper.registerReadInterest(); } else if (state == SocketState.SENDFILE) { // Sendfile in progress. If it fails, the socket will be // closed. If it works, the socket either be added to the // poller (or equivalent) to await more data or processed // if there are any pipe-lined requests remaining. } else if (state == SocketState.UPGRADED) { // Don't add sockets back to the poller if this was a // non-blocking write otherwise the poller may trigger // multiple read events which may lead to thread starvation // in the connector. 
The write() method will add this socket // to the poller if necessary. if (status != SocketEvent.OPEN_WRITE) { longPoll(wrapper, processor); } } else if (state == SocketState.SUSPENDED) { // Don't add sockets back to the poller. // The resumeProcessing() method will add this socket // to the poller. } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. connections.remove(socket); if (processor.isUpgrade()) { UpgradeToken upgradeToken = processor.getUpgradeToken(); HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); InstanceManager instanceManager = upgradeToken.getInstanceManager(); if (instanceManager == null) { httpUpgradeHandler.destroy(); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.destroy(); } finally { try { instanceManager.destroyInstance(httpUpgradeHandler); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); getLog().error(sm.getString("abstractConnectionHandler.error"), e); } upgradeToken.getContextBind().unbind(false, oldCL); } } } else { release(processor); } } return state; } catch(java.net.SocketException e) { // SocketExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.ioexception.debug"), e); } catch (ProtocolException e) { // Protocol exceptions normally mean the client sent invalid or // incomplete data. getLog().debug(sm.getString( "abstractConnectionHandler.protocolexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as // above. catch (Throwable e) { ExceptionUtils.handleThrowable(e); // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. 
getLog().error(sm.getString("abstractConnectionHandler.error"), e); } finally { ContainerThreadMarker.clear(); } // Make sure socket/processor is removed from the list of current // connections connections.remove(socket); release(processor); return SocketState.CLOSED; } // org.apache.coyote.http11.Http11Processor -> org.apache.coyote.AbstractProcessorLight @Override public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException { SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { DispatchType nextDispatch = dispatches.next(); state = dispatch(nextDispatch.getSocketStatus()); } else if (status == SocketEvent.DISCONNECT) { // Do nothing here, just wait for it to get recycled } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) { state = dispatch(status); if (state == SocketState.OPEN) { // There may be pipe-lined data to read. If the data isn't // processed now, execution will exit this loop and call // release() which will recycle the processor (and input // buffer) deleting any pipe-lined data. To avoid this, // process it now. 
state = service(socketWrapper); } } else if (status == SocketEvent.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } else if (status == SocketEvent.OPEN_READ){ state = service(socketWrapper); } else { // Default to closing the socket if the SocketEvent passed in // is not consistent with the current state of the Processor state = SocketState.CLOSED; } if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]"); } if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]"); } } if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED); return state; } View Code
// ... 已经很接近了
未完,待续...