本文是永顺大牛写的系列教程 《源码之下无秘密 ── 做最好的 Netty 源码分析教程》 的续写章节。本章主要介绍Netty中用来处理数据流的handler以及底层原理。
我们先来回忆下在 《Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (一)》 提到过来的handler链式结构:
Bootstrap bootstrap = new Bootstrap(); ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 管道中添加基于换行符分割字符串的解析器 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 管道中添加字符串编码解码器 ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); // 管道中添加服务端处理逻辑 ch.pipeline().addLast(new MyClientEchoHandler()); } }).connect("", 9898).sync(); future.channel().closeFuture().sync();
public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; void channelReadComplete(ChannelHandlerContext ctx) throws Exception; void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
ChannelInboundHandlerAdapter并没有包含任何业务逻辑,用户的handler子类可以继承它,然后覆盖并实现其中的部分方法。下文要提到的 SimpleChannelInboundHandler 以及 ByteToMessageDecoder 正是其中两个案例。
public interface ChannelOutboundHandler extends ChannelHandler { void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void read(ChannelHandlerContext ctx) throws Exception; void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; void flush(ChannelHandlerContext ctx) throws Exception; }
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { // 仅处理ByteBuf对象 // 新建out列表,用于保存解码得到的对象列表 CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // 调用解码实现方法 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); // 调用fireChannelRead传递解码得到的对象列表out fireChannelRead(ctx, out, size); // 回收对象 out.recycle(); } } else { ctx.fireChannelRead(msg); } }
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { // ...省略部分代码... decodeRemovalReentryProtection(ctx, in, out); // ...省略部分代码... } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } } final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; try { decode(ctx, in, out); } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { handlerRemoved(ctx); } } }
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 解码得到的对象都放out列表中 Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (!discarding) { if (eol >= 0) { // 如果找到换行符 final ByteBuf frame; // 计算当前帧的长度以及分隔符长度 final int length = eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '/r'? 2 : 1; if (length > maxLength) { // 如果该帧长度大于最大长度,则抛异常 buffer.readerIndex(eol + delimLength); fail(ctx, length); return null; } if (stripDelimiter) { // frame去掉分隔符 frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { // frame包含分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else { final int length = buffer.readableBytes(); if (length > maxLength) { // 如果没有换行符,而且该帧长度大于最大长度 // 则标记discarding为true,且丢弃所有可读数据 discardedBytes = length; buffer.readerIndex(buffer.writerIndex()); discarding = true; offset = 0; if (failFast) { fail(ctx, "over " + discardedBytes); } } return null; } } else { if (eol >= 0) { // 如果有换行符,丢弃换行符前的所有可读数据 final int length = discardedBytes + eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '/r'? 2 : 1; buffer.readerIndex(eol + delimLength); discardedBytes = 0; discarding = false; if (!failFast) { fail(ctx, length); } } else { // 如果没有换行符,丢弃所有可读数据 discardedBytes += buffer.readableBytes(); buffer.readerIndex(buffer.writerIndex()); // We skip everything in the buffer, we need to set the offset to 0 again. offset = 0; } return null; } } private int findEndOfLine(final ByteBuf buffer) { int totalLength = buffer.readableBytes(); // 找到换行符/n所在的下标 int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF); if (i >= 0) { offset = 0; // 某些系统以/r/n作为换行符,这里修改下标为/r的下标 if (i > 0 && buffer.getByte(i - 1) == '/r') { i--; } } else { offset = totalLength; } return i; }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CodecOutputList out = CodecOutputList.newInstance(); try { // 检查msg是否满足指定的模板类型I if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; try { // 调用decode抽象方法 decode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } } else { out.add(msg); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.getUnsafe(i)); } out.recycle(); } } /** * Decode from one message to an other. This method will be called for each written message that can be handled * by this decoder. * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to * @param msg the message to decode to an other one * @param out the {@link List} to which decoded messages should be added * @throws Exception is thrown if an error occurs */ protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { out.add(msg.toString(charset)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { // 类型强制转换 @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } } protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;