前面我们分析了ByteToMessageDecoder解码器,对应于解码需要有相关的编码器,即从Java对象转换成二进制数据的组件,这就是MessageToByteEncoder。
下面的StringToByteEncoder中,会将String对象转换成ByteBuf,传递给后面的ChannelOutboundHandler。使用MessageToByteEncoder,只需要实现encode这个方法即可,即通过ChannleHandlerContext和接收到的对应类型的对象,写入到一个ByteBuf中
class StringToByteEncoderextends MessageToByteEncoder<String>{ @Override protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(s.getBytes()); } }
,因为它的接收和输出都是ByteBuf类型。
public class LengthFieldPrependerextends MessageToMessageEncoder<ByteBuf>{ private final ByteOrder byteOrder; private final int lengthFieldLength; private final boolean lengthIncludesLengthFieldLength; private final int lengthAdjustment; public LengthFieldPrepender( ByteOrder byteOrder,int lengthFieldLength, int lengthAdjustment, boolean lengthIncludesLengthFieldLength){ // 只支持1,2,3,4,8字节长的length字段编码 if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) { throw new IllegalArgumentException( "lengthFieldLength must be either 1, 2, 3, 4, or 8: " + lengthFieldLength); } ObjectUtil.checkNotNull(byteOrder, "byteOrder"); this.byteOrder = byteOrder; this.lengthFieldLength = lengthFieldLength; this.lengthIncludesLengthFieldLength = lengthIncludesLengthFieldLength; this.lengthAdjustment = lengthAdjustment; } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)throws Exception { int length = msg.readableBytes() + lengthAdjustment; if (lengthIncludesLengthFieldLength) { length += lengthFieldLength; } if (length < 0) { throw new IllegalArgumentException( "Adjusted frame length (" + length + ") is less than zero"); } switch (lengthFieldLength) { case 1: if (length >= 256) { throw new IllegalArgumentException( "length does not fit into a byte: " + length); } out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length)); break; ... 中间省略 case 8: out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length)); break; default: throw new Error("should not reach here"); } out.add(msg.retain()); } }
MessageToByteEncoder同样是过滤出某一种类型的对象,所以也使用了TypeParameterMatcher,其中还有一个参数可以控制是否优先使用DirectMemory堆外直接内存,默认是true使用,如果当前classpath中有 sun.misc.Unsafe
类则使用其获取DirectMemory否则降级为heapMemory。
public abstract class MessageToByteEncoder<I>extends ChannelOutboundHandlerAdapter{ private final TypeParameterMatcher matcher; private final boolean preferDirect; // 创建一个满足当前参数类型 I的Encoder,preferDirect表示是否优先使用DirectMemory protected MessageToByteEncoder(boolean preferDirect) { matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); this.preferDirect = preferDirect; }
作为Encoder,需要处理的是write事件。
write逻辑为
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception { ByteBuf buf = null; try { // 判断对象类型 if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; // 申请ByteBuf buf = allocateBuffer(ctx, cast, preferDirect); try { // 编码写入ByteBuf中 encode(ctx, cast, buf); } finally { // release msg对象 ReferenceCountUtil.release(cast); } // 如果ByteBuf不空则ctx write if (buf.isReadable()) { ctx.write(buf, promise); } else { // 否则release byteBuf然后写入一个空BUFFER buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } // 帮助GC回收 buf = null; } else { // 其他类型的对象不处理交给后面的处理 ctx.write(msg, promise); } } catch (EncoderException e) { // encode方法的异常抛给上层处理 throw e; } catch (Throwable e) { // 其他包装成EncoderException的异常抛给上层处理 throw new EncoderException(e); } finally { // 防止encode方法异常导致申请的ByteBuf没有释放 if (buf != null) { buf.release(); } } } protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { // 这里会判断是否有Unsafe类来决定使用DirectMemory还是HeapMemory return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } } // 留给子类去实现的具体编码方法 protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)throws Exception;