本系列为本人Java编程方法论 响应式解读系列的 Webflux
部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:
Rxjava源码解读与分享: www.bilibili.com/video/av345…
Reactor源码解读与分享: www.bilibili.com/video/av353…
NIO源码解读相关视频分享: www.bilibili.com/video/av432…
NIO源码解读视频相关配套文章:
BIO到NIO源码的一些事儿之BIO
BIO到NIO源码的一些事儿之NIO 上
BIO到NIO源码的一些事儿之NIO 中
BIO到NIO源码的一些事儿之NIO 下 之 Selector
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下
Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上
Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下
其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。
本书主要针对 Netty
服务器来讲,所以读者应具备有关 Netty
的基本知识和应用技能。接下来,我们将对 Reactor-netty
从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是 Reactor-netty 0.7.8.Release
这个版本,但现在已有 0.8
版本,而且 0.7
与 0.8
版本在源码细节有不小的变动,这点给大家提醒下。我会针对 0.8
版本进行全新的解读。
我们由上一章可知Tomcat使用 Connector
来接收和响应连接请求,这里,对于 Netty
来讲,如果我们想让其做为一个 web
服务器,我们先来看一个 Netty
常见的一个用法(这里摘自官方文档一个例子 DiscardServer Demo
):
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 丢弃任何进入的数据 */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // 绑定端口,开始接收进来的连接 ChannelFuture f = b.bind(port).sync(); // (7) // 等待服务器 socket 关闭 。 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } } 复制代码
NioEventLoopGroup
是用来处理 I/O
操作的多线程事件循环器, Netty
提供了许多不同的 EventLoopGroup
的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup
会被使用。第一个经常被叫做 BossGroup
,用来接收进来的连接。第二个经常被叫做 WorkerGroup
,用来处理已经被接收的连接,一旦 BossGroup
接收到连接,就会把连接信息注册到 WorkerGroup
上。如何知道多少个线程已经被使用,如何映射到已经创建的 Channel
上都需要依赖于 EventLoopGroup
的实现,并且可以通过构造函数来配置他们的关系。 ServerBootstrap
是一个启动 NIO
服务的辅助启动类。你可以在这个服务中直接使用 Channel
,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。 NioServerSocketChannel
来举例说明一个新的 Channel
如何接收传进来的连接。 Channel
。 ChannelInitializer
是一个特殊的处理类,目的是帮助使用者配置一个新的 Channel
。 使用其对应的 ChannelPipeline
来加入你的服务逻辑处理(这里是 DiscardServerHandler
)。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline
上,然后提取这些匿名类到最顶层的类上(匿名类即 ChannelInitializer
实例我们可以将其看成是一个代理模式的设计,类似于 Reactor
中 Subscriber
的设计实现,一层又一层的包装,最后得到一个我们需要的一个可以层层处理的 Subscriber
)。 Channel
实现的配置参数。如果我们写一个 TCP/IP
的服务端,我们可以设置 socket
的参数选项,如 tcpNoDelay
和 keepAlive
。请参考 ChannelOption
和 ChannelConfig
实现的接口文档来对 ChannelOption
的有一个大概的认识。 option()
和 childOption()
: option()
是提供给 NioServerSocketChannel
用来接收进来的连接。 childOption()
是提供给由父管道 ServerChannel
接收到的连接,在这个例子中也是 NioServerSocketChannel
。 8080
端口。当然现在你可以多次调用 bind()
方法(基于不同绑定地址)。
在看了常见的 Netty
的一个服务器创建用法之后,我们来看 Reactor Netty
给我们提供的Http服务器的一个封装: reactor.ipc.netty.http.server.HttpServer
。由上面 DiscardServer Demo
可知,首先是定义一个服务器,方便设定一些条件对其进行配置,然后启动的话是调用其 run
方法启动,为做到更好的可配置性,这里使用了建造器模式,以便我们自定义或直接使用默认配置(有些是必须配置,否则会抛出异常,这也是我们这里面所设定的内容之一):
//reactor.ipc.netty.http.server.HttpServer.Builder public static final class Builder { private String bindAddress = null; private int port = 8080; private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port); private Consumer<? super HttpServerOptions.Builder> options; private Builder() { } ... public final Builder port(int port) { this.port = port; return this; } /** * The options for the server, including bind address and port. * * @param options the options for the server, including bind address and port. * @return {@code this} */ public final Builder options(Consumer<? super HttpServerOptions.Builder> options) { this.options = Objects.requireNonNull(options, "options"); return this; } public HttpServer build() { return new HttpServer(this); } } 复制代码
可以看到,此处的 HttpServer.Builder#options
是一个函数式动作 Consumer
,其传入的参数是 HttpServerOptions.Builder
,在 HttpServerOptions.Builder
内可以针对我们在 DiscardServer Demo
中的 bootstrap.option
进行一系列的默认配置或者自行调控配置,我们的对于 option
的自定义设置主要还是针对于 ServerBootstrap#childOption
。因为在 reactor.ipc.netty.options.ServerOptions.Builder#option
这个方法中,有对它的父类 reactor.ipc.netty.options.NettyOptions.Builder#option
进行了相应的重写:
//reactor.ipc.netty.options.ServerOptions.Builder public static class Builder<BUILDER extends Builder<BUILDER>> extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER>{...} //reactor.ipc.netty.options.ServerOptions.Builder#option /** * Set a {@link ChannelOption} value for low level connection settings like * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote * peer. * * @param key the option key * @param <T> the option type * @return {@code this} * @see ServerBootstrap#childOption(ChannelOption, Object) */ @Override public final <T> BUILDER option(ChannelOption<T> key, T value) { this.bootstrapTemplate.childOption(key, value); return get(); } //reactor.ipc.netty.options.NettyOptions.Builder#option /** * Set a {@link ChannelOption} value for low level connection settings like * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote * peer. * * @param key the option key * @param value the option value * @param <T> the option type * @return {@code this} * @see Bootstrap#option(ChannelOption, Object) */ public <T> BUILDER option(ChannelOption<T> key, T value) { this.bootstrapTemplate.option(key, value); return get(); } 复制代码
这是我们需要注意的地方。然后,我们再回到 reactor.ipc.netty.http.server.HttpServer.Builder
,从其 build
这个方法可知,其返回一个 HttpServer
实例,通过对所传入的 HttpServer.Builder
实例的 options
进行判断,接着,就是对 bootstrap.group
的判断,因为要使用构造器配置的话,首先得获取到 ServerBootstrap
,所以要先判断是否有可用 EventLoopGroup
,这个我们是可以自行设定的,这里设定一次, bossGroup
和 workerGroup
可能都会调用这一个,这点要注意下( loopResources
源码注释已经讲的很明确了):
//reactor.ipc.netty.http.server.HttpServer.Builder#build public HttpServer build() { return new HttpServer(this); } //reactor.ipc.netty.http.server.HttpServer#HttpServer private HttpServer(HttpServer.Builder builder) { HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder(); if (Objects.isNull(builder.options)) { if (Objects.isNull(builder.bindAddress)) { serverOptionsBuilder.listenAddress(builder.listenAddress.get()); } else { serverOptionsBuilder.host(builder.bindAddress).port(builder.port); } } else { builder.options.accept(serverOptionsBuilder); } if (!serverOptionsBuilder.isLoopAvailable()) { serverOptionsBuilder.loopResources(HttpResources.get()); } this.options = serverOptionsBuilder.build(); this.server = new TcpBridgeServer(this.options); } //reactor.ipc.netty.options.NettyOptions.Builder public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>, SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>> implements Supplier<BUILDER> { ... /** * Provide a shared {@link EventLoopGroup} each Connector handler. * * @param eventLoopGroup an eventLoopGroup to share * @return {@code this} */ public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) { Objects.requireNonNull(eventLoopGroup, "eventLoopGroup"); return loopResources(preferNative -> eventLoopGroup); } /** * Provide an {@link EventLoopGroup} supplier. * Note that server might call it twice for both their selection and io loops. * * @param channelResources a selector accepting native runtime expectation and * returning an eventLoopGroup * @return {@code this} */ public final BUILDER loopResources(LoopResources channelResources) { this.loopResources = Objects.requireNonNull(channelResources, "loopResources"); return get(); } public final boolean isLoopAvailable() { return this.loopResources != null; } ... } 复制代码
可以看到,这个类是 Supplier
实现,其是一个对象提取器,即属于一个函数式动作对象,适合用于懒加载的场景。这里的 LoopResources
也是一个函数式接口( @FunctionalInterface
),其设计的初衷就是为 io.netty.channel.Channel
的工厂方法服务的:
//reactor.ipc.netty.resources.LoopResources @FunctionalInterface public interface LoopResources extends Disposable { /** * Default worker thread count, fallback to available processor */ int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty( "reactor.ipc.netty.workerCount", "" + Math.max(Runtime.getRuntime() .availableProcessors(), 4))); /** * Default selector thread count, fallback to -1 (no selector thread) */ int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty( "reactor.ipc.netty.selectCount", "" + -1)); /** * Create a simple {@link LoopResources} to provide automatically for {@link * EventLoopGroup} and {@link Channel} factories * * @param prefix the event loop thread name prefix * * @return a new {@link LoopResources} to provide automatically for {@link * EventLoopGroup} and {@link Channel} factories */ static LoopResources create(String prefix) { return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT, DEFAULT_IO_WORKER_COUNT, true); } static LoopResources create(String prefix, int selectCount, int workerCount, boolean daemon) { ... return new DefaultLoopResources(prefix, selectCount, workerCount, daemon); } ... /** * Callback for server {@link EventLoopGroup} creation. * * @param useNative should use native group if current {@link #preferNative()} is also * true * * @return a new {@link EventLoopGroup} */ EventLoopGroup onServer(boolean useNative); ... } 复制代码
我们在自定义的时候,可以借助此类的静态方法 create
方法来快速创建一个 LoopResources
实例。另外通过 LoopResources
的函数式特性,可以做到懒加载(将我们想要实现的业务藏到一个方法内),即,只有在使用的时候才会生成所需要的对象实例,即在使用 reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources)
方法时,可进行 loopResources(true -> new NioEventLoopGroup())
,即在拿到 LoopResources
实例后,只有调用其 onServer
方法,才能拿到 EventLoopGroup
。这样就可以大大节省内存资源,提高性能。