转载

初识Netty原理(三)——解码和编码

在Netty底层读入的是ByteBuf二进制数据,在实际开发中,需要将它解码为Java的POJO对象,处理完业务后,需要将Java的POJO对象编码为ByteBuf二进制数据放进通道中进行传输。这里就需要使用到Netty的解码器和编码器。

Decoder原理和使用

原理

Netty中的解码器都直接或间接地实现了入站处理适配器,所以在使用时,直接继承解码器就行,而不需要再去实现处理适配器。

在继承解码器的时候需要重写decode方法(在父类中是个抽象方法),在decode方法里实现具体的解码过程。Netty中常用的三种解码器。

  • ByteToMessageDecoder :继承这个类,目的就是将ByteBuf二进制数据解码为Java POJO对象,decode的传入数据类型就是ByteBuf。
  • ReplayingDecoder :这个类是继承 ByteToMessageDecoder 的,它在内部将传入的ByteBuf换成了自己装饰的ReplayingDecoderBuffer(这个缓冲区可以实现在真正读取数据之前,先检查一下长度是否合格,如果合格再进行数据的读取,否则将抛出ReplayError给ReplayingDecoder(它在收到error后会保留当前数据)),可以免去读取时对长度的检查。这个类还有一个重要的属性state,它表示解码器在解码过程中的当前阶段(因为底层通信协议是分包传输的,为了数据的完整性,需要分阶段解码)。
  • MessageToMessageDecoder :继承这个类(继承时,需要指定入站的数据类型),是用于Java POJO对象之间的解码,比如将Integer解码为String等(这里的入站数据类型就是Integer)。

一般来说,在Netty中进行字符串的传输,采用简单的 Header-Content 内容传输协议:

  • 在协议的Head部分放置字符串的字节长度(还可以放置其它字段,如版本号、魔数等)。
  • 在协议的Content部分放置字符串的字节数组。

使用

下面来看一下示例吧(在代码中详细分析)。

先来看看ByteToMessageDecoder解码器,Log类见 分析堆栈信息封装一个SLF4J的静态类

public class StringDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 可读字节小于4,说明消息长度还没满,直接返回
        if (in.readableBytes() < 4){
            return;
        }
        // 设置回滚点
        in.markReaderIndex();
        // 读取前四个字节(消息头)存储的消息长度,同时会使readerIndex向前移动4个指针
        int len = in.readInt();
        Log.info("内容长度: [{}]", len);
        // 如果可读字节数小于消息长度,说明消息还不完整。
        if (in.readableBytes() < len){
            // 重置读指针,并返回
            in.resetReaderIndex();
            return;
        }
        byte[] inBytes = new byte[len];
        // 将ByteBuf中的数据读到字节数组中
        in.readBytes(inBytes, 0, len);
        // 将读出的字节数组编码成字符串加到结果列表中,向后传输
        out.add(new String(inBytes, StandardCharsets.UTF_8));
    }
}
复制代码

一个简单的业务处理器,将解码的字符串打印出来。

public class StringProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String s = (String)msg;
        Log.info("打印字符串:" + s);
    }
}
复制代码

使用EmbeddedChannel来测试处理器。

public class TestDecoder {

