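As covered in the earlier article "Spring Boot Startup Source Code Analysis" (《Spring Boot启动源码分析》), when AbstractApplicationContext -> onRefresh() runs and the webflux dependencies are on the classpath, the call lands in ReactiveWebServerApplicationContext, which creates a WebServer.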
```java
/** ReactiveWebServerApplicationContext **/
private void createWebServer() {
    WebServerManager serverManager = this.serverManager;
    if (serverManager == null) {
        String webServerFactoryBeanName = getWebServerFactoryBeanName(); // 1.1
        ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName); // 1.1
        boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
        this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit); // 1.2
        getBeanFactory().registerSingleton("webServerGracefulShutdown",
                new WebServerGracefulShutdownLifecycle(this.serverManager));
        getBeanFactory().registerSingleton("webServerStartStop",
                new WebServerStartStopLifecycle(this.serverManager)); // 1.3
    }
    initPropertySources();
}
```
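1.1. Picks the ReactiveWebServerFactory that matches the web container type. Here it is the default, Netty, so webServerFactory is a NettyReactiveWebServerFactory.
1.2. Creates the WebServerManager object, which manages the server and the HttpHandler. Its inputs, the ReactiveWebServerFactory and the HttpHandler, both come from the bean container (the HttpHandler through the this::getHttpHandler supplier).
1.3. Registers the WebServerStartStopLifecycle bean, which is responsible for starting and stopping the server.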
Q: When does the server start?
A: SpringApplication -> refresh(ConfigurableApplicationContext applicationContext) calls AbstractApplicationContext -> finishRefresh(), which calls DefaultLifecycleProcessor -> onRefresh(), which calls DefaultLifecycleProcessor -> doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly), which finally calls start() on the WebServerStartStopLifecycle registered in 1.3 and starts the server.
Next, the WebServerManager class:
```java
class WebServerManager {

    private final ReactiveWebServerApplicationContext applicationContext;
    private final DelayedInitializationHttpHandler handler;
    private final WebServer webServer;

    WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
            Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
        this.applicationContext = applicationContext;
        Assert.notNull(factory, "Factory must not be null");
        this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
        this.webServer = factory.getWebServer(this.handler); // 1.4
    }

    void start() {
        this.handler.initializeHandler();
        this.webServer.start(); // start the webServer
        this.applicationContext
                .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
    }

    // ...
}
```
1.4. Creates the webServer; with Netty this returns a NettyWebServer. The method is:
```java
/** NettyReactiveWebServerFactory **/
public WebServer getWebServer(HttpHandler httpHandler) {
    HttpServer httpServer = createHttpServer(); // 1.5
    ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
    NettyWebServer webServer = new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout, getShutdown());
    webServer.setRouteProviders(this.routeProviders);
    return webServer;
}

private HttpServer createHttpServer() {
    HttpServer server = HttpServer.create(); // 1.6 returns an HttpServerBind
    if (this.resourceFactory != null) { // 1.7 ReactorResourceFactory is a bean
        LoopResources resources = this.resourceFactory.getLoopResources();
        Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
        server = server
                .tcpConfiguration((tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress)); // 1.8 returns an HttpServerTcpConfig
    }
    else {
        server = server.tcpConfiguration((tcpServer) -> tcpServer.bindAddress(this::getListenAddress));
    }
    if (getSsl() != null && getSsl().isEnabled()) {
        SslServerCustomizer sslServerCustomizer = new SslServerCustomizer(getSsl(), getHttp2(), getSslStoreProvider());
        server = sslServerCustomizer.apply(server);
    }
    if (getCompression() != null && getCompression().getEnabled()) {
        CompressionCustomizer compressionCustomizer = new CompressionCustomizer(getCompression());
        server = compressionCustomizer.apply(server);
    }
    server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders); // 1.9 returns a new HttpServerTcpConfig
    return applyCustomizers(server); // 1.10 returns a new HttpServerTcpConfig
}
```
1.5. Creates the HttpServer, which sits on top of a TcpServer; this is analysed further below.
1.7. The ReactorResourceFactory bean has already been created by this point. Its LoopResources manages the threads, and its ConnectionProvider manages the connections.
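1.8 binds the listen address and port; 1.9 sets the protocol type, for example HTTP/1.1 or HTTP/2. Note that the configuration here is captured as functions, which only execute when they are eventually invoked.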
1.10. Applies the customizers, which set things such as the length limit for HTTP request headers.
```java
final class HttpServerTcpConfig extends HttpServerOperator {

    final Function<? super TcpServer, ? extends TcpServer> tcpServerMapper;

    HttpServerTcpConfig(HttpServer server,
            Function<? super TcpServer, ? extends TcpServer> tcpServerMapper) {
        super(server);
        this.tcpServerMapper = Objects.requireNonNull(tcpServerMapper, "tcpServerMapper");
    }

    @Override
    protected TcpServer tcpConfiguration() {
        return Objects.requireNonNull(tcpServerMapper.apply(source.tcpConfiguration()), "tcpServerMapper");
    }
}

abstract class HttpServerOperator extends HttpServer {

    final HttpServer source;

    HttpServerOperator(HttpServer source) {
        this.source = Objects.requireNonNull(source, "source");
    }
}
```
Steps 1.8, 1.9 and 1.10 each return a new HttpServerTcpConfig object. The HttpServer returned by step 1.5 is therefore an HttpServerTcpConfig whose source is itself an HttpServerTcpConfig; after several levels of nesting, the innermost source is an HttpServerBind.
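To make the nesting concrete, here is a minimal plain-JDK sketch of the same wrapping idea. It is not Reactor Netty code: the names MapperChainSketch, Server, BindServer and MappedServer are hypothetical stand-ins for the roles played by HttpServer, HttpServerBind and HttpServerTcpConfig, and the "configuration" is just a list of strings. It shows that each tcpConfiguration(mapper) call only stores the function, and that the stored functions run innermost-first once the no-argument tcpConfiguration() is finally called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-ins only; none of these are Reactor Netty classes. */
final class MapperChainSketch {

    /** Plays the role of HttpServer: can produce a TCP-level configuration on demand. */
    abstract static class Server {
        abstract List<String> tcpConfiguration();

        /** Wraps this server in a new layer; the mapper is stored, not executed. */
        final Server tcpConfiguration(Function<List<String>, List<String>> mapper) {
            return new MappedServer(this, mapper);
        }
    }

    /** Innermost element of the chain (the role of HttpServerBind). */
    static final class BindServer extends Server {
        @Override
        List<String> tcpConfiguration() {
            List<String> settings = new ArrayList<>();
            settings.add("defaults");
            return settings;
        }
    }

    /** One wrapper layer (the role of HttpServerTcpConfig). */
    static final class MappedServer extends Server {
        private final Server source;
        private final Function<List<String>, List<String>> mapper;

        MappedServer(Server source, Function<List<String>, List<String>> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        List<String> tcpConfiguration() {
            // Resolve the inner layers first, then apply this layer's function.
            return mapper.apply(source.tcpConfiguration());
        }
    }

    /** Builds a two-layer chain; every mapper invocation is recorded in 'calls'. */
    static Server build(List<String> calls) {
        return new BindServer()
                .tcpConfiguration(c -> record(calls, c, "bindAddress"))
                .tcpConfiguration(c -> record(calls, c, "protocol"));
    }

    static List<String> record(List<String> calls, List<String> settings, String name) {
        calls.add(name);
        List<String> copy = new ArrayList<>(settings);
        copy.add(name);
        return copy;
    }

    /** Number of mappers that ran during construction of the chain. */
    static int mappersRunWhileBuilding() {
        List<String> calls = new ArrayList<>();
        build(calls);
        return calls.size();
    }

    /** Resolves the chain, which is the moment the stored mappers execute. */
    static List<String> resolve() {
        return build(new ArrayList<>()).tcpConfiguration();
    }
}
```

In this sketch, mappersRunWhileBuilding() reports that no mapper ran while the chain was built, and resolve() yields the settings in the order the layers were added.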
Next, NettyWebServer:
```java
/** NettyWebServer **/
public NettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter, Duration lifecycleTimeout,
        Shutdown shutdown) {
    Assert.notNull(httpServer, "HttpServer must not be null");
    Assert.notNull(handlerAdapter, "HandlerAdapter must not be null");
    this.lifecycleTimeout = lifecycleTimeout;
    this.handler = handlerAdapter;
    this.httpServer = httpServer.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor())); // 1.11
    this.gracefulShutdown = (shutdown == Shutdown.GRACEFUL) ? new GracefulShutdown(() -> this.disposableServer)
            : null;
}
```
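1.11. Returns yet another new HttpServerTcpConfig. An EventExecutor is also created here: a DefaultEventExecutor, which is single-threaded, backing the DefaultChannelGroup.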
```java
/** NettyWebServer **/
private DisposableServer startHttpServer() {
    HttpServer server = this.httpServer; // the HttpServerTcpConfig from 1.11
    if (this.routeProviders.isEmpty()) {
        server = server.handle(this.handler); // 2.1 returns an HttpServerHandle, which extends HttpServerOperator
    }
    else {
        server = server.route(this::applyRouteProviders);
    }
    if (this.lifecycleTimeout != null) {
        return server.bindNow(this.lifecycleTimeout);
    }
    return server.bindNow(); // 2.2
}

/** HttpServer **/
public final HttpServer handle(BiFunction<? super HttpServerRequest, ? super
        HttpServerResponse, ? extends Publisher<Void>> handler) {
    return new HttpServerHandle(this, handler);
}

/** HttpServerHandle **/
HttpServerHandle(HttpServer server,
        BiFunction<? super HttpServerRequest, ? super
                HttpServerResponse, ? extends Publisher<Void>> handler) {
    super(server);
    this.handler = Objects.requireNonNull(handler, "handler");
}

/** HttpServerOperator **/
HttpServerOperator(HttpServer source) {
    this.source = Objects.requireNonNull(source, "source"); // the HttpServerTcpConfig from 1.11
}
```
2.1. Binds the handler; what comes back at this point is an HttpServerHandle object.
2.2. Binds the port and starts listening. In the source this comes down to two calls: bind() and block(timeout).
```java
/** HttpServer **/
public final DisposableServer bindNow(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    try {
        return Objects.requireNonNull(bind().block(timeout), "aborted");
    }
    catch (IllegalStateException e) {
        if (e.getMessage().contains("blocking read")) {
            throw new IllegalStateException("HttpServer couldn't be started within "
                    + timeout.toMillis() + "ms");
        }
        throw e;
    }
}
```
First, the bind() method:
```java
/** HttpServer **/
public final Mono<? extends DisposableServer> bind() {
    return bind(tcpConfiguration());
}

protected TcpServer tcpConfiguration() {
    return DEFAULT_TCP_SERVER; // already created at step 1.6
}

static final TcpServer DEFAULT_TCP_SERVER = TcpServer.create();

/** TcpServer **/
public static TcpServer create() {
    return TcpServerBind.INSTANCE; // 2.3
}

/** TcpServerBind **/
static final TcpServerBind INSTANCE = new TcpServerBind();

TcpServerBind() {
    this.serverBootstrap = createServerBootstrap();
    BootstrapHandlers.channelOperationFactory(this.serverBootstrap, TcpUtils.TCP_OPS);
}

ServerBootstrap createServerBootstrap() { // 2.4 an incomplete ServerBootstrap
    return new ServerBootstrap()
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.AUTO_READ, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .localAddress(new InetSocketAddress(DEFAULT_PORT));
}
```
By the time step 1.6 creates the HttpServerBind object, the TcpServerBind.INSTANCE object has already been created.
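In the seven-layer OSI model, ports are defined at the transport layer, so binding a port should be the TCP layer's job. That is why a TcpServerBind object is created first to take care of it. The class has a ServerBootstrap field, which ties together Netty's EventLoopGroup, SocketChannel and handler. A typical piece of Netty startup code looks like this: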
```java
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(boss, worker)                           // EventLoopGroup
            .channel(NioServerSocketChannel.class)      // SocketChannel
            .localAddress(new InetSocketAddress(port))  // ip + port
            .childHandler(new ChannelInitializer<SocketChannel>() { // ChannelHandler
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // ...
                }
            });
    ChannelFuture future = b.bind().sync();
    future.channel().closeFuture().sync();
} finally {
    // shut down both groups (the variable is `boss`; there is no `group` variable)
    boss.shutdownGracefully().sync();
    worker.shutdownGracefully().sync();
}
```
Compared with this, the ServerBootstrap field of the TcpServerBind.INSTANCE that was created is missing a lot: the EventLoopGroup, the SocketChannel and the handler.
Back to the bind method. From the analysis so far, the HttpServer at this point is actually an HttpServerHandle whose source is a multiply nested HttpServerTcpConfig, whose innermost source is an HttpServerBind.
```java
/** HttpServer **/
public final Mono<? extends DisposableServer> bind() {
    return bind(tcpConfiguration()); // 2.5 which type of TcpServer is returned?
}

/** HttpServerHandle **/
protected TcpServer tcpConfiguration() {
    return source.tcpConfiguration().bootstrap(this); // 2.6 returns a TcpServerBootstrap
}

/** HttpServerTcpConfig **/
protected TcpServer tcpConfiguration() {
    // 2.7 returns a TcpServerBootstrap, or a TcpServerChannelGroup (step 1.11)
    return Objects.requireNonNull(tcpServerMapper.apply(source.tcpConfiguration()), "tcpServerMapper");
}

/** HttpServerBind **/
protected TcpServer tcpConfiguration() {
    return tcpServer;
}
```
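To obtain the TcpServer in step 2.5, HttpServerHandle -> tcpConfiguration() has to run first, which in turn has to run HttpServerTcpConfig -> tcpConfiguration() several times. Step 2.7 is where the functions registered earlier (steps 1.8, 1.9, 1.11 and so on) are finally invoked.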
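Conclusion: step 2.5 returns a TcpServerBootstrap whose source is also a TcpServerBootstrap (step 1.11 contributes a TcpServerChannelGroup; what step 1.8 contributes is analysed in detail below). After several levels of nesting, the innermost source is TcpServerBind.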
Moving on:
```java
/** HttpServerBind **/
public Mono<? extends DisposableServer> bind(TcpServer delegate) {
    return delegate.bootstrap(this) // returns a TcpServerBootstrap
            .bind()
            .map(CLEANUP_GLOBAL_RESOURCE);
}

/** TcpServer **/
public final TcpServer bootstrap(Function<? super ServerBootstrap, ? extends ServerBootstrap> bootstrapMapper) {
    return new TcpServerBootstrap(this, bootstrapMapper);
}

public final Mono<? extends DisposableServer> bind() {
    ServerBootstrap b;
    try {
        b = configure(); // 2.8
    }
    catch (Throwable t) {
        Exceptions.throwIfJvmFatal(t);
        return Mono.error(t);
    }
    return bind(b); // binds with the configured bootstrap, see TcpServerBind -> bind(ServerBootstrap) further down
}

/** TcpServerBootstrap **/
public ServerBootstrap configure() {
    return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper"); // 2.9
}

/** TcpServerChannelGroup **/
public ServerBootstrap configure() { // 2.10
    ServerBootstrap b = source.configure();
    b.attr(CHANNEL_GROUP, channelGroup);
    ConnectionObserver observer = BootstrapHandlers.childConnectionObserver(b);
    BootstrapHandlers.childConnectionObserver(b, observer.then(this));
    return b;
}

/** TcpServerBind **/
public ServerBootstrap configure() { // 2.11
    return this.serverBootstrap.clone();
}
```
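2.8. Chains through the configure() methods in 2.9-2.11, which mostly set some childOptions on the ServerBootstrap. The group is set here as well. Looking back at the source for step 1.8 shows that it fills in the ServerBootstrap's group and channel, so only the handler is still missing.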
```java
/** NettyReactiveWebServerFactory **/
server = server
        .tcpConfiguration((tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress));

/** TcpServer **/
public final TcpServer runOn(LoopResources channelResources) {
    return runOn(channelResources, LoopResources.DEFAULT_NATIVE);
}

public final TcpServer runOn(LoopResources channelResources, boolean preferNative) {
    return new TcpServerRunOn(this, channelResources, preferNative);
}

/** TcpServerRunOn **/
TcpServerRunOn(TcpServer server, LoopResources loopResources, boolean preferNative) {
    super(server);
    this.loopResources = Objects.requireNonNull(loopResources, "loopResources");
    this.preferNative = preferNative;
}

public ServerBootstrap configure() {
    ServerBootstrap b = source.configure();
    configure(b, preferNative, loopResources);
    return b;
}

static void configure(ServerBootstrap b, boolean preferNative, LoopResources resources) {
    EventLoopGroup selectorGroup = resources.onServerSelect(preferNative);
    EventLoopGroup elg = resources.onServer(preferNative);
    b.group(selectorGroup, elg) // familiar Netty code
            .channel(resources.onServerChannel(elg));
}
```
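The way the configure() chain completes an initially incomplete bootstrap can be illustrated with a small plain-JDK sketch. Everything in it is an assumption made for illustration: ConfigureChainSketch, Tcp, Bind and Layer are hypothetical names, and a Map stands in for the ServerBootstrap. The innermost layer hands out a copy of a shared template (mirroring serverBootstrap.clone() in 2.11), and each outer layer adds the piece it is responsible for, so the shared template itself is never modified.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-ins only; a Map plays the role of Netty's ServerBootstrap. */
final class ConfigureChainSketch {

    /** Shared, deliberately incomplete template (the role of TcpServerBind's bootstrap). */
    static final Map<String, String> TEMPLATE = new LinkedHashMap<>();
    static {
        TEMPLATE.put("SO_REUSEADDR", "true");
        TEMPLATE.put("TCP_NODELAY", "true");
    }

    abstract static class Tcp {
        abstract Map<String, String> configure();

        /** Adds a layer whose step runs after all inner layers have configured the map. */
        final Tcp with(UnaryOperator<Map<String, String>> step) {
            return new Layer(this, step);
        }
    }

    /** Innermost layer: returns a copy, so later steps never touch TEMPLATE. */
    static final class Bind extends Tcp {
        @Override
        Map<String, String> configure() {
            return new LinkedHashMap<>(TEMPLATE);
        }
    }

    /** Wrapper layer: configure the source first, then apply this layer's step. */
    static final class Layer extends Tcp {
        private final Tcp source;
        private final UnaryOperator<Map<String, String>> step;

        Layer(Tcp source, UnaryOperator<Map<String, String>> step) {
            this.source = source;
            this.step = step;
        }

        @Override
        Map<String, String> configure() {
            return step.apply(source.configure());
        }
    }

    /** Builds the chain and resolves it into a fully populated configuration. */
    static Map<String, String> configured() {
        return new Bind()
                .with(b -> { // the part a run-on layer would contribute
                    b.put("group", "selector+worker");
                    b.put("channel", "server-channel");
                    return b;
                })
                .with(b -> { // the part an HTTP layer would contribute
                    b.put("childHandler", "http-initializer");
                    return b;
                })
                .configure();
    }
}
```

Calling configured() repeatedly gives a fresh, complete map each time, while TEMPLATE keeps only its two original entries.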
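2.9. In bootstrapMapper.apply(source.configure()), once source.configure() has finished, the HttpServer's apply() method can run. Here that is HttpServerBind -> apply(), because HttpServerBind passed itself in as the mapper via delegate.bootstrap(this).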
```java
/** HttpServerBind **/
public ServerBootstrap apply(ServerBootstrap b) {
    HttpServerConfiguration conf = HttpServerConfiguration.getAndClean(b); // 2.12
    SslProvider ssl = SslProvider.findSslSupport(b); // 2.13
    if (ssl != null && ssl.getDefaultConfigurationType() == null) {
        if ((conf.protocols & HttpServerConfiguration.h2) == HttpServerConfiguration.h2) {
            ssl = SslProvider.updateDefaultConfiguration(ssl,
                    SslProvider.DefaultConfigurationType.H2);
            SslProvider.setBootstrap(b, ssl);
        }
        else {
            ssl = SslProvider.updateDefaultConfiguration(ssl,
                    SslProvider.DefaultConfigurationType.TCP);
            SslProvider.setBootstrap(b, ssl);
        }
    }
    if (b.config()
         .group() == null) { // 2.14
        LoopResources loops = HttpResources.get();
        EventLoopGroup selector = loops.onServerSelect(LoopResources.DEFAULT_NATIVE);
        EventLoopGroup elg = loops.onServer(LoopResources.DEFAULT_NATIVE);
        b.group(selector, elg)
         .channel(loops.onServerChannel(elg));
    }
    //remove any OPS since we will initialize below
    BootstrapHandlers.channelOperationFactory(b);
    if (ssl != null) {
        if ((conf.protocols & HttpServerConfiguration.h2c) == HttpServerConfiguration.h2c) { // 2.15
            throw new IllegalArgumentException("Configured H2 Clear-Text protocol " +
                    "with TLS. Use the non clear-text h2 protocol via " +
                    "HttpServer#protocol or disable TLS" +
                    " via HttpServer#tcpConfiguration(tcp -> tcp.noSSL())");
        }
        if ((conf.protocols & HttpServerConfiguration.h11orH2) == HttpServerConfiguration.h11orH2) {
            return BootstrapHandlers.updateConfiguration(b,
                    NettyPipeline.HttpInitializer,
                    new Http1OrH2Initializer(conf.decoder.maxInitialLineLength(),
                            conf.decoder.maxHeaderSize(),
                            conf.decoder.maxChunkSize(),
                            conf.decoder.validateHeaders(),
                            conf.decoder.initialBufferSize(),
                            conf.minCompressionSize,
                            compressPredicate(conf.compressPredicate, conf.minCompressionSize),
                            conf.forwarded,
                            conf.cookieEncoder,
                            conf.cookieDecoder,
                            conf.uriTagValue));
        }
        if ((conf.protocols & HttpServerConfiguration.h11) == HttpServerConfiguration.h11) {
            return BootstrapHandlers.updateConfiguration(b,
                    NettyPipeline.HttpInitializer,
                    new Http1Initializer(conf.decoder.maxInitialLineLength(),
                            conf.decoder.maxHeaderSize(),
                            conf.decoder.maxChunkSize(),
                            conf.decoder.validateHeaders(),
                            conf.decoder.initialBufferSize(),
                            conf.minCompressionSize,
                            compressPredicate(conf.compressPredicate, conf.minCompressionSize),
                            conf.forwarded,
                            conf.cookieEncoder,
                            conf.cookieDecoder,
                            conf.uriTagValue));
        }
        if ((conf.protocols & HttpServerConfiguration.h2) == HttpServerConfiguration.h2) {
            return BootstrapHandlers.updateConfiguration(b,
                    NettyPipeline.HttpInitializer,
                    new H2Initializer(
                            conf.decoder.validateHeaders(),
                            conf.minCompressionSize,
                            compressPredicate(conf.compressPredicate, conf.minCompressionSize),
                            conf.forwarded,
                            conf.cookieEncoder,
                            conf.cookieDecoder));
        }
    }
    else {
        if ((conf.protocols & HttpServerConfiguration.h2) == HttpServerConfiguration.h2) {
            throw new IllegalArgumentException(
                    "Configured H2 protocol without TLS. Use" +
                    " a clear-text h2 protocol via HttpServer#protocol or configure TLS" +
                    " via HttpServer#secure");
        }
        if ((conf.protocols & HttpServerConfiguration.h11orH2c) == HttpServerConfiguration.h11orH2c) {
            return BootstrapHandlers.updateConfiguration(b,
                    NettyPipeline.HttpInitializer,
                    new Http1OrH2CleartextInitializer(conf.decoder.maxInitialLineLength(),
                            conf.decoder.maxHeaderSize(),
                            conf.decoder.maxChunkSize(),
                            conf.decoder.validateHeaders(),
                            conf.decoder.initialBufferSize(),
                            conf.minCompressionSize,
                            compressPredicate(conf.compressPredicate, conf.minCompressionSize),
                            conf.forwarded,
                            conf.cookieEncoder,
                            conf.cookieDecoder,
                            conf.uriTagValue,
                            conf.decoder.h2cMaxContentLength));
        }
        if ((conf.protocols & HttpServerConfiguration.h11) == HttpServerConfiguration.h11) { // 2.16
            return BootstrapHandlers.updateConfiguration(b,
                    NettyPipeline.HttpInitializer,
                    new Http1Initializer(conf.decoder.maxInitialLineLength(),
                            conf.decoder.maxHeaderSize(),
                            conf.decoder.maxChunkSize(),
                            conf.decoder.validateHeaders(),
                            conf.decoder.initialBufferSize(),
                            conf.minCompressionSize,
                            compressPredicate(conf.compressPredicate, conf.minCompressionSize),
                            conf.forwarded,
                            conf.cookieEncoder,
                            conf.cookieDecoder,
                            conf.uriTagValue));
        }
        if ((conf.protocols & HttpServerConfiguration.h2c) == HttpServerConfiguration.h2c) {
            return BootstrapHandlers.updateConfiguration(b,
                    NettyPipeline.HttpInitializer,
                    new H2CleartextInitializer(
                            conf.decoder.validateHeaders(),
                            conf.minCompressionSize,
                            compressPredicate(conf.compressPredicate, conf.minCompressionSize),
                            conf.forwarded,
                            conf.cookieEncoder,
                            conf.cookieDecoder));
        }
    }
    throw new IllegalArgumentException("An unknown HttpServer#protocol " +
            "configuration has been provided: " + String.format("0x%x", conf.protocols));
}
```
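2.12. A ServerBootstrapConfig gives access to the ServerBootstrap's settings, such as childGroup, childHandler, childOptions and childAttrs. Here the httpServerConf entry in the original ServerBootstrap's attrs is read and then cleared.
2.13. Checks whether SSL is configured; if it is, the corresponding handler has to be added.
2.14. The group has already been set earlier, so nothing needs to be done here.
2.15. Once SSL is configured, the h2c protocol is not supported. HTTP comes in three protocol variants here: h11 (HTTP/1.1), h2 (HTTP/2) and h2c (the cleartext version of HTTP/2).
2.16. Picks the childHandler that matches the protocol type; for cleartext HTTP/1.1, for example, that is Http1Initializer.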
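The branching in steps 2.15 and 2.16 is a plain bitmask test of the form (protocols & flag) == flag. The sketch below reproduces just that decision logic in isolation. It is an illustration under assumptions: ProtocolMaskSketch and selectInitializer are hypothetical names, the numeric flag values are chosen for the example and are not taken from HttpServerConfiguration, and the initializer is returned as a class name string instead of being installed on a bootstrap.

```java
/** Hypothetical sketch of the protocol selection; flag values are assumed, not Reactor Netty's. */
final class ProtocolMaskSketch {

    static final int H2 = 0b001;          // HTTP/2 over TLS
    static final int H2C = 0b010;         // HTTP/2 cleartext
    static final int H11 = 0b100;         // HTTP/1.1
    static final int H11_OR_H2 = H11 | H2;
    static final int H11_OR_H2C = H11 | H2C;

    /** Returns the name of the initializer that matches the protocol mask and TLS setting. */
    static String selectInitializer(int protocols, boolean tls) {
        if (tls) {
            if ((protocols & H2C) == H2C) {
                throw new IllegalArgumentException("h2c cannot be combined with TLS");
            }
            if ((protocols & H11_OR_H2) == H11_OR_H2) {
                return "Http1OrH2Initializer";
            }
            if ((protocols & H11) == H11) {
                return "Http1Initializer";
            }
            if ((protocols & H2) == H2) {
                return "H2Initializer";
            }
        }
        else {
            if ((protocols & H2) == H2) {
                throw new IllegalArgumentException("h2 requires TLS");
            }
            if ((protocols & H11_OR_H2C) == H11_OR_H2C) {
                return "Http1OrH2CleartextInitializer";
            }
            if ((protocols & H11) == H11) {
                return "Http1Initializer";
            }
            if ((protocols & H2C) == H2C) {
                return "H2CleartextInitializer";
            }
        }
        throw new IllegalArgumentException("unknown protocol mask: 0x" + Integer.toHexString(protocols));
    }
}
```

The order of the checks matters: the combined masks are tested before the single-protocol ones, otherwise a mask containing both HTTP/1.1 and HTTP/2 would already match the plain HTTP/1.1 branch.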
At this point a complete ServerBootstrap has been produced, and the bind() operation can go ahead.
```java
/** TcpServerBind **/
public Mono<? extends DisposableServer> bind(ServerBootstrap b) {
    SslProvider ssl = SslProvider.findSslSupport(b);
    if (ssl != null && ssl.getDefaultConfigurationType() == null) {
        ssl = SslProvider.updateDefaultConfiguration(ssl, SslProvider.DefaultConfigurationType.TCP);
        SslProvider.setBootstrap(b, ssl);
    }
    if (b.config()
         .group() == null) {
        TcpServerRunOn.configure(b, LoopResources.DEFAULT_NATIVE, TcpResources.get());
    }
    return Mono.create(sink -> { // 2.17
        ServerBootstrap bootstrap = b.clone();
        ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
        ConnectionObserver childObs = BootstrapHandlers.childConnectionObserver(bootstrap);
        ChannelOperations.OnSetup ops = BootstrapHandlers.channelOperationFactory(bootstrap);
        convertLazyLocalAddress(bootstrap);
        BootstrapHandlers.finalizeHandler(bootstrap, ops, new ChildObserver(childObs));
        ChannelFuture f = bootstrap.bind(); // the actual port binding
        DisposableBind disposableServer = new DisposableBind(sink, f, obs, bootstrap);
        f.addListener(disposableServer);
        sink.onCancel(disposableServer);
    });
}
```
2.17. Note that the return type is Mono, so the actual port binding is only triggered once something subscribes.
```java
/** Mono **/
public T block(Duration timeout) {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    subscribe((Subscriber<T>) subscriber);
    return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

/** BlockingSingleSubscriber (excerpt) **/
abstract class BlockingSingleSubscriber<T> extends CountDownLatch
        implements InnerConsumer<T>, Disposable {

    T value;

    public final void onSubscribe(Subscription s) {
        this.s = s;
        if (!cancelled) {
            s.request(Long.MAX_VALUE); // issues the request, which triggers 2.17
        }
    }
    // ...
}
```
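The interplay of a lazy source and a blocking subscriber can be reproduced with the JDK alone. The following is a simplified sketch, not Reactor's implementation: LazyBindSketch and LazyMono are hypothetical names, the "bind" is only a counter increment, and error and cancellation handling are left out. It shows that nothing runs when the source is created, and that block() both subscribes (which triggers the work) and waits on a CountDownLatch with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical sketch; LazyMono only imitates the lazy, single-value nature of a Mono. */
final class LazyBindSketch {

    /** Cold single-value source: the emitter runs on subscribe(), never at construction time. */
    static final class LazyMono<T> {
        private final Consumer<Consumer<T>> emitter;

        LazyMono(Consumer<Consumer<T>> emitter) {
            this.emitter = emitter;
        }

        void subscribe(Consumer<T> onValue) {
            emitter.accept(onValue);
        }

        /** Subscribes, then waits until a value arrives or the timeout elapses. */
        T block(long timeoutMillis) {
            CountDownLatch done = new CountDownLatch(1);
            AtomicReference<T> value = new AtomicReference<>();
            subscribe(v -> {
                value.set(v);
                done.countDown();
            });
            try {
                if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("no value within " + timeoutMillis + "ms");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return value.get();
        }
    }

    /** Creates a lazy "bind", checks that it has not run yet, then blocks on it. */
    static String demo() {
        AtomicInteger bindCalls = new AtomicInteger();
        LazyMono<String> bind = new LazyMono<String>(sink -> {
            bindCalls.incrementAndGet(); // stands in for bootstrap.bind()
            sink.accept("bound");
        });
        int before = bindCalls.get();    // the emitter has not been invoked yet
        String value = bind.block(1000);
        return "before=" + before + " value=" + value + " after=" + bindCalls.get();
    }
}
```

In this sketch the emitter completes synchronously inside subscribe(); the latch is what would let block() wait if the value were delivered later from another thread.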
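Layering
TcpServer is responsible for TCP-level functionality, such as binding the port and setting TCP-level parameters like TCP_NODELAY; HttpServer is responsible for HTTP-level functionality, such as encoding and decoding.
Modularity
For example, TcpServerBind is responsible for binding the port, and TcpServerRunOn for binding the thread pools (event loop groups).
HttpServer is organised along similar lines.
Chained calls
TcpServer and HttpServer are wrapped in nested layers, and their configure() or tcpConfiguration() methods are invoked as a chain.
The method arguments are functions, and their execution is triggered by subscription.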