在上一篇中,提到了 Channel、ChannelPipeline、ChannelHandler 以及ChannelHandlerContext 之间的关系 ,现在看看ChannelHandler和ChannelHandlerContext。
ChannelHandler的类图如下:
ChannelHandler的类继承图如下,ChannelHandler有两个子接口,分别是ChannelOutboundHandler和ChannelInboundHandler,以及一个子抽象类ChannelHandlerAdapter。
ChannelInboundHandler是处理入站数据以及处理各种状态变化对应的事件。
ChannelOutboundHandler是处理出站数据并且允许拦截所有的操作。
ChannelHandlerAdapter,适配器提供了大量默认的ChannelHandler实现,其旨在简化应用程序处理逻辑的开发过程。
在上一篇文章中,可以看到ChannelPipeline添加ChannelHandler的时候,会把ChannelHandler包装成HandlerContext,这个HandlerContext类实际上是ChannelHandlerContext,ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互
之前已经了解过ChannelPipeline的addLast,如果有消息进来,那要怎么处理呢,我们先看看简单的例子。
服务器对两个入站数据,两个出站数据进行处理,其中FirstHandler和ThirdHandler是入站,SecondHandler和FourthHandler是出站。数据处理都是简单的在后面加个字符串
EchoClient代码可以用之前的小例子,主要看服务器的代码。
FirstHandler:
@ChannelHandler.Sharable public class FirstHandler extends ChannelInboundHandlerAdapter { /** * 服务端接收客户端信息的时候调用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; ByteBufUtil.writeUtf8(byteBuf,"a"); System.out.println("FirstHandler:" + byteBuf.toString(CharsetUtil.UTF_8)); ctx.fireChannelRead(msg); } /** * 服务端处理客户端最后一条消息后调用 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)// flush数据 .addListener(ChannelFutureListener.CLOSE);// 关闭Channel } /** * 服务端处理消息过程中,对异常的处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印异常 cause.printStackTrace(); // 关闭Channel ctx.close(); } }
SecondHandler
@ChannelHandler.Sharable public class SecondHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; ByteBufUtil.writeUtf8(byteBuf,"b"); System.out.println("SecondHandler:" + byteBuf.toString(CharsetUtil.UTF_8)); super.write(ctx, msg, promise); } }
ThirdHandler
@ChannelHandler.Sharable public class ThirdHandler extends ChannelInboundHandlerAdapter { /** * 服务端接收客户端信息的时候调用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; ByteBufUtil.writeUtf8(byteBuf,"c"); System.out.println("ThirdHandler:" + byteBuf.toString(CharsetUtil.UTF_8)); ctx.write(byteBuf); } /** * 服务端处理客户端最后一条消息后调用 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)// flush数据 .addListener(ChannelFutureListener.CLOSE);// 关闭Channel } /** * 服务端处理消息过程中,对异常的处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印异常 cause.printStackTrace(); // 关闭Channel ctx.close(); } }
FourthHandler
@ChannelHandler.Sharable public class FourthHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("FourthHandler:" + byteBuf.toString(CharsetUtil.UTF_8)); super.write(ctx, msg, promise); } }
EchoServer
public class EchoServer { public static void main(String[] args) throws InterruptedException { final FirstHandler firstHandler = new FirstHandler(); final SecondHandler secondHandler = new SecondHandler(); final ThirdHandler thirdHandler = new ThirdHandler(); final FourthHandler fourthHandler = new FourthHandler(); // 创建NioEventLoopGroup类型的EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { // 创建ServerBootstrap ServerBootstrap sbs = new ServerBootstrap(); sbs.group(group) // 设置Channel为NIO的服务端Channel .channel(NioServerSocketChannel.class) // 绑定本地端口 .localAddress(new InetSocketAddress(Const.PORT)) // 新连接被接受时,会创建一个Channel // 再把把echoServerHandler加入到这个Channel的ChannelPipeline中 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 把echoServerHandler加入到ChannelPipeline中 socketChannel.pipeline().addLast(firstHandler); socketChannel.pipeline().addLast(secondHandler); socketChannel.pipeline().addLast(thirdHandler); socketChannel.pipeline().addLast(fourthHandler); System.out.println(1); } }); // 异步绑定服务器,阻塞到服务器绑定完成 ChannelFuture sync = sbs.bind().sync(); // 获取channel的closeFuture,阻塞到关闭 sync.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放所有的资源 group.shutdownGracefully().sync(); } } }
当执行socketChannel.pipeline().addLast后,整个数据结构如下:
运行结果如下:
从到FirstHandler再到ThirdHandler,最后是SecondHandler,可以看出FourthHandler并没有被执行到。
当检测有数据时,会调用AbstractNioByteChannel的read方法,这个方法里会调用pipeline的fireChannelRead(byteBuf)方法,可以看出,从head开始:
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
head调用invokeChannelRead方法。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
head调用channelRead方法,在channelRead方法里,调用fireChannelRead方法,也就是调用下一个ChannelInboundHandler的channelRead,这个ChannelInboundHandler就是firstHandler。findContextInbound方法是用来查找下一个ChannelInboundHandler的。invokeChannelRead方法,详见下面,只是传递的不在是head,而是下一个ChannelInboundHandler。一次是firstHandler、thirdHandler,并调用他们的channelRead方法,如果没有调用fireChannelRead,是不会往下调用ChannelInboundHandler的。
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
当ThirdHandler调用ctx.write时,会调用AbstractChannelHandlerContext的write方法,这里需要注意的是,会调用findContextOutbound方法,与findContextInbound不同的是,他会往上查找ChannelOutboundHandler,所以会直接到secondHandler,执行完write方法后,会往上到head,因此不会到fourthHandler的write方法。
如果想从tail开始往上遍历所有的ChannelOutboundHandler,那可以这样调用
ctx.channel().write(byteBuf);
或
ctx.pipeline().write(byteBuf);
他会调用DefaultChannelPipeline的write方法,从tail开始调用。
public final ChannelFuture write(Object msg) { return tail.write(msg); }