    public static void main(String[] args){
        try{
            // 初始化一个通道初始化类
            ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){
                @Override
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline()
                            // 增加一个字符串解码器,将ByteBuf解码为字符串
                            .addLast(new StringDecoder())
                            // 一个简单的业务处理器,将刚刚解码的字符串输出
                            .addLast(new StringProcessHandler());
                }
            };
            // 初始化一个嵌入式通道
            EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
            for (int i = 1; i <= 3; ++i){
                // 分配直接内存
                ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
                String s = i + "次发送:Offer来了";
                // 获取字符串的字节数组
                byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                // 先写消息内容的长度
                buf.writeInt(bytes.length);
                // 再写消息内容
                buf.writeBytes(bytes);
                // 将数据写入到通道中
                channel.writeInbound(buf);
            }
            Thread.sleep(Integer.MAX_VALUE);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
复制代码

运行结果:

12:31:11.428 [main] INFO decode.StringDecoder - 内容长度: [24]
12:31:11.430 [main] INFO decode.StringProcessHandler - 打印字符串:1次发送:Offer来了
12:31:11.433 [main] INFO decode.StringDecoder - 内容长度: [24]
12:31:11.433 [main] INFO decode.StringProcessHandler - 打印字符串:2次发送:Offer来了
12:31:11.434 [main] INFO decode.StringDecoder - 内容长度: [24]
12:31:11.434 [main] INFO decode.StringProcessHandler - 打印字符串:3次发送:Offer来了
复制代码

再来看一下ReplayingDecoder实现同样的解码操作,会发现代码会更加简洁(但它一般只用于解析逻辑较为简单的场景。因为它在读取数据进行解析时,会先检查长度是否合格,如果不合格将会抛出一个错误,然后结束这次解析,再重新开始,这就导致了可能会反复多次解析一个数据)。

public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.Status> {
    // 一个枚举类,里面枚举了下面要进行处理的阶段
    enum Status{
        // 第一部分 内容长度
        PARES_1,
        // 第二部分 消息内容
        PARES_2
    }
    private int len;
    private byte[] bytes;
    /**
     * 初始化父类的state属性,表示初始阶段
     */
    public StringReplayDecoder(){
        super(Status.PARES_1);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 通过state()取出当前阶段
        switch (state()){
            // 第一阶段:读取内容的长度,并用长度来初始化一个字节数组
            case PARES_1:
                // 读取内容长度(装饰的ReplayingDecoderBuffer会在内部检查长度是否合格,而不需要用户来检查)
                len = in.readInt();
                bytes = new byte[len];
                Log.info("内容长度:[{}]", len);
                // 设置下一个阶段,也就是第二阶段
                // 同时还有一个作用就是设置读断点指针为当前的读取位置(相当于前面的mark标记)
                // 当后面的长度不够时,会回到这个位置重新读(相当于前面的重置指针)
                checkpoint(Status.PARES_2);
                break;
            // 第二阶段:读取消息内容,并解码为字符串添加到结果列表中
            case PARES_2:
                // 利用第一阶段取得的长度来读取字节数组
                in.readBytes(bytes, 0, len);
                // 解码为字符串,加到结果列表中,向后传播
                out.add(new String(bytes, StandardCharsets.UTF_8));
                // 设置下一个阶段,也就是第一阶段(重新从第一阶段开始处理)
                // 同时也是读断点指针,和前面的一样
                checkpoint(Status.PARES_1);
                break;
            default:
                break;
        }
    }
}
复制代码

运行结果是一模一样的,这里就不贴了。

那么 ByteToMessageDecoderReplayingDecoder 这两个处理器能不能在通道间共享呢?答案是不能的。

ReplayingDecoder内部是有状态的(state属性就是表示状态),就不用说了。而在ByteToMessageDecoder内部也是有状态的,它有一个二进制字节的累加器,它用来保存没有解析完的二进制内容。所以它两都不能在通道间共享,只能每次都重新创建一个新的实例。

Netty内置的Decoder

Netty提供了一些可以直接使用的解码器,不用自己去实现解码逻辑。下面看几个常用的解码器。

解码器 描述
DelimiterBasedFrameDecoder 如果数据包中以自定义的分隔符作为分隔,Netty会自动使用分隔符分割数据包
LengthFieldBasedFrameDecoder 一种灵活的基于长度的解码器。解码器会根据数据包中的长度字段取出数据内容,丢弃不需要的数据
StringDecoder 将ByteBuf二进制数据解码成字符串并向后传播

下面主要看看 LengthFieldBasedFrameDecoder 这个解码器,基于Header-Content协议的内容传输,一般都使用这个解码器来解码。

先来看一下这个解码器最常用的构造函数。

/**
     * @param maxFrameLength 
     *        发送的数据包的最大长度,如果超出此长度,将会被丢弃。
     * @param lengthFieldOffset
     *        长度字段的偏移量,指的是长度字段位于整个数据包内部字节数组中的下标值。
     * @param lengthFieldLength
     *        长度字段所占的字节。一般使用int作为长度字段的类型,那么该值就是4。
     * @param lengthAdjustment
     *        消息内容长度的矫正值。在传输协议比较复杂的时候(比如包含了长度字段、协议版本号等),在解码时就需要对长度进行矫正。
     *        公式:内容字段偏移量 - 长度字段偏移量 - 长度字段字节数。
     *        其实也就是在长度字段和在内容字段之间的字段所占的字节数。
     * @param initialBytesToStrip
     *        丢弃的起始字节数。其实这个就是内容字段的偏移量。
     */
    public LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip) {
        // 这后面还有第六个参数failFast,在这个构造函数中设置为true了。
        // 这个参数设置为true的含义是:一旦帧(数据包)的长度将要超过最大长度,不过帧是否完整,都将立即抛出TooLongFrameException
        this(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment,initialBytesToStrip, true); 
    }
复制代码

下面用这个解码器来重新写上面的测试案例,并在协议头加入了协议版本号。

