Netty
服务端一般如下面代码模式,简化了 NIO
编程的复杂性同时,并且借助于 Pipeline
模型,可以很简单的就构建出高性能、可扩展的应用程序。
public class DemoServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
下面我们就通过源码分析下隐藏在这些代码背后的逻辑,对 Netty
可以有个更加深刻的认识。
Netty
中每个 Channel
都会通过注册方式,绑定到一个具体的 NioEventLoop
实例上, NioEventLoop
继承抽象类 SingleThreadEventLoop
,内部通过单个线程模式管理所有注册到它上面的 Channel
,负责这些 Channel
事件监听、事件处理等。 NioEventLoopGroup
内部包含一组 NioEventLoop
,好比 NioEventLoop
是用于管理 Channel
的其中一个线程,而 NioEventLoopGroup
则对应的是管理所有 Channel
的线程池。
上面创建了两个 NioEventLoopGroup
对象,一个是用来管理 NioServerSocketChannel
的,而另一个是用来管理客户端连接进来时创建的客户端对应的 NioSocketChannel
的。
通过跟踪 NioEventLoopGroup
构造过程,本身逻辑是比较简单,但是调用栈比较深,这里就不太方便代码展示,其大概完成事情可以用如下图描述:
NioEventLoopGroup
创建时,同时会创建三个元素: executor
、 chooser
和 child
:
child NioEventLoop Group NioEventLoop CPU核数*2 NioEventLoop
executor NioEventLoop executor
taskQueue NioEventLoop NioEventLoop taskQueue NioEventLoop taskQueue
rejectedHandler addTask() taskQueue
Selector NioEventLoop Selector Channel Selector SelectionKey NioEventLoop
executor NioEventLoop Channel executor
chooser NioEventLoopGroup NioEventLoop Channel chooser NioEventLoop
下面我们来分析下如下代码作用:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ServerBootStrap
是 Netty
使用的一个启动引导类,上面的代码主要是为后续 Netty
启动提供配置数据,本身比较简单:
group(bossGroup, workerGroup) Netty bossGroup NioServerSocketChannel OP_ACCEPT workerGroup NioSocketChannel
channel(NioServerSocketChannel.class)
:用于指定网络模型。 childHandler(new TestServerInitializer()) NioSocketChannel channel pipeline channel ChannelInitializer handler pipeline handler
当执行到 serverBootstrap.bind(8899)
,则表示 Netty
开始进入真正的启动阶段。一路跟踪下来,会进入到 doBind()
方法中:
private ChannelFuture doBind(final SocketAddress localAddress) {
//创建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel进行register,分配NioEventLoop
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
/**
* register完成,则执行doBind0()进行server端口绑定
*/
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//register还未完成,则添加listener,待注册完成再执行doBind0()进行server端口绑定
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
这个方法主要完成两件事:
initAndRegister() Channel Channel Channel NioEventLoop NioEventLoop
doBind0() initAndRegister() doBind0() initAndRegister() doBind0() regFuture.isDone() Channel addListener() listener doBind0() channel channel
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
*
* 通过反射方式创建IO模型类型,具体类型有serverBootstrap.channel()方法指定,比如:NioServerSocketChannel
* NioServerSocketChannel创建时,构造方法中会触发创建jdk channel创建
* 同时会创建对应的配置类:NioServerSocketChannelConfig(tcp参数配置)
*/
channel = channelFactory.newChannel();
/**
* 初始化channel,由子类bootstrap或者serverBootStrap进行实现,可视为一个模板方法
* ServerBootStrap逻辑:options、attrs等初始化,同时向pipeline中添加一个InboundHandler:ServerBootstrapAcceptor
*
* new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
* 这样NioServerSocketChannel接收到OP_ACCEPT事件时,就可以利用这些参数给代表客户端连接的SocketChannel初始化
*/
init(channel);//
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
//还没有注册到线程池。使用默认线程GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
/**
* 将NioServerSocketChannel注册到Reactor主线程池上 ,即给当前创建的Channel分配一个NioEventLoop线程
*/
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
这个方法主要完成3件事:
channelFactory.newChannel() Channel serverBootstrap.channel() .channel(NioServerSocketChannel.class)
init(channel) Channel options attrs pipeline handler ServerBootstrapAcceptor
new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
ServerBootstrapAcceptor
连接处理器是 Server
端非常重要的一个 InBound
类型的 handler
,当 NioServerSocketChannel
轮询 OP_ACCEPT
事件接收到客户端连接进来时,客户端连接各种设置等工作就是由这个 Acceptor
连接器完成。
config().group().register(channel) Channel NioEventLoop NioEventLoop channel NioEventLoop Selector
newChannel()
比较简单,这里就不展开了,核心点主要在于 init(channel)
和 register(channel)
这两个方法。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
/**
* 给NIOServerChannel绑定的pipeline添加一个ChannelInitializer
*/
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这个方法主要完成2件事:
NioServerSocketChannel option attr
NioServerSocketChannel Pipeline ChannelInitializer ChannelInitializer pipeline handler NioServerSocketChannel handler ServerBootstrapAcceptor
顺便我们来看下通过 pipeline.addLast()
方式向 pipeline
添加 handler
逻辑:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
//当前Channel还未注册,需要先封装成PendingHandlerAddedTask,并链表方式挂载到Pipeline的pendingHandlerCallbackHead变量下,待后续注册完成后再回调
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//注册完成,且当前线程和Channel绑定线程不是同一个,则用Channel的绑定线程执行
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
pipeline
是一个双向链表,刚创建完成时默认有两个节点: head
和 tail
,如下图:
执行 p.addLast(new ChannelInitializer())
后是如下图:
如上面代码, pipeline
不是直接将 handler
添加进来,而是封装成 handlerContext
。执行 addLast0(newCtx)
将 handler
对应的 HandlerContext
添加进来后,正常情况下这时需要回调 handler#handlerAdded()
方法。 handler#handlerAdded()
执行是需要在 channel
注册的 NioEventLoop
线程中执行才行,所以有 if (!executor.inEventLoop())
这个判断。但是,当前是在主线程 main
中,且 channel
因为还没有注册完成,所以当前 channel
和 NioEventLoop
根本就还没有绑定到一起,所以是没法执行的,这里会进入 if (!registered)
流程:将 handlerContext
封装成一个 PendingHandlerAddedTask
实例,先挂载到 pipeline
的 pendingHandlerCallbackHead
全局变量下,待后续 channel
注册完成后再来处理 handler#handlerAdded()
。还有个问题,如果添加多个 handler
, PendingHandlerAddedTask
有个 next
,可以把它们串成一个链表即可。
这样,我们把 init()
方法的主要逻辑基本都分析完成了,现在我们再回过头看下 initAndRegister
方法中另外一个重要逻辑: config().group().register(channel)
。这里的 config().group()
就是获取的是之前传入的用于处理 server
端线程组: EventLoopGroup bossGroup = new NioEventLoopGroup()
。
NioEventLoopGroup#register()
第一步就是使用 chooser
选取一个其管理的 NioEventLoop
,默认选取策略很简单,就是使用一个递增序列 idx
,然后和数组长度取模即可:
executors[idx.getAndIncrement() & executors.length - 1]
选取好 NioEventLoop
后,调用 NioEventLoop#register(channel)
方法, NioEventLoop#register(channel)
方法又会调用 channle
的 Unsafe
对象的 register
进行处理,并把自己即 NioEventLoop
作为参数传入:
promise.channel().unsafe().register(this, promise);
Channel
创建时同时创建一个 UnSafe
对象,主要用于处理与 java
底层 socket
相关操作。
所以, register()
方法跑了一圈最后还是在 channel
中的 Unsafe#register()
方法中进行处理, NioEventLoopGroup
只是利用 chooser
选取一个 NioEventLoop
作为参数传入到 register()
方法中。
我们来看下 Unsafe#register()
方法做了哪些事情。
1、后面这个 eventLoop
就是将刚才利用 chooser
选取的 NioEventLoop
,通过赋值给 channel
的 eventLoop
字段上,即完成了 channel
和 NioEventLoop
的关联;
AbstractChannel.this.eventLoop = eventLoop;
2、调用 register0()
方法, register0()
方法需要在 NioEventLoop
线程中执行才行,所以这里也使用 if (eventLoop.inEventLoop())
判断下,当前是主线程 main
,所以会进入到 else
逻辑处理中,把执行逻辑封装成任务提交到 NioEventLoop
的任务队列 taskQueue
中:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
eventLoop.execute()
方法中处理不只是简单将 task
放入到 taskQueue
中,我们来看下其还做了哪些事:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
这个方法主要完成3件事:
addTask(task) register0() taskQueue taskQueue
startThread() NioEventLoop channel startThread() if (!inEventLoop) NioEventLoop inEventLoop false NioEventLoop selector.select() processSelectedKeys() runAllTasks()
wakeup() NioEventLoop NioEventLoop selector.select(timeout) wakeup() NioEventLoop select()
eventLoop.execute()
分析完成后, register0()
方法任务已被添加到 taskQueue
中,然后启动 NioEventLoop
线程开始干活,最后通过 wakeup()
唤醒 NioEventLoop
让其去处理 taskQueue
中的任务,所以,这时我们需要再回头看下 register0()
方法。
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {//这里实际返回false,channelActive()不会在这里触发
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
这个方法主要完成3件事:
doRegister() java api channel selector javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
channel NioEventLoop Selector
SelectionKey=0 channel selector OP_ACCEPT
this attachment this server NioServerSocketChannel selector OP_ACCEPT NioServerSocketChannel accept()
pipeline.invokeHandlerAddedIfNeeded() pipeline handler channel handler PendingHandlerAddedTask pipeline.pendingHandlerCallbackHead channel pendingHandlerCallbackHead handler#handlerAdded() ChannelInitializer#handlerAdded() initChannel() handler ServerBootstrapAcceptor pipeline ChannelInitializer pipeline
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
pipeline.fireChannelRegistered()
:回调 handler#channelRegistered()
方法; pipeline.fireChannelActive() channel if (isActive()) channel isActive() true
总结
分析到这里, doBind()
方法中两个重要方法: initAndRegister()
和 doBind0()
的第一个方法已全部分析完成, initAndRegister()
还是完成了相当多的任务,其核心逻辑总结下:创建 NioServerSocketChannel
,然后进行各种配置初始化,最重要的一步是把 channel
注册到 NioEventLoop
上, NioEventLoop
采用单线程模式轮询事件、处理事件。 handler
回调方法: handlerAdded()
和 channelRegistered()
也会在上面执行过程中被触发调用。
长按识别关注, 持续输出原创