欢迎关注公众号:【 爱编程 】
如果有需要后台回复 2019 赠送 1T的学习资料 哦!!
前文再续,书接上一回【 NioEventLoop 】。
在研究NioEventLoop执行过程的时候,检测IO事件(包括新连接),处理IO事件,执行所有任务三个过程。其中检测IO事件中通过持有的selector去轮询事件,检测出新连接。这里复用同一段代码。
在开始分析前,先了解一下Channel的设计
顶层Channel接口定义了socket事件如读、写、连接、绑定等事件,并使用AbstractChannel作为骨架实现了这些方法。查看器成员变量,发现大多数通用的组件,都被定义在这里
第二层AbstractNioChannel定义了以NIO,即Selector的方式进行读写事件的监听。其成员变量保存了selector相关的一些属性。
第三层内容比较多,定义了服务端channel(左边继承了AbstractNioMessageChannel的NioServerSocketChannel)以及客户端channel(右边继承了AbstractNioByteChannel的NioSocketChannel)。
本文开始探索一下Netty是如何接入新连接?主要分为四个部分
1.检测新连接
2.创建NioSocketChannel
3.分配线程和注册Selector
4.向Selector注册读事件
Netty服务端在启动的时候会绑定一个bossGroup,即NioEventLoop,在 bind()
绑定端口的时候注册accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从: NioEventLoop#processSelectedKeys()
开始。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //省略代码 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //新连接接入以及读事件处理入口 unsafe.read(); } }
关键的新连接接入以及读事件处理入口 unsafe.read();
a).这里的 unsafe
是在Channel创建过程的时候,调用了父类 AbstractChannel#AbstractChannel()
的构造方法,和 pipeline
一起初始化的。
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
服务端:
unsafe 为 NioServerSockeChanne
l的父类AbstractNioMessageChannel#newUnsafe()创建,可以看到对应的是AbstractNioMessageChannel的内部类 NioMessageUnsafe
;
客户端:
unsafe为 NioSocketChannel
的的父类AbstractNioUnsafe#newUnsafe()创建的话,它对应的是AbstractNioByteChannel的内部类 NioByteUnsafe
b).unsafe.read()
NioMessageUnsafe.read()
中主要的操作如下:
1.循环调用jdk底层的代码创建channel,并用netty的NioSocketChannel包装起来,代表新连接成功接入一个通道。
2.将所有获取到的channel存储到一个容器当中,检测接入的连接数,默认是一次接16个连接
3.遍历容器中的channel,依次调用方法fireChannelRead,4.fireChannelReadComplete,fireExceptionCaught来触发对应的传播事件。
private final class NioMessageUnsafe extends AbstractNioUnsafe { //临时存储读到的连接 private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //服务端接入速率处理器 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { //while循环调用doReadMessages()创建新连接对象 do { //获取jdk底层的channel,并加入readBuf容器 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //把读到的连接做一个累加totalMessages,默认最多累计读取16个连接,结束循环 allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } //触发readBuf容器内所有的传播事件:ChannelRead 读事件 int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } //清空容器 readBuf.clear(); allocHandle.readComplete(); //触发传播事件:ChannelReadComplete,所有的读事件完成 pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); //触发传播事件:exceptionCaught,触发异常 pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
而这一段关键代码逻辑中 int localRead = doReadMessages(readBuf);
它创建jdk底层channel并且用NioSocketChannel包装起来,将该channel添加到传入的容器保存起来,同时返回一个计数。
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //将jdk底层的channel封装到netty的channel,并存储到传入的容器当中 //this为服务端channel buf.add(new NioSocketChannel(this, ch)); //成功和创建 客户端接入的一条通道,并返回 return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
通过检测IO事件轮询新连接,当前成功检测到连接接入事件之后,会调用 NioServerSocketChannel#doReadMessages()
方法,进行创建 NioSocketChannel
,即客户端channel的过程。
下面就来了解一下 NioSocketChannel 的主要工作:
.查看原代码做了两件事,调用父类构造方法,实例化一个NioSocketChannelConfig。
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); //实例化一个NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()); }
1)、查看NioSocketChannel父类构造方法,主要是 保存客户端注册的读事件、channel为成员变量,以及设置阻塞模式为非阻塞。
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); //实例化一个NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()); } protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { //传入感兴趣的读事件:客户端channel的读事件 super(parent, ch, SelectionKey.OP_READ); } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //保存客户端channel为成员变量 this.ch = ch; //保存感兴趣的读事件为成员变量 this.readInterestOp = readInterestOp; try { //配置阻塞模式为非阻塞 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
最后调用父类的构造方法,是设置 该客户端channel对应的服务端channel,以及channel的id和两大组件unsafe和pipeline
protected AbstractChannel(Channel parent) { //parent为创建次客户端channel的服务端channel(服务端启动过程中通过反射创建的) this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
2)、再看NioSocketChannelConfig实例化。主要是保存了javaSocket,并且通过 setTcpNoDelay(true);
禁止了tcp的Nagle算法,目的是为了尽量让小的数据包整合成大的发送出去,降低延时.
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); calculateMaxBytesPerGatheringWrite(); } public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } //保存socket this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { //禁止Nagle算法,目的是为了让小的数据包尽量集合成大的数据包发送出去 setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } }
服务端启动初始化的时候 ServerBootstrap#init()
,主要做了一些参数的配置。其中对于 childGroup,childOptions,childAttrs,childHandler
等参数被进行了单独配置。作为参数和 ServerBootstrapAcceptor
一起,被当作一个特殊的handle,封装到pipeline中。 ServerBootstrapAcceptor
中的 eventLoop
为 workGroup
。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { //省略了很多代码............. @Override void init(Channel channel) throws Exception { //配置AbstractBootstrap.option final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } //配置AbstractBootstrap.attr final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //配置pipeline ChannelPipeline p = channel.pipeline(); //获取ServerBootstrapAcceptor配置参数 final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); //配置AbstractBootstrap.handler ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { //配置ServerBootstrapAcceptor,作为Handle紧跟HeadContext pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } //省略了很多代码............. }
可见,整个服务端pipeline的结构如下图所示。 bossGroup
控制IO事件的检测与处理,整个 bossGroup
对应的pipeline只包括头( HeadContext
)尾( TailContext
)以及中部的 ServerBootstrap.ServerBootstrapAcceptor
。
当新连接接入的时候 AbstractNioMessageChannel.NioMessageUnsafe#read()
方法被调用,最终调用 fireChannelRead()
,方法来触发下一个Handler的 channelRead
方法。而这个Handler正是 ServerBootstrapAcceptor
它是ServerBootstrap的内部类,同时继承自 ChannelInboundHandlerAdapter
。也是一个 ChannelInboundHandler
。其中channelRead主要做了以下几件事。
1.为客户端channel的pipeline添加childHandler
2.设置客户端TCP相关属性childOptions和自定义属性childAttrs
3.workGroup选择NioEventLoop并注册Selector
1)、为客户端channel的pipeline添加childHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; private final Runnable enableAutoReadTask; ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; //省略了一些代码。。。。。 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { //该channel为客户端接入时创建的channel final Channel child = (Channel) msg; //添加childHandler child.pipeline().addLast(childHandler); //设置TCP相关属性:childOptions setChannelOptions(child, childOptions, logger); //设置自定义属性:childAttrs for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //选择NioEventLoop并注册Selector childGroup.register(child) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } //省略了一些代码。。。。。 }
客户端 channel
的pipeline添加 childHandler
,在服务端EchoServer创建流程中,childHandler的时候,使用了 ChannelInitializer
的一个自定义实例。并且覆盖了其 initChannel
方法,改方法获取到pipeline并添加具体的Handler。查看 ChannelInitializer
具体的添加逻辑, handlerAdded
方法。其实在 initChannel
逻辑中,首先是 回调到用户代码执行 initChannel
,用户代码执行添加Handler的添加操作,之后将ChannelInitializer自己从pipeline中删除 。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. //初始化Channel if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { //回调到用户代码 initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { //删除本身 pipeline.remove(this); } } return true; } return false; } }
2)、设置客户端TCP相关属性childOptions和自定义属性childAttrs
这点在 ServerBootstrapAcceptor#init()
方法中已经体现
3)、workGroup选择NioEventLoop并注册Selector
这要从 AbstractBootstrap#initAndRegister()
方法开始,然后跟踪源码会来到 AbstractUnsafe#register()
方法
protected abstract class AbstractUnsafe implements Unsafe { //省略了一些代码。。。。。 @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } //省略了一些代码。。。。。 }
最后调用 AbstractNioUnsafe#doRegister()
方法通过jdk的 javaChannel().register
完成注册功能。
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { //省略了一些代码。。。。。 @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } //省略了一些代码。。。。。 }
a)、入口: ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register()
;
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
b)、实际上调用了 AbstractChannel.AbstractUnsafe#register0()
,触发了通道激活事件;
//触发通道激活事件,调用HeadContent的 pipeline.fireChannelActive();
c)、 pipeline
的头部开始,即 DefaultChannelPipeline.HeadContext#channelActive()
从而触发了 readIfIsAutoRead()
;
@Override public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
d)、读事件将从尾部的TailContent#read()被触发,从而依次执行ctx.read(),从尾部开始,每个outboundHandler的read()事件都被触发。直到头部。
@Override public final ChannelPipeline read() { tail.read(); return this; } @Override public ChannelHandlerContext read() { //获取最近的outboundhandler final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); //并依次执行其read方法 if (executor.inEventLoop()) { next.invokeRead(); } else { Tasks tasks = next.invokeTasks; if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } executor.execute(tasks.invokeReadTask); } return this; }
e)、进入头部HeadContext#read(),并且最终更改了selectionKey,向selector注册了读事件
HeadContext#read()
@Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
AbstractChannel#beginRead()
@Override public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
AbstractNioMessageChannel#doBeginRead
@Override protected void doBeginRead() throws Exception { if (inputShutdown) { return; } super.doBeginRead(); }
AbstractNioChannel#doBeginRead()
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
参考文章:
JorgezhongNetty如何接入新连接基本流程如上所述,如果有误,还望各位指正。建议先从前两篇看起比较好理解点。
【Netty】服务端和客户端
学习NioEventLoop如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。
关注公众号 【爱编码】 ,回复 2019 有相关资料哦。