public class TestDecoder {
    private static final int VERSION = 100; // 版本号
    public static void main(String[] args){
        try{
            // 构造LengthFieldBasedFrameDecoder
            // 这里设置最大字节数为1024
            // 长度字段的偏移量为0,因为写的时候将长度字段第一个写入的,所以第一个字段就是长度字段
            // 长度字段所在字节数为4,因为类型是int
            // 长度的矫正值为2。因为在长度字段和内容字段之间夹了一个协议版本号,写入类型为char,占2个字节。
            // 丢弃的起始字节数。长度字段字节数 + 协议版本号字节数 = 内容字段的偏移量 = 6,这些都是要丢弃的。
            final LengthFieldBasedFrameDecoder spilter = new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);
            ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){
                @Override
                protected void initChannel(EmbeddedChannel ch) {
                    ch.pipeline()
                            // 将刚刚初始化的解码器放进通道流水线中,由此可知,该解码器是可以通道间共享的
                            .addLast(spilter)
                            // 这个解码器不是刚刚自己写的,而是Netty内置的io.netty.handler.codec.string.StringDecoder
                            .addLast(new StringDecoder())
                            .addLast(new StringProcessHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
            for (int i = 1; i <= 3; ++i){
                ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
                String s = i + "次发送:Offer来了" ;
                byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                // 先写消息内容的长度
                buf.writeInt(bytes.length);
                // 再写版本号
                buf.writeChar(VERSION);
                // 最后写消息内容
                buf.writeBytes(bytes);
                channel.writeInbound(buf);
            }
            Thread.sleep(Integer.MAX_VALUE);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
复制代码

运行结果与上面的也是一样的。

Encode原理与使用

编码器与解码器是非常类似的,就是名字不同,内部实现的逻辑不同,使用方法是差不多的。

原理就没啥好说的了,直接来个实例吧。

使用

在说解码器的时候提到了MessageToMessageDecoder,但没有在案例中使用,在这里用的 MessageToMessageEncoderMessageToMessageDecoder 用法是类似的。

StringToInteger 将字符串编码为整型。

public class StringToInteger extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {
        // 先把字符串转为字符数组
        char[] array = msg.toCharArray();
        Log.info("原数据:[{}]", msg);
        // 遍历字符数组,取出表示的是数字的字符,编码为整型
        for (char c : array){
            // 48是0的ASCII码,57是9的ASCII码
            if (c >= 48 && c <= 57){
                Log.info("整型数据:[{}]", c);
                out.add(new Integer(c));
            }
        }
    }
}
复制代码

将整型编码为ByteBuf二进制数据,这里需要指定传入的数据类型,这里是整型,所以指定 MessageToByteEncoder 的泛型为Integer。

public class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        // 将整型数据输入写入ByteBuf中
        out.writeInt(msg);
    }
}
复制代码

测试案例,还是使用EmbeddedChannel通道来进行出站测试。

public class TestEncoder {
    public static void main(String[] args){
        ChannelInitializer channelInitializer = new ChannelInitializer() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline()
                        // 出站处理器是从后往前处理,所以这里先转整型,再转ByteBuf
                        .addLast(new IntegerEncoder())
                        .addLast(new StringToInteger());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
        // 向通道中写入一些数据
        for (int i = 1;i <= 3; ++i){
            // 经过出站编码处理,除整型外的字符将被丢弃
            channel.write("I am" + i);
        }
        channel.flush();
        // 获取出站数据
        ByteBuf buf = channel.readOutbound();
        while(null != buf){
            // 将出站数据按整型读出
            Log.info("出站消息: [{}]", buf.readInt());
            buf = channel.readOutbound();
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

运行结果,只有数字出站(输出的是数字字符对应的ASCII码),字符被丢弃。

14:49:57.047 [main] INFO encode.StringToInteger - 原数据:[I am 1]
14:49:57.048 [main] INFO encode.StringToInteger - 整型数据:[1]
14:49:57.111 [main] INFO encode.StringToInteger - 原数据:[I am 2]
14:49:57.111 [main] INFO encode.StringToInteger - 整型数据:[2]
14:49:57.111 [main] INFO encode.StringToInteger - 原数据:[I am 3]
14:49:57.111 [main] INFO encode.StringToInteger - 整型数据:[3]
14:49:57.112 [main] INFO encode.TestEncoder - 出站消息: [49]
14:49:57.113 [main] INFO encode.TestEncoder - 出站消息: [50]
14:49:57.113 [main] INFO encode.TestEncoder - 出站消息: [51]
复制代码
原文  https://juejin.im/post/5e0af0c8f265da5d1a446968
正文到此结束
Loading...