在Netty底层读入的是ByteBuf二进制数据,在实际开发中,需要将它解码为Java的POJO对象,处理完业务后,需要将Java的POJO对象编码为ByteBuf二进制数据放进通道中进行传输。这里就需要使用到Netty的解码器和编码器。
Netty中的解码器都直接或间接地实现了入站处理适配器,所以在使用时,直接继承解码器就行,而不需要再去实现处理适配器。
在继承解码器的时候需要重写decode方法(在父类中是个抽象方法),在decode方法里实现具体的解码过程。Netty中常用的三种解码器。
ByteToMessageDecoder
的,它在内部将传入的ByteBuf换成了自己装饰的ReplayingDecoderBuffer(这个缓冲区可以实现在真正读取数据之前,先检查一下长度是否合格,如果合格再进行数据的读取,否则将抛出ReplayError给ReplayingDecoder(它在收到error后会保留当前数据)),可以免去读取时对长度的检查。这个类还有一个重要的属性state,它表示解码器在解码过程中的当前阶段(因为底层通信协议是分包传输的,为了数据的完整性,需要分阶段解码)。 一般来说,在Netty中进行字符串的传输,采用简单的 Header-Content 内容传输协议:
下面来看一下示例吧(在代码中详细分析)。
先来看看ByteToMessageDecoder解码器,Log类见 分析堆栈信息封装一个SLF4J的静态类
public class StringDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // 可读字节小于4,说明消息长度还没满,直接返回 if (in.readableBytes() < 4){ return; } // 设置回滚点 in.markReaderIndex(); // 读取前四个字节(消息头)存储的消息长度,同时会使readerIndex向前移动4个指针 int len = in.readInt(); Log.info("内容长度: [{}]", len); // 如果可读字节数小于消息长度,说明消息还不完整。 if (in.readableBytes() < len){ // 重置读指针,并返回 in.resetReaderIndex(); return; } byte[] inBytes = new byte[len]; // 将ByteBuf中的数据读到字节数组中 in.readBytes(inBytes, 0, len); // 将读出的字节数组编码成字符串加到结果列表中,向后传输 out.add(new String(inBytes, StandardCharsets.UTF_8)); } } 复制代码
一个简单的业务处理器,将解码的字符串打印出来。
public class StringProcessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String s = (String)msg; Log.info("打印字符串:" + s); } } 复制代码
使用EmbeddedChannel来测试处理器。
public class TestDecoder { public static void main(String[] args){ try{ // 初始化一个通道初始化类 ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){ @Override protected void initChannel(EmbeddedChannel ch) { ch.pipeline() // 增加一个字符串解码器,将ByteBuf解码为字符串 .addLast(new StringDecoder()) // 一个简单的业务处理器,将刚刚解码的字符串输出 .addLast(new StringProcessHandler()); } }; // 初始化一个嵌入式通道 EmbeddedChannel channel = new EmbeddedChannel(channelInitializer); for (int i = 1; i <= 3; ++i){ // 分配直接内存 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); String s = i + "次发送:Offer来了"; // 获取字符串的字节数组 byte[] bytes = s.getBytes(StandardCharsets.UTF_8); // 先写消息内容的长度 buf.writeInt(bytes.length); // 再写消息内容 buf.writeBytes(bytes); // 将数据写入到通道中 channel.writeInbound(buf); } Thread.sleep(Integer.MAX_VALUE); }catch (InterruptedException e){ e.printStackTrace(); } } } 复制代码
运行结果:
12:31:11.428 [main] INFO decode.StringDecoder - 内容长度: [24] 12:31:11.430 [main] INFO decode.StringProcessHandler - 打印字符串:1次发送:Offer来了 12:31:11.433 [main] INFO decode.StringDecoder - 内容长度: [24] 12:31:11.433 [main] INFO decode.StringProcessHandler - 打印字符串:2次发送:Offer来了 12:31:11.434 [main] INFO decode.StringDecoder - 内容长度: [24] 12:31:11.434 [main] INFO decode.StringProcessHandler - 打印字符串:3次发送:Offer来了 复制代码
再来看一下ReplayingDecoder实现同样的解码操作,会发现代码会更加简洁(但它一般只用于解析逻辑较为简单的场景。因为它在读取数据进行解析时,会先检查长度是否合格,如果不合格将会抛出一个错误,然后结束这次解析,再重新开始,这就导致了可能会反复多次解析一个数据)。
public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.Status> { // 一个枚举类,里面枚举了下面要进行处理的阶段 enum Status{ // 第一部分 内容长度 PARES_1, // 第二部分 消息内容 PARES_2 } private int len; private byte[] bytes; /** * 初始化父类的state属性,表示初始阶段 */ public StringReplayDecoder(){ super(Status.PARES_1); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 通过state()取出当前阶段 switch (state()){ // 第一阶段:读取内容的长度,并用长度来初始化一个字节数组 case PARES_1: // 读取内容长度(装饰的ReplayingDecoderBuffer会在内部检查长度是否合格,而不需要用户来检查) len = in.readInt(); bytes = new byte[len]; Log.info("内容长度:[{}]", len); // 设置下一个阶段,也就是第二阶段 // 同时还有一个作用就是设置读断点指针为当前的读取位置(相当于前面的mark标记) // 当后面的长度不够时,会回到这个位置重新读(相当于前面的重置指针) checkpoint(Status.PARES_2); break; // 第二阶段:读取消息内容,并解码为字符串添加到结果列表中 case PARES_2: // 利用第一阶段取得的长度来读取字节数组 in.readBytes(bytes, 0, len); // 解码为字符串,加到结果列表中,向后传播 out.add(new String(bytes, StandardCharsets.UTF_8)); // 设置下一个阶段,也就是第一阶段(重新从第一阶段开始处理) // 同时也是读断点指针,和前面的一样 checkpoint(Status.PARES_1); break; default: break; } } } 复制代码
运行结果是一模一样的,这里就不贴了。
那么 ByteToMessageDecoder
和 ReplayingDecoder
这两个处理器能不能在通道间共享呢?答案是不能的。
ReplayingDecoder内部是有状态的(state属性就是表示状态),就不用说了。而在ByteToMessageDecoder内部也是有状态的,它有一个二进制字节的累加器,它用来保存没有解析完的二进制内容。所以它两都不能在通道间共享,只能每次都重新创建一个新的实例。
Netty提供了一些可以直接使用的解码器,不用自己去实现解码逻辑。下面看几个常用的解码器。
解码器 | 描述 |
---|---|
DelimiterBasedFrameDecoder | 如果数据包中以自定义的分隔符作为分隔,Netty会自动使用分隔符分割数据包 |
LengthFieldBasedFrameDecoder | 一种灵活的基于长度的解码器。解码器会根据数据包中的长度字段取出数据内容,丢弃不需要的数据 |
StringDecoder | 将ByteBuf二进制数据解码成字符串并向后传播 |
下面主要看看 LengthFieldBasedFrameDecoder
这个解码器,基于Header-Content协议的内容传输,一般都使用这个解码器来解码。
先来看一下这个解码器最常用的构造函数。
/** * @param maxFrameLength * 发送的数据包的最大长度,如果超出此长度,将会被丢弃。 * @param lengthFieldOffset * 长度字段的偏移量,指的是长度字段位于整个数据包内部字节数组中的下标值。 * @param lengthFieldLength * 长度字段所占的字节。一般使用int作为长度字段的类型,那么该值就是4。 * @param lengthAdjustment * 消息内容长度的矫正值。在传输协议比较复杂的时候(比如包含了长度字段、协议版本号等),在解码时就需要对长度进行矫正。 * 公式:内容字段偏移量 - 长度字段偏移量 - 长度字段字节数。 * 其实也就是在长度字段和在内容字段之间的字段所占的字节数。 * @param initialBytesToStrip * 丢弃的起始字节数。其实这个就是内容字段的偏移量。 */ public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { // 这后面还有第六个参数failFast,在这个构造函数中设置为true了。 // 这个参数设置为true的含义是:一旦帧(数据包)的长度将要超过最大长度,不过帧是否完整,都将立即抛出TooLongFrameException this(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment,initialBytesToStrip, true); } 复制代码
下面用这个解码器来重新写上面的测试案例,并在协议头加入了协议版本号。
public class TestDecoder { private static final int VERSION = 100; // 版本号 public static void main(String[] args){ try{ // 构造LengthFieldBasedFrameDecoder // 这里设置最大字节数为1024 // 长度字段的偏移量为0,因为写的时候将长度字段第一个写入的,所以第一个字段就是长度字段 // 长度字段所在字节数为4,因为类型是int // 长度的矫正值为2。因为在长度字段和内容字段之间夹了一个协议版本号,写入类型为char,占2个字节。 // 丢弃的起始字节数。长度字段字节数 + 协议版本号字节数 = 内容字段的偏移量 = 6,这些都是要丢弃的。 final LengthFieldBasedFrameDecoder spilter = new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6); ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){ @Override protected void initChannel(EmbeddedChannel ch) { ch.pipeline() // 将刚刚初始化的解码器放进通道流水线中,由此可知,该解码器是可以通道间共享的 .addLast(spilter) // 这个解码器不是刚刚自己写的,而是Netty内置的io.netty.handler.codec.string.StringDecoder .addLast(new StringDecoder()) .addLast(new StringProcessHandler()); } }; EmbeddedChannel channel = new EmbeddedChannel(channelInitializer); for (int i = 1; i <= 3; ++i){ ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); String s = i + "次发送:Offer来了" ; byte[] bytes = s.getBytes(StandardCharsets.UTF_8); // 先写消息内容的长度 buf.writeInt(bytes.length); // 再写版本号 buf.writeChar(VERSION); // 最后写消息内容 buf.writeBytes(bytes); channel.writeInbound(buf); } Thread.sleep(Integer.MAX_VALUE); }catch (InterruptedException e){ e.printStackTrace(); } } } 复制代码
运行结果与上面的也是一样的。
编码器与解码器是非常类似的,就是名字不同,内部实现的逻辑不同,使用方法是差不多的。
原理就没啥好说的了,直接来个实例吧。
在说解码器的时候提到了MessageToMessageDecoder,但没有在案例中使用,在这里用的 MessageToMessageEncoder
与 MessageToMessageDecoder
用法是类似的。
StringToInteger
将字符串编码为整型。
public class StringToInteger extends MessageToMessageEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) { // 先把字符串转为字符数组 char[] array = msg.toCharArray(); Log.info("原数据:[{}]", msg); // 遍历字符数组,取出表示的是数字的字符,编码为整型 for (char c : array){ // 48是0的ASCII码,57是9的ASCII码 if (c >= 48 && c <= 57){ Log.info("整型数据:[{}]", c); out.add(new Integer(c)); } } } } 复制代码
将整型编码为ByteBuf二进制数据,这里需要指定传入的数据类型,这里是整型,所以指定 MessageToByteEncoder
的泛型为Integer。
public class IntegerEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { // 将整型数据输入写入ByteBuf中 out.writeInt(msg); } } 复制代码
测试案例,还是使用EmbeddedChannel通道来进行出站测试。
public class TestEncoder { public static void main(String[] args){ ChannelInitializer channelInitializer = new ChannelInitializer() { @Override protected void initChannel(Channel ch) { ch.pipeline() // 出站处理器是从后往前处理,所以这里先转整型,再转ByteBuf .addLast(new IntegerEncoder()) .addLast(new StringToInteger()); } }; EmbeddedChannel channel = new EmbeddedChannel(channelInitializer); // 向通道中写入一些数据 for (int i = 1;i <= 3; ++i){ // 经过出站编码处理,除整型外的字符将被丢弃 channel.write("I am" + i); } channel.flush(); // 获取出站数据 ByteBuf buf = channel.readOutbound(); while(null != buf){ // 将出站数据按整型读出 Log.info("出站消息: [{}]", buf.readInt()); buf = channel.readOutbound(); } try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } } 复制代码
运行结果,只有数字出站(输出的是数字字符对应的ASCII码),字符被丢弃。
14:49:57.047 [main] INFO encode.StringToInteger - 原数据:[I am 1] 14:49:57.048 [main] INFO encode.StringToInteger - 整型数据:[1] 14:49:57.111 [main] INFO encode.StringToInteger - 原数据:[I am 2] 14:49:57.111 [main] INFO encode.StringToInteger - 整型数据:[2] 14:49:57.111 [main] INFO encode.StringToInteger - 原数据:[I am 3] 14:49:57.111 [main] INFO encode.StringToInteger - 整型数据:[3] 14:49:57.112 [main] INFO encode.TestEncoder - 出站消息: [49] 14:49:57.113 [main] INFO encode.TestEncoder - 出站消息: [50] 14:49:57.113 [main] INFO encode.TestEncoder - 出站消息: [51] 复制代码