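This article is a continuation chapter of the tutorial series 《源码之下无秘密 ── 做最好的 Netty 源码分析教程》 written by Yongshun (永顺). It covers the handlers that Netty uses to process data streams and how they work under the hood.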
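This article uses Netty 4.1.33.Final.
Let us first recall the chained handler structure introduced in 《Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (一)》:
The cover image of this article is a cartoon of an assembly line, because this chain structure closely resembles an assembly line in manufacturing: each handler is a processing station on the line, and the inbound and outbound data are the products being worked on.
A typical client initialization looks like this: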
    Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Add a frame decoder that splits the byte stream on line breaks
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    // Add the string decoder and encoder
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                    // Add the client-side business logic handler
                    ch.pipeline().addLast(new MyClientEchoHandler());
                }
            }).connect("127.0.0.1", 9898).sync();
    future.channel().closeFuture().sync();
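Inside the ChannelInitializer implementation, each call to ch.pipeline().addLast appends a handler to the doubly linked list (immediately before TailContext), which produces the doubly linked list structure shown in the figure above.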
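The "insert before the tail" behaviour can be illustrated with a small JDK-only model. This is a sketch under stated assumptions, not Netty's implementation: MiniPipelineSketch, Node, and the handler names are hypothetical, and the head and tail sentinels merely stand in for HeadContext and TailContext.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the pipeline's doubly linked list; NOT Netty's real classes. */
final class MiniPipelineSketch {

    /** One node of the list; plays the role of a handler context (hypothetical). */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Sentinel nodes that always exist, standing in for HeadContext and TailContext.
    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Links the new node immediately before the tail sentinel. */
    MiniPipelineSketch addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Returns the node names in head-to-tail order. */
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        MiniPipelineSketch pipeline = new MiniPipelineSketch()
                .addLast("lineDecoder")
                .addLast("stringDecoder")
                .addLast("stringEncoder")
                .addLast("echoHandler");
        System.out.println(pipeline.names());
    }
}
```

Because every insertion happens just before the tail sentinel, the handlers end up in the list in the same order as the addLast calls.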
All inbound handlers implement the ChannelInboundHandler interface:
    public interface ChannelInboundHandler extends ChannelHandler {

        void channelRegistered(ChannelHandlerContext ctx) throws Exception;

        void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

        void channelActive(ChannelHandlerContext ctx) throws Exception;

        void channelInactive(ChannelHandlerContext ctx) throws Exception;

        void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

        void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

        void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

        void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

        @Override
        @SuppressWarnings("deprecation")
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    }
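The interface defines the common inbound events, including channelActive, channelRead, and channelInactive, and it also supports user-defined inbound events through userEventTriggered.
ChannelInboundHandlerAdapter is a default implementation of the ChannelInboundHandler interface. Every method simply passes the inbound event on to the next handler without doing any business processing, for example channelRead: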
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
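ChannelInboundHandlerAdapter contains no business logic of its own. A user-defined handler can extend it and override only the methods it cares about. SimpleChannelInboundHandler and ByteToMessageDecoder, both discussed later, are two examples of this.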
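The "pass through by default, override what you need" pattern can be sketched without Netty. The following is a simplified model under stated assumptions: PassThroughHandler, UpperCaseHandler, and the sink list are hypothetical names invented for illustration, and only a single event (channelRead) is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Simplified model of inbound propagation; NOT Netty's real classes. */
final class InboundChainSketch {

    /** Stand-in for an adapter whose default behaviour is to forward the message. */
    static class PassThroughHandler {
        PassThroughHandler next; // next handler toward the tail, or null at the end

        void channelRead(Object msg, List<Object> sink) {
            fireChannelRead(msg, sink); // default: no business logic, just forward
        }

        final void fireChannelRead(Object msg, List<Object> sink) {
            if (next != null) {
                next.channelRead(msg, sink);
            } else {
                sink.add(msg); // end of the chain: record what arrived
            }
        }
    }

    /** A subclass that overrides one method and then keeps the event moving. */
    static final class UpperCaseHandler extends PassThroughHandler {
        @Override
        void channelRead(Object msg, List<Object> sink) {
            Object out = (msg instanceof String)
                    ? ((String) msg).toUpperCase(Locale.ROOT)
                    : msg;
            fireChannelRead(out, sink);
        }
    }

    /** Builds pass-through -> upper-case -> pass-through and sends one message in. */
    static List<Object> run(Object msg) {
        PassThroughHandler first = new PassThroughHandler();
        PassThroughHandler second = new UpperCaseHandler();
        PassThroughHandler third = new PassThroughHandler();
        first.next = second;
        second.next = third;
        List<Object> sink = new ArrayList<>();
        first.channelRead(msg, sink);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run("ping"));
    }
}
```

If an overriding method forgets to call fireChannelRead, the message stops at that handler, which mirrors what happens in a real pipeline when ctx.fireChannelRead is not invoked.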
All outbound handlers implement the ChannelOutboundHandler interface, which provides the common outbound operations (bind, connect, close, write, flush, and so on):
    public interface ChannelOutboundHandler extends ChannelHandler {

        void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

        void connect(
                ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception;

        void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

        void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

        void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

        void read(ChannelHandlerContext ctx) throws Exception;

        void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

        void flush(ChannelHandlerContext ctx) throws Exception;
    }
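Similarly, ChannelOutboundHandlerAdapter is a default implementation of this interface. Every method simply passes the outbound operation on toward the head of the pipeline without doing any business processing, for example write: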
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
Likewise, subclasses can extend ChannelOutboundHandlerAdapter and override any of its methods. The two encoders MessageToByteEncoder and MessageToMessageEncoder are common subclasses.
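The direction of outbound propagation is easy to get backwards, so here is a minimal JDK-only model of it. It is a sketch under stated assumptions: PassThroughOutbound, BracketHandler, and the trace list are hypothetical, and the write is assumed to start at the handler that was added last.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of outbound propagation; NOT Netty's real classes. */
final class OutboundChainSketch {

    /** Stand-in for an adapter that forwards a write toward the head. */
    static class PassThroughOutbound {
        final String name;
        PassThroughOutbound prev; // previous handler toward the head, or null

        PassThroughOutbound(String name) {
            this.name = name;
        }

        void write(Object msg, List<String> trace) {
            trace.add(name + ":" + msg); // record what this handler saw
            forward(msg, trace);         // default: forward unchanged
        }

        final void forward(Object msg, List<String> trace) {
            if (prev != null) {
                prev.write(msg, trace);
            }
        }
    }

    /** An encoder-like subclass that transforms the message before forwarding it. */
    static final class BracketHandler extends PassThroughOutbound {
        BracketHandler(String name) {
            super(name);
        }

        @Override
        void write(Object msg, List<String> trace) {
            trace.add(name + ":" + msg);
            forward("[" + msg + "]", trace);
        }
    }

    /** Adds "first" then "second", and starts the write at the last handler. */
    static List<String> run(String msg) {
        PassThroughOutbound first = new PassThroughOutbound("first");
        PassThroughOutbound second = new BracketHandler("second");
        second.prev = first;
        List<String> trace = new ArrayList<>();
        second.write(msg, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("hi"));
    }
}
```

The handler added last sees the message first, and the handler closer to the head receives the already transformed message.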
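Decoders are typical inbound handlers. The inbound data they process is usually a ByteBuf.
ByteToMessageDecoder, for example, reads data from a byte stream held in a ByteBuf and converts it into message objects of another form (which may themselves be ByteBuf instances). Its channelRead method is implemented as follows: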
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) { // Only ByteBuf messages are handled here
            // Create the out list that collects the decoded objects
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // Invoke the decoding logic
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // Pass the decoded objects in out down the pipeline via fireChannelRead
                fireChannelRead(ctx, out, size);
                // Recycle the list
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                // ... part of the code omitted ...
                decodeRemovalReentryProtection(ctx, in, out);
                // ... part of the code omitted ...
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }
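As shown above, callDecode keeps looping while the byte stream has readable data, and each iteration ultimately calls the abstract decode method implemented by the subclass to do the actual decoding. LineBasedFrameDecoder is one commonly used implementation.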
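The interplay between accumulating bytes across reads and looping over decode can be modelled with plain byte arrays. This is a sketch under stated assumptions rather than Netty's code: CumulationSketch is a hypothetical class, the "subclass" decode simply cuts fixed-length frames, and the loop is assumed to stop as soon as decode consumes nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of cumulation plus a decode loop; NOT Netty's real classes. */
final class CumulationSketch {
    private final int frameLength;
    private byte[] cumulation = new byte[0]; // unread bytes carried over between reads

    CumulationSketch(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Models channelRead: merge the new bytes, then decode as many frames as possible. */
    List<byte[]> channelRead(byte[] data) {
        byte[] merged = Arrays.copyOf(cumulation, cumulation.length + data.length);
        System.arraycopy(data, 0, merged, cumulation.length, data.length);

        List<byte[]> out = new ArrayList<>();
        int readerIndex = 0;
        // Models callDecode: keep going while bytes remain and decode makes progress.
        while (readerIndex < merged.length) {
            int consumed = decode(merged, readerIndex, out);
            if (consumed == 0) {
                break; // not enough bytes for another frame yet
            }
            readerIndex += consumed;
        }
        // Keep the undecoded remainder for the next read.
        cumulation = Arrays.copyOfRange(merged, readerIndex, merged.length);
        return out;
    }

    /** Stand-in for the subclass decode(): emits one frame and returns the bytes consumed. */
    private int decode(byte[] in, int readerIndex, List<byte[]> out) {
        if (in.length - readerIndex < frameLength) {
            return 0;
        }
        out.add(Arrays.copyOfRange(in, readerIndex, readerIndex + frameLength));
        return frameLength;
    }

    public static void main(String[] args) {
        CumulationSketch decoder = new CumulationSketch(3);
        System.out.println(decoder.channelRead(new byte[] {1, 2}).size());
        System.out.println(decoder.channelRead(new byte[] {3, 4, 5, 6, 7}).size());
    }
}
```

The point of the model is that a frame split across two network reads is still decoded correctly, because the leftover bytes of one read are prepended to the next.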
LineBasedFrameDecoder splits one ByteBuf into multiple ByteBuf frames at line breaks. Its core implementation is as follows:
    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Every decoded object is added to the out list
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) { // A line break was found
                final ByteBuf frame;
                // Compute the length of the current frame and of its delimiter
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                if (length > maxLength) { // The frame exceeds the maximum length: skip it and report the failure
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                if (stripDelimiter) { // The frame excludes the delimiter
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else { // The frame includes the delimiter
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                return frame;
            } else {
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    // No line break yet and the data already exceeds the maximum length:
                    // set discarding to true and drop all readable data
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    offset = 0;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else {
            if (eol >= 0) { // A line break was found: drop everything up to and including it
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else { // No line break: drop all readable data
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
                // We skip everything in the buffer, we need to set the offset to 0 again.
                offset = 0;
            }
            return null;
        }
    }

    private int findEndOfLine(final ByteBuf buffer) {
        int totalLength = buffer.readableBytes();
        // Find the index of the line feed \n
        int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
        if (i >= 0) {
            offset = 0;
            // Some systems use \r\n as the line break; in that case move the index back to the \r
            if (i > 0 && buffer.getByte(i - 1) == '\r') {
                i--;
            }
        } else {
            offset = totalLength;
        }
        return i;
    }
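The core of the splitting logic, finding the end of a line and choosing a delimiter length of 1 or 2, can be tried out on a plain byte array. The sketch below is a simplified model under stated assumptions: LineSplitSketch is a hypothetical class, delimiters are always stripped, and the maxLength and discarding branches are deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of line-based framing on a byte array; NOT Netty's real classes. */
final class LineSplitSketch {

    /** Returns the index of the line terminator ('\r' for "\r\n", otherwise '\n'), or -1. */
    static int findEndOfLine(byte[] buf, int from) {
        for (int i = from; i < buf.length; i++) {
            if (buf[i] == '\n') {
                // For "\r\n", point at the '\r' so the caller can skip both bytes.
                return (i > from && buf[i - 1] == '\r') ? i - 1 : i;
            }
        }
        return -1;
    }

    /** Extracts every complete line, without its delimiter; an unterminated tail is left out. */
    static List<String> splitLines(byte[] buf) {
        List<String> lines = new ArrayList<>();
        int readerIndex = 0;
        int eol;
        while ((eol = findEndOfLine(buf, readerIndex)) >= 0) {
            int length = eol - readerIndex;
            int delimLength = buf[eol] == '\r' ? 2 : 1;
            lines.add(new String(buf, readerIndex, length, StandardCharsets.UTF_8));
            readerIndex = eol + delimLength; // skip the frame and its delimiter
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "a\r\nbc\n\nd".getBytes(StandardCharsets.UTF_8);
        System.out.println(splitLines(data)); // the trailing "d" has no line break yet
    }
}
```

In the real decoder the unterminated tail is not lost: it stays in the cumulation buffer of ByteToMessageDecoder until a later read delivers the line break.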
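The ByteToMessageDecoder from the previous section decodes a ByteBuf into message objects, whereas MessageToMessageDecoder decodes one message type into another. Its core implementation is as follows: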
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // Check whether msg matches the declared type parameter I
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    // Call the abstract decode method
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }

    /**
     * Decode from one message to an other. This method will be called for each written message that can be handled
     * by this decoder.
     *
     * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
     * @param msg the message to decode to an other one
     * @param out the {@link List} to which decoded messages should be added
     * @throws Exception is thrown if an error occurs
     */
    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
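MessageToMessageDecoder contains very little logic itself; the actual decoding and conversion is left to subclasses. StringDecoder is a commonly used implementation.
StringDecoder is very simple: its type parameter is ByteBuf, and it converts each ByteBuf into a String. The core decode method is as follows: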
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        out.add(msg.toString(charset));
    }
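The division of labour between the generic base class (type check, pass-through of foreign messages) and the tiny subclass (the actual conversion) can be modelled as follows. This is a sketch under stated assumptions: MessageDecoder and BytesToString are hypothetical names, a byte[] stands in for ByteBuf, and the accepted type is passed in explicitly as a Class object rather than being detected from the type parameter.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a typed message-to-message decoder; NOT Netty's real classes. */
final class TypedDecoderSketch {

    /** Base class: checks the message type and delegates matching messages to decode(). */
    abstract static class MessageDecoder<I> {
        private final Class<I> inboundType; // assumption: the type is supplied explicitly

        MessageDecoder(Class<I> inboundType) {
            this.inboundType = inboundType;
        }

        /** Returns what would be passed on to the next handler. */
        final List<Object> channelRead(Object msg) {
            List<Object> out = new ArrayList<>();
            if (inboundType.isInstance(msg)) {
                decode(inboundType.cast(msg), out);
            } else {
                out.add(msg); // not our type: pass it through untouched
            }
            return out;
        }

        protected abstract void decode(I msg, List<Object> out);
    }

    /** Subclass in the spirit of StringDecoder: bytes in, String out. */
    static final class BytesToString extends MessageDecoder<byte[]> {
        private final Charset charset;

        BytesToString(Charset charset) {
            super(byte[].class);
            this.charset = charset;
        }

        @Override
        protected void decode(byte[] msg, List<Object> out) {
            out.add(new String(msg, charset));
        }
    }

    public static void main(String[] args) {
        BytesToString decoder = new BytesToString(StandardCharsets.UTF_8);
        System.out.println(decoder.channelRead("hi".getBytes(StandardCharsets.UTF_8)));
        System.out.println(decoder.channelRead(7));
    }
}
```

The model omits reference counting; in the real base class the matched input message is released after decode returns, as the channelRead code above shows.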
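Encoders are typical outbound handlers. They likewise come in two kinds, MessageToMessageEncoder and MessageToByteEncoder. Their function and implementation are exactly the reverse of the decoders, so they are not analyzed in detail here; otherwise this article would turn into pure filler.
In the same way, StringEncoder is the reverse of StringDecoder, and its implementation is simple enough that it needs no further discussion.
The base class most often used for user-defined handlers is SimpleChannelInboundHandler. Compared with ChannelInboundHandlerAdapter, it casts the message object to the expected type for the user, which makes the data easier to work with, and it makes sure the message object is released afterwards (when autoRelease is enabled). The core implementation is as follows: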
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                // Cast to the expected type
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
Users implement the channelRead0 method to supply their own business logic.
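The release-in-finally behaviour can be observed with a toy reference counter. The following is a simplified model under stated assumptions: CountedMessage, SimpleHandler, and CollectingHandler are hypothetical names, the reference count is a plain int, and auto-release is always on.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of type matching plus automatic release; NOT Netty's real classes. */
final class AutoReleaseSketch {

    /** Hypothetical reference-counted message that starts with a count of 1. */
    static final class CountedMessage {
        final String text;
        int refCnt = 1;

        CountedMessage(String text) {
            this.text = text;
        }

        void release() {
            refCnt--;
        }
    }

    /** Base class: casts matching messages, forwards the rest, releases what it consumed. */
    abstract static class SimpleHandler<I> {
        private final Class<I> type;
        final List<Object> passedOn = new ArrayList<>(); // stands in for the next handler

        SimpleHandler(Class<I> type) {
            this.type = type;
        }

        final void channelRead(Object msg) throws Exception {
            boolean release = true;
            try {
                if (type.isInstance(msg)) {
                    channelRead0(type.cast(msg));
                } else {
                    release = false; // someone downstream now owns the message
                    passedOn.add(msg);
                }
            } finally {
                // Runs even when channelRead0 throws, so consumed messages do not leak.
                if (release && msg instanceof CountedMessage) {
                    ((CountedMessage) msg).release();
                }
            }
        }

        protected abstract void channelRead0(I msg) throws Exception;
    }

    /** User handler: only the business logic is written here. */
    static final class CollectingHandler extends SimpleHandler<CountedMessage> {
        final List<String> seen = new ArrayList<>();

        CollectingHandler() {
            super(CountedMessage.class);
        }

        @Override
        protected void channelRead0(CountedMessage msg) {
            seen.add(msg.text);
        }
    }

    public static void main(String[] args) throws Exception {
        CountedMessage message = new CountedMessage("hello");
        CollectingHandler handler = new CollectingHandler();
        handler.channelRead(message);
        System.out.println(handler.seen + " refCnt=" + message.refCnt);
    }
}
```

One consequence of this design is that the message should be treated as invalid once channelRead0 returns; a handler that needs to keep it longer would have to take its own reference first.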
Finally, a class diagram to review what this article has covered: