nio.ServerSocketChannel ch = nio.SelectorProvider.openServerSocketChannel(); ch.configBlocking(false);
SelectionKey selectionKey = ch.register(eventLoop().selector, 0, this);
pipeline.fireChannelRegistered();
会导致ChannelInitialzer.channelRegisted()执行,将ServerBootstrapAcceptor添加到channel的pipeline中。
javaChannel().socket().bind(localAddress, config.getBacklog());
pipeline.fireChannelActive();
ChannelActive事件由HeadContext处理,向ch中添加OP_ACCEPT事件监听。
selectionKey.interestOps(OP_ACCEPT);
ServerBootstrap b = new ServerBootstrap(); // Start the server. ChannelFuture f = b.bind(port).sync();
服务端启动时都会执行上面的代码,用来启动ServerSocketChannel监听对应端口。
最终由AbstractBootstrap.doBind方法处理。
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
final Channel channel = channelFactory.newChannel();
从此行代码开始,ReflectiveChannelFactory通过反射的方式调用NioServerSocketChannel的构造方法进行channel的初始化。
newSocket(SelectorProvider provider);
使用nio.SelectorProvider.openServerSocketChannel()创建一个nio.ServerSocketChannel ch。
parent = null unsafe = new NioMessageUnsafe() pipeline = new DefaultChannelPipeline(this) ->此处初始化一个DefaultChannelPipeline,并将pipeline和channel互相绑定 ch = ch ->同时将ch设置为非阻塞模式 readInterestOp = OP_ACCEPT config = new ServerSocketChannelConfig(this, javaChannel().socket())
init(channel);
由其父类ServerBootstrap直接实现。
p.addLast(new ChannelInitializer<Channel>() { public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }
该处理器的 channelRegisted事件 回调方法会将 ServerBootstrapAcceptor 加入channel的pipeline中,并将自己从pipeline中移除。
ServerBootstrapAcceptor也是一个Inbound处理器,用于在Server端accept新的客户端连接时,向新生成的socketChannel中添加用户定义的业务处理器。
其channelRead事件回调方法会将业务方往ServerBootstrap中添加的childHandler添加到socketChannel对应的pipeline中。
对于Server端,channelRead事件被定义为server端accept到了新的socket连接。
ChannelFuture regFuture = group().register(channel);
1)NioEventLoopGroup的chooser从其children中选出一个NioEventLoop child,调用其register()方法进行channel注册;
2)实际将NioEventLoop child传给NioServerSocketChannel的unsafe,调用其register(EventLoop eventLoop, final ChannelPromise promise)方法完成注册
将channel.eventLoop绑定为当前NioEventLoop child;
将AbstractUnsafe.register0(DefaultChannelPromise promise)任务加入EventLoop child的执行任务队列;
doBind0(regFuture, channel, localAddress, promise);
当ChannelFuture完成时,将NioServerSocketChannel.bind(SocketAddress localAddress, ChannelPromise promise)任务加入EventLoop child的执行任务队列
AbstractUnsafe.register0(ChannelPromise promise);
1、将NioServerSocketChannel的ch注册到NioEventLoop child的selector上,同时将注册得到的SelectionKey绑定为NioServerSocketChannel的selectionKey
doRegister();
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
netty的轮询注册机制其实是将AbstractNioChannel内部的jdk类对象SelectableChannel ch注册到jdk类对象Selector selector上去,并且将AbstractNioChannel channel作为SelectableChannel对象ch的一个attachment附属上,这样在jdk轮询出某个SelectableChannel有IO事件发生时,就可以直接取出AbstractNioChannel进行后续操作。
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
会导致ChannelInitialzer.channelRegisted()执行,将ServerBootstrapAcceptor添加到channel的pipeline中。
AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise);
javaChannel().socket().bind(localAddress, config.getBacklog());
pipeline.fireChannelActive();
ChannelActive事件由HeadContext处理,最终调用了AbstractNioUnsafe.doBeginRead()方法,向ch中添加OP_READ事件监听。
selectionKey.interestOps(interestOps | readInterestOp);