本系列为本人Java编程方法论 响应式解读系列的 Webflux
部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址如下:
Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840
Reactor源码解读与分享:https://www.bilibili.com/video/av35326911
NIO源码解读相关视频分享: https://www.bilibili.com/video/av43230997
NIO源码解读视频相关配套文章:
BIO到NIO源码的一些事儿之BIO
BIO到NIO源码的一些事儿之NIO 上
BIO到NIO源码的一些事儿之NIO 中
BIO到NIO源码的一些事儿之NIO 下 之 Selector
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下
Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 上
Java编程方法论-Spring WebFlux篇 01 为什么需要Spring WebFlux 下
Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装
其中,Rxjava与Reactor作为本人书中内容将不对外开放,大家感兴趣可以花点时间来观看视频,本人对着两个库进行了全面彻底细致的解读,包括其中的设计理念和相关的方法论,也希望大家可以留言纠正我其中的错误。
本书主要针对 Netty
服务器来讲,所以读者应具备有关 Netty
的基本知识和应用技能。接下来,我们将对 Reactor-netty
从设计到实现的细节一一探究,让大家真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是 Reactor-netty 0.7.8.Release
这个版本,但现在已有 0.8
版本,而且 0.7
与 0.8
版本在源码细节有不小的变动,这点给大家提醒下。我会针对 0.8
版本进行全新的解读并在未来出版的书中进行展示。
这里,我们会首先解读 Reactor Netty
是如何针对 Netty
中 Bootstrap
的 ChildHandler
进行封装以及响应式拓展等一些细节探究。接着,我们会涉及到 HttpHandler
的引入,以此来对接我们上层web服务。
因为这是我们切入自定义逻辑的地方,所以,我们首先来关注下与其相关的 ChannelHandler
,以及前文并未提到的,服务器到底是如何启动以及如何通过响应式来做到优雅的关闭,首先我们会接触关闭服务器的设定。
我们再回到 reactor.ipc.netty.http.server.HttpServer#HttpServer
这个构造器中,由上一章我们知道请求是 HTTP
层面的(应用层),必须依赖于 TCP
的连接实现,所以这里就要有一个 TCPServer
的实现,其实就是 Channel
上 Pipeline
的操作。
//reactor.ipc.netty.http.server.HttpServer#HttpServer private HttpServer(HttpServer.Builder builder) { HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder(); ... this.options = serverOptionsBuilder.build(); this.server = new TcpBridgeServer(this.options); } 复制代码
这里的话在 DiscardServer Demo
中, TCPServer
我们主要针对 childHandler
的内容的封装,也就是如下内容:
b.group(bossGroup, workerGroup) ... .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) ... 复制代码
那 childHandler
到底代表什么类型,我们可以在 io.netty.bootstrap.ServerBootstrap
找到其相关定义:
//io.netty.bootstrap.ServerBootstrap public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>(); private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); private volatile EventLoopGroup childGroup; private volatile ChannelHandler childHandler; public ServerBootstrap() { } private ServerBootstrap(ServerBootstrap bootstrap) { super(bootstrap); childGroup = bootstrap.childGroup; childHandler = bootstrap.childHandler; synchronized (bootstrap.childOptions) { childOptions.putAll(bootstrap.childOptions); } synchronized (bootstrap.childAttrs) { childAttrs.putAll(bootstrap.childAttrs); } } ... } 复制代码
由字段定义可知, childHandler
代表的是 ChannelHandler
,顾名思义,是关于 Channel
的一个处理类,这里通过查看其定义可知它是用来拦截处理 Channel
中的 I/O
事件,并通过 Channel
下的 ChannelPipeline
将处理后的事件转发到其下一个处理程序中。
那这里如何实现 DiscardServer Demo
中的 b.childHandler(xxx)
行为,通过 DiscardServer Demo
我们可以知道,我们最关注的其实是 ch.pipeline().addLast(new DiscardServerHandler());
中的 DiscardServerHandler
实现,但是我们发现,这个核心语句是包含在 ChannelInitializer
内,其继承了 ChannelInboundHandlerAdapter
,它的最顶层的父类接口就是 ChannelHandler
,也就对应了 io.netty.bootstrap.ServerBootstrap
在执行 b.childHandler(xxx)
方法时,其需要传入 ChannelHandler
类型的设定。这里就可以分拆成两步来做,一个是 b.childHandler(xxx)
行为包装,一个是此 ChannelHandler
的定义拓展实现。
那么,为了 API
的通用性,我们先来看Netty的客户端的建立的一个Demo(摘自本人RPC项目的一段代码):
private Channel createNewConChannel() { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class) .group(new NioEventLoopGroup(1)) .handler(new ChannelInitializer<Channel>() { protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)) .addLast(new RpcDecoder(10 * 1024 * 1024)) .addLast(new RpcEncoder()) .addLast(new RpcClientHandler()) ; } }); try { final ChannelFuture f = bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.TCP_NODELAY, true) .connect(ip, port).sync(); // <1> f.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { LOGGER.info("Connect success {} ", f); } }); final Channel channel = f.channel(); channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port)); return channel; } catch (InterruptedException e) { e.printStackTrace(); } return null; } 复制代码
将 Netty
的客户端与服务端的建立进行对比,我们可以发现 b.childHandler(xxx)
与相应的启动( Server
端的话是 serverBootstrap.bind(port).sync();
,客户端的话是上述Demo中 <1>
处的内容)都可以抽取出来作为一个接口来进行功能的聚合,然后和相应的 Server
(如 TcpServer
)或 Client
(如 TcpClient
)进行其特有的实现。在 Reactor Netty
内的话,就是定义一个 reactor.ipc.netty.NettyConnector
接口,除了做到上述的功能之外,为了适配响应式的理念,也进行了响应式的设计。即在 netty
客户端与服务端在启动时,可以保存其状态,以及提供结束的对外接口方法,这种在响应式中可以很优雅的实现。接下来,我们来看此 reactor.ipc.netty.NettyConnector
的接口定义:
//reactor.ipc.netty.NettyConnector public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> { Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler); ... default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> BlockingNettyContext start(T handler) { return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName()); } } ... default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) { BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName()); facade.installShutdownHook(); if (onStart != null) { onStart.accept(facade); } facade.getContext() .onClose() .block(); } 复制代码
其中, newHandler
可以是我们上层web处理,里面包含了 INBOUND, OUTBOUND
,具体的话就是 request,response
,后面会专门来涉及到这点。
接着就是提供了一个启动方法 start
,其内创建了一个 BlockingNettyContext
实例,而逻辑的核心就在其构造方法内,就是要将配置好的服务器启动,整个启动过程还是放在 newHandler(handler)
中,其返回的 Mono<? extends NettyContext>
中的 NettyContext
类型元素是管理 io.netty.channel.Channel
上下文信息的一个对象,这个对象更多的是一些无状态的操作,并不会对此对象做什么样的改变,也是通过对此对象的一个 Mono<? extends NettyContext>
包装然后通过 block
产生订阅,来做到 sync()
的效果,通过,通过 block
产生订阅后返回的 NettyContext
对象,可以使中断关闭服务器的操作也可以做到更优雅:
public class BlockingNettyContext { private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class); private final NettyContext context; private final String description; private Duration lifecycleTimeout; private Thread shutdownHook; public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description) { this(contextAsync, description, Duration.ofSeconds(45)); } public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description, Duration lifecycleTimeout) { this.description = description; this.lifecycleTimeout = lifecycleTimeout; this.context = contextAsync .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms"))) .doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address())) .block(); } ... /** * Shut down the {@link NettyContext} and wait for its termination, up to the * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. */ public void shutdown() { if (context.isDisposed()) { return; } removeShutdownHook(); //only applies if not called from the hook's thread context.dispose(); context.onClose() .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e)) .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address())) .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms"))) .block(); } ... } 复制代码
这里,我们来接触下在Reactor中并没有深入接触的 blockXXX()
操作,其实整个逻辑还是比较简单的,这里拿 reactor.core.publisher.Mono#block()
来讲,就是获取并返回这个下发的元素:
//reactor.core.publisher.Mono#block() @Nullable public T block() { BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(); onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber)); return subscriber.blockingGet(); } //reactor.core.publisher.BlockingMonoSubscriber final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> { @Override public void onNext(T t) { if (value == null) { value = t; countDown(); } } @Override public void onError(Throwable t) { if (value == null) { error = t; } countDown(); } } //reactor.core.publisher.BlockingSingleSubscriber abstract class BlockingSingleSubscriber<T> extends CountDownLatch implements InnerConsumer<T>, Disposable { T value; Throwable error; Subscription s; volatile boolean cancelled; BlockingSingleSubscriber() { super(1); } ... @Nullable final T blockingGet() { if (Schedulers.isInNonBlockingThread()) { throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName()); } if (getCount() != 0) { try { await(); } catch (InterruptedException ex) { dispose(); throw Exceptions.propagate(ex); } } Throwable e = error; if (e != null) { RuntimeException re = Exceptions.propagate(e); //this is ok, as re is always a new non-singleton instance re.addSuppressed(new Exception("#block terminated with an error")); throw re; } return value; } ... @Override public final void onComplete() { countDown(); } } 复制代码
可以看到,此处使用的 CountDownLatch
的一个特性,在元素下发赋值之后,等待数值减1,这里刚好也就这一个限定(由 super(1)
定义),解除所调用的 blockingGet
中的等待,得到所需的值,这里,为了保证 block()
的语义,其 onComplete
方法也调用了 countDown();
,即当上游为 Mono<Void>
时,做到匹配。