上篇文章中我们梳理了ChannelPipeline中入站事件的传播,这篇文章中我们看下出站事件的传播,也就是ChannelOutboundHandler接口的实现。
我们对上篇文章中的示例代码进行改造,在ChannelPipeline中加入ChannelOutboundHandler出站实现
public class ServerApp { public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup(2); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // p.addLast(new LoggingHandler(LogLevel.INFO)); // 向ChannelPipeline中添加自定义channelHandler p.addLast(new OutHandlerA()); p.addLast(new ServerHandlerA()); p.addLast(new ServerHandlerB()); p.addLast(new ServerHandlerC()); p.addLast(new OutHandlerB()); p.addLast(new OutHandlerC()); } }); bootstrap.bind(8050).sync(); } catch (Exception e) { // TODO: handle exception } } } public class OutHandlerA extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.err.println(this.getClass().getName()+msg); ctx.writeAndFlush((ByteBuf)msg); } } public class OutHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) { System.out.println(this.getClass().getName()+msg); ctx.write((ByteBuf)msg); } } public class OutHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) { System.out.println(this.getClass().getName()+"--"+msg); ctx.write((ByteBuf)msg); } }
然后我们在ServerHandlerA的channelRead方法中执行ctx的write方法,模拟消息出站事件的发生。
public class ServerHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(); byteBuf.writeByte(1); byteBuf.writeByte(2); ctx.channel().write(byteBuf); //ctx.write(byteBuf); } }
上面channelRead方法中write方法的调用有两种方式 ctx.channel().write 与 ctx.write,这两种方式有何区别呢,我们首先看下这两种方式的运行结果
ctx.channel().write
io.netty.example.echo.my.OutHandlerC--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256) io.netty.example.echo.my.OutHandlerB--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256) io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)
ctx.write
io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)
可以看到当调用ctx.channel().write时,消息在管道中传播的顺序是从尾部一直传递到最上层的OutboundHandler;而 ctx.write会从所在的 handler 向前找 OutboundHandler。
那么这两种方式区别是否就如结果所示呢,下面我们就开始对这两种方法的内部实现进行分析
ctx.channel().write与 ctx.write 分别用的是AbstractChannel与AbstractChannelHandlerContext的write方法
AbstractChannel 的 write方法
@Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }
AbstractChannelHandlerContext 的 write方法
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
上面代码中AbstractChannel的 wirte方法最终调用的是pipeline的write方法,我们进入pipeline内部查看,可以看到pipeline的write方法默认从尾部AbstractChannelHandlerContext节点开始调用。
@Override public final ChannelFuture write(Object msg) { return tail.write(msg); }
继续向下跟踪最终它们调用的都是AbstractChannelHandlerContext 的 write方法,下面我们看下方法内部的具体实现。
private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) {//检查ChannelPromise是否有效 ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } //寻找上一个AbstractChannelHandlerContext节点 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) {//与当前线程是否一致 if (flush) {//确定是否要把数据冲刷到远程节点 next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { //如果不一致的封装成writeTask任务线程 final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } //把该线程任务交给对应的EventExecutor执行 if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
主要关注下findContextOutbound(),这个方法的作用就是获取当前AbstractChannelHandlerContext节点的上一个节点prev
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev;//获取当前节点的上一个节点 } while (!ctx.outbound);//判断是不是出站节点 return ctx; }
最终通过next.invokeWrite(m, promise)回调方法,调用下一个节点中封装的ChannelOutboundHandler的write方法,从而实现write方法事件的传递
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) {//判断当前ChannelOutboundHandler是否已经被添加到pipeline中(handlerAdded事件触发) invokeWrite0(msg, promise); } else { write(msg, promise); } } private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
到这里整个出站事件的传播流程已经基本清晰了,wirte方法本身就是一个寻找并回调下一个节点中wirte方法的过程。
在上面代码中可以看到这两个方法主要在于是否会在执行write方法后,是否会执行flush方法。
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //是否调用回调方法 //调用write与flush回调方法,最终调用自定义hander的对应实现 invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
这里需要注意的是invokeFlush0()在invokeWrite0后执行,也就是必须等到消息出站事件传递完毕后,才会调用flush把数据冲刷到远程节点。简单理解就是你无论是在OutHandlerA、OutHandlerB还是OutHandlerC中调用writeAndFlush,最后都是要在write事件传递完毕才会flush数据的。
同时我们需要注意到当write与flush事件从OutHandlerA再往上传递时,OutHandlerA的的上一个节点就是Pipeline的头节点HeadContext,我们看下HeadContext的write与flush方法实现;
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) { unsafe.flush(); }
到这里我们可以看出,消息的真正入队与发送最终是通过HeadContext的write与flush方法实现。
通过以上的分析我们可以看到Pipeline出站事件的传播流程,同时我们需要注意ctx.write与ctx.channel().write的区别以及消息的发送最终是通头部节点调用unsafe的write与flush方法实现的,其中如有不足与不正确的地方还望指出与海涵。