Channel 概念与 java.nio.channel 概念一致, 用以连接IO设备 (socket, 文件等) 的纽带. 例如将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.
Channel 的 IO 类型主要有两种: 非阻塞IO (NIO) 以及阻塞IO(OIO).
数据传输类型有两种: 按事件消息传递 (Message) 以及按字节传递 (Byte).
适用方类型也有两种: 服务器(ServerSocket) 以及客户端(Socket). 还有一些根据传输协议而制定的的Channel, 如: UDT、SCTP等.
Netty 按照类型逐层设计相应的类. 最底层的为抽象类 AbstractChannel
, 再以此根据IO类型、数据传输类型、适用方类型实现. 类图可以一目了然, 如下图所示:
/** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */ void channelRegistered(ChannelHandlerContext ctx) throws Exception;
从注释里面可以看到是在 Channel
绑定到 Eventloop
上面的时候调用的.
不管是 Server 还是 Client, 绑定到 Eventloop
的时候, 最终都是调用 Abstract.initAndRegister()
这个方法上(Server是在 AbstractBootstrap.doBind()
的时候调用的, Client 是在 Bootstrap.doConnect()
的时候调用的).
initAndRegister()
方法定义如下:
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // 把channel绑定到Eventloop对象上面去 ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
继续跟踪下去会定位到 AbstractChannel.AbstractUnsafe.register0()
方法上.
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 做实际的绑定动作。把Channel感兴趣的事件注册到Eventloop.selector上面.具体实现在Abstract.doRegister()方法内 doRegister(); neverRegistered = false; registered = true; // 通过pipeline的传播机制,触发handlerAdded事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); // 通过pipeline的传播机制,触发channelRegistered事件 pipeline.fireChannelRegistered(); // 还没有绑定,所以这里的 isActive() 返回false. if (isActive()) { if (firstRegistration) { // 如果当前链路已经激活,则调用channelActive()方法 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
从上面的代码也可以看出, 在调用完 pipeline.fireChannelRegistered()
之后, 紧接着会调用 isActive()
判断当前链路是否激活, 如果激活了则会调用 pipeline.fireChannelActive()
方法.
这个时候, 对于 Client 和 Server 都还没有激活, 所以, 这个时候不管是 Server 还是 Client 都不会调用 pipeline.fireChanenlActive()
方法.
从启动器的 bind()
接口开始, 往下调用 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) { // 初始化及注册 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 调用 doBind0 doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
doBind
方法又会调用 doBind0()
方法, 在 doBind0()
方法中会通过 EventLoop
去执行 channel
的 bind()
任务.
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 调用channel.bind接口 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
doBind0()
方法往下会调用到 pipeline.bind(localAddress, promise)
; 方法, 通过 pipeline
的传播机制, 最终会调用到 AbstractChannel.AbstractUnsafe.bind()
方法, 这个方法主要做两件事情:
doBind() pipeline.fireChannelActive()
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { .... // wasActive 在绑定成功前为 false boolean wasActive = isActive(); try { // 调用doBind()调用JDK底层API进行端口绑定 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 完成绑定之后,isActive() 返回true if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { // 触发channelActive事件 pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
也就是说当有新客户端连接的时候, 会变成活动状态.
fireChannelnactive()
方法在两个地方会被调用: Channel.close()
和 Channel.disconnect()
.
在调用前会先确认状态是从 Active
---> Inactive
.
fireChannelUnregistered()
方法是在 Channel
从 Eventloop
中解除注册的时候被调用的. Channel.close()
的时候被触发执行.
handlerAdded()
: 添加到 ChannelPipeline
时调用.
handlerRemoved()
: 从 ChannelPipeline
中移除时调用.
exceptionCaught()
: 处理过程中在 ChannelPipeline
中有错误产生时调用.
处理 I/O 事件或截获 I/O 操作, 并将其转发到 ChannelPipeline
中的下一个处理程序. ChannelHandler
本身不提供许多方法, 但通常必须实现其子类型之一:
ChannelInboundHandler ChannelOutboundHandler
channelRegistered()
: 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用.
channelUnregistered()
: 当 Channel 从他的 EventLoop 注销并且无法处理任何 I/O 时被调用.
channelActive()
: 当 Channel 处于活动状态时被调用.
channelInactive()
: 当 Channel 离开活动状态并且不再连接远程节点时被调用.
channelRead()
: 当从 Channel 读取数据时被调用.
channelReadComplete()
: 当 Channel 上的一个读操作完成时被调用. 当所有可读字节都从 Channel 中读取之后, 将会调用该回调方法.
出站操作和数据将由 ChannelOutboundHandler 处理. 它的方法将被 Channel
ChannelPipeline
以及 ChannelHandlerContext
调用.
ChannelOutboundHandler
的一个强大的功能是可以按需推迟操作或事件, 这使得可以通过一些复杂的方法来处理请求. 例如, 如果到远程节点的写入被暂停, 那么你可以推迟刷新操作并在稍后继续.
connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)
: 当请求将 Channel 连接到远程节点时被调用.
disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
: 当请求将 Channel 从远程节点断开时被调用.
deregister(ChannelHandlerContext ctx, ChannelPromise promise)
: 当请求将 Channel 从它的 EventLoop 注销时被调用.
read(ChannelHandlerContext ctx)
: 当请求从 Channel 读取更多的数据时被调用.
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
: 当请求通过 Channel 将数据写到远程节点时被调用.
flush(ChannelHandlerContext ctx)
: 当请求从 Channel 将入队数据冲刷到远程节点时被调用.
ChannelFuture
表示 Channel
中异步I/O操作的结果, 在 netty 中所有的 I/O 操作都是异步的, I/O 的调用会直接返回, 可以通过 ChannelFuture
来获取 I/O 操作的结果或者状态信息.
当 I/O 操作开始时, 将创建一个新对象. 新的对象是未完成的-它既没有成功, 也没有失败, 也没有被取消, 因为 I/O 操作还没有完成.
如果 I/O 操作已成功完成(失败或取消), 则对象将标记为已完成, 其中包含更具体的信息, 例如故障原因.
请注意, 即使失败和取消属于已完成状态.
ChannelPromise
是 ChannelFuture
的一个子接口, 其定义了一些可写的方法, 如 setSuccess()
和 setFailure()
, 从而使 ChannelFuture
不可变.
当做了一个 I/O 操作并有任何后续任务的时候, 推荐优先使用 addListener(GenericFutureListener)
的方式来获得通知, 而非 await()
addListener(GenericFutureListener)
是非阻塞的. 它会把特定的 ChannelFutureListener
添加到 ChannelFuture
中, 然后 I/O 线程会在 I/O 操作相关的 future 完成的时候通知监听器.
ChannelFutureListener
会利于最佳的性能和资源的利用, 因为它一点阻塞都没有. 而且不会造成死锁.
ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
这两个适配器类分别提供了
ChannelInboundHandler
和 ChannelOutboundHandler
的基本实现, 它们继承了共同的父接口
ChannelHandler
的方法, 扩展了抽象类 ChannelHandlerAdapter
.
ChannelHandlerAdapter
还提供了实用方法 isSharable()
.
如果其对应的实现被标注为 Sharable
, 那么这个方法将返回 true
, 表示它可以被添加到多个 ChannelPipeline
中.
ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
中所提供的方法体调用了其相关联的 ChannelHandlerContext
上的等效方法, 从而将事件转发到了 ChannelPipeline
中的 ChannelHandler
中.
ChannelPipeline
将多个 ChannelHandler
链接在一起来让事件在其中传播处理. 一个 ChannelPipeline
中可能不仅有入站处理器, 还有出站处理器, 入站处理器只会处理入站的事件, 而出站处理器只会处理出站的数据.
每一个新创建的 Channel
都将会分配一个新的 ChannelPipeline
, 不能附加另一个 ChannelPipeline
, 也不能分离当前的.
通过调用 ChannelHandlerContext
实现, 它将被转发给同一个超类型的下一个 ChannelHandler
.
从事件途径 ChannelPilpeline
的角度来看, ChannelPipeline
的头部和尾端取决于该事件是入站的还是出站的.
而 Netty 总是将 ChannelPilpeline
的入站口 (左侧) 作为头部, 将出站口 (右侧) 作为尾端.
当通过调用 ChannelPilpeline.add*()
方法将入站处理器和出站处理器混合添加到 ChannelPilpeline
之后, 每一个 ChannelHandler
从头部到尾端的顺序就是我们添加的顺序.
在 ChannelPilpeline
传播事件时, 它会测试 ChannelPilpeline
中的下一个 ChannelHandler
的类型是否和事件的运动方向相匹配. 如果不匹配, ChannelPilpeline
将跳过该 ChannelHandler
并前进到下一个, 直到它找到和该事件期望的方向相匹配的为止.
这里指修改 ChannelPipeline
中的 ChannelHandler
的编排.
通过调用 ChannelPipeline
上的相关方法, ChannelHandler
可以添加, 删除或者替换其他的 ChannelHandler
, 从而实时地修改 ChannelPipeline
的布局.
addFirst // 将 ChannelHandler 插入第一个位置 addBefore // 在某个 ChannelHandler 之前添加一个 addAfter // 在某个 ChannelHandler 之后添加一个 addLast // 将 ChannelHandler 插入最后一个位置 remove // 移除某个 ChannelHandler replace // 将某个 ChannelHandler 替换成指定 ChannelHandler
ChannelHandlerContext
代表了 ChanelHandler
和 ChannelPipeline
之间的关联, 每当有 ChanelHandler
添加到 ChannelPipeline
中, 都会创建 ChannelHandlerContext
.
ChannelHandlerContext
的主要功能是管理它所关联的 ChannelPipeline
和同一个 ChannelPipeline
中的其他 ChanelHandler
之间的交互.
ChannelHandlerContext
有很多的方法, 其中一些方法也存在于 Channel
和 ChannelPipeline
上, 但是有一点重要的不同.
如果调用 Channel
和 ChannelPipeline
上的这些方法将沿着 ChannelPipeline
进行传播(从头或尾开始).
而调用位于 ChannelHandlerContext
上的相同方法, 则将从当前所关联的 ChannelHandler
开始, 并且只会传播给位于该 ChannelPipeline
中的下一个能够处理该事件的 ChannelHandler
.
这样做可以减少 ChannelHandler
的调用开销.
上图为 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之间的关系.