转载

高性能 Netty 之编解码技术

什么是编解码技术?我们先来看一下关于这个词的简单描述。

编码是信息从一种形式或格式转换为另一种形式的过程也称为计算机编程语言的代码简称编码。而解码指的是编码的逆过程。

从描述我们可以知道,其实编解码技术是一个泛指。你可以认为是密码学里面的加密和解密,也可以认为是 Java 里面的序列化技术(对象->二进制/二进制->对象),或等等。所以说编解码技术其实应用非常的广泛。

而我们这次要讲的是,编解码技术在 Netty 之中的运用。我在文章 [高性能 Netty 之初体验] 说过, Netty 应用比较广泛的是作为一个通讯组件嵌套到别的框架当中,例如 高性能优秀的服务框架 Dubbo 或者 消息中间件 Rocket ,或更多著名开源框架例如 Spring 。所以 Netty 在编解码技术上的优势肯定也会有比较合适的技术选型值得我们学习的。

编解码的挑选要求

一个好的编解码技术,考虑的方面还是挺多的。例如

跨平台,跨语言
高性能/高扩展的存储格式
高性能

首先第一条, 跨平台 不用说了,因为服务器可以部署在不同的机器上。然后目前在分布式或微服务当中, RPC (远程调用)遇到的问题就是 跨语言 。很大程度在技术选型,语言选型上都是交叉,为得就是能够将各个语言的优势最大化。所以跨语言是比较重要的;

第二是 高扩展 。高扩展主要体现在传输的数据格式。在传输数据的过程中,我们的程序不可能永远限制于固定数量的参数。但是有个问题是,当数据在编解码的过程中,一旦出现了新添加的内容,很可能编解码的结果和原本的结果并不一致,这样导致接收方接收解码后会有问题。

第三个, 高性能 。高性能无非就是编码后数据包越小越好。 越大的数据包那么占用更多的带宽,存储的硬件成本高,解码的压力也会较大 。如果是在高并发的场景之下,二进制编码可以减少空间的开销,而 XML 等可读性高但是占用空间大的数据结构反而使用于低频次系统交互的场景。

编解码的挑选方案

在 Netty 实现编解码的话,主要有几种方案

JDK 自带的序列化
Google Protobuf
Facebook 的 Thrift
JBOSS Marshalling

下面我们会使用 JDK 自带序列化Google Protobuf 进行举例。由于代码可能整合过长,所以会另外用一篇文章去记录 Google Protobuf 与 Netty 的整合文章。

JDK 自带序列化

[首先说明,此例子与之前的文章的代码大部分相同,所以可以参考一下不同之处]

由于我们需要使用 JDK 自带序列化,那么首先依旧,我们需要分别开发服务端和客户端。 首先,我们依旧创建一个客户端的发送请求的实体类以及服务端应答的实体类。

SubscribeReq.java 请求实体类

public class SubscribeReq implements Serializable {    
        /**     * 默认的序列号ID     */    
        private static final long serialVersionUID = 1L;   
        private int subReqID;           // 消息标识
        private String userName;      //用户名称
        private String productName; //产品名称   
        private String phoneNumber;//电话号码    
        private String address;         //地址
        
        // .... 省略 getter 和 setter
}
复制代码

接下来是服务器应答类

import java.io.Serializable;

public class SubscribeResp implements Serializable {
    /*** 默认序列ID*/
    private static final long serialVersionUID = 1L;
    private int subReqID;   //消息名称
    private int respCode;   //请求结果码
    private String desc;    // 描述
   // .... 省略 getter 和 setter
复制代码

服务端

上面有了应答类,我们可以使用实体类来进行交互了。

首先我们编写服务端 SubReqServer.java

public class SubReqServer {
    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //服务端启动配置类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                    public void initChannel(SocketChannel ch) {
                        // 这个是 Netty 对象解码
                        ch.pipeline().addLast(
                                new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this .getClass().getClassLoader())));
                        // 这个是 Netty 对象编码器
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new SubReqServerHandler());
                    }
                });
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new SubReqServer().bind(8080);
    }
}
复制代码

然后我们编写服务器的处理类 SubReqServerHandler

SubReqServerHandler.java

@Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception {
	//由于我们上面使用了解码器,所以这里可以直接解码成为 SubscribeReq
	SubscribeReq req = (SubscribeReq) msg;
	//如果等于“JAVA”,就输出结果
	if ("JAVA".equalsIgnoreCase(req.getUserName())) {
	    System.out.println("Service accept client subscrib req : ["
		    + req.toString() + "]");
		//Netty刷新应答方法回去
	    ctx.writeAndFlush(resp(req.getSubReqID()));
	}
    }
    //应答方法
    private SubscribeResp resp(int subReqID) {
	SubscribeResp resp = new SubscribeResp();
	resp.setSubReqID(subReqID);
	resp.setRespCode(0);
	resp.setDesc("Java Book is exited!!!");
	return resp;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
	cause.printStackTrace();
	   ctx.close();// 发生异常,关闭链路
    }
}
复制代码

客户端

客户端的话,依旧是启动一个新的端口去访问服务端。

SubReqClient.java

public class SubReqClient {
    public void connect(int port, String host) throws Exception {
	// 配置客户端NIO线程组
	EventLoopGroup group = new NioEventLoopGroup();
	try {
	    Bootstrap b = new Bootstrap();
	    b.group(group).channel(NioSocketChannel.class)
		    .option(ChannelOption.TCP_NODELAY, true)
		    .handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch)
				throws Exception {
				//客户端也依旧使用 Netty 的编解码器
			    ch.pipeline().addLast(
				    new ObjectDecoder(1024, ClassResolvers
					    .cacheDisabled(this.getClass()
						    .getClassLoader())));
			    ch.pipeline().addLast(new ObjectEncoder());
			    ch.pipeline().addLast(new SubReqClientHandler());
			}
		    });

	    // 发起异步连接操作
	    ChannelFuture f = b.connect(host, port).sync();

	    // 当代客户端链路关闭
	    f.channel().closeFuture().sync();
	} finally {
	    // 优雅退出,释放NIO线程组
	    group.shutdownGracefully();
	}
    }

    public static void main(String[] args) throws Exception {
	new SubReqClient().connect(8080, "127.0.0.1");
    }
}
复制代码

SubReqClientHandler.java 是负责逻辑处理

public class SubReqClientHandler extends ChannelHandlerAdapter {

    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
	for (int i = 0; i < 10; i++) {
	    ctx.write(subReq(i));
	}
	ctx.flush();
    }

    private SubscribeReq subReq(int i) {
	SubscribeReq req = new SubscribeReq();
	req.setAddress("地址");
	req.setPhoneNumber("110");
	req.setProductName("Java");
	req.setSubReqID(i);
	req.setUserName("Java");
	return req;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception {
	System.out.println("Receive server response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
	ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
	cause.printStackTrace();
	ctx.close();
    }
}
复制代码

Java 序列化的缺点

我们上面使用了一个简单的例子就可以实现了以对象的方式来编解码传输数据。但是虽然非常便捷,但是带来的问题也是非常明显的。我们上面说过,选择一个编解码方式需要考虑跨语言。而 Java 序列化由于是 Java 语言内部的私有协议,其他语言根本不支持这种协议。同时,Java 序列化的字节数组,别的语言无法反序列化。所以成为了目前流行的 RPC 框架几乎不会选择使用 Java 序列化的方式来作为编解码框架的主要原因。

原文  https://juejin.im/post/5efb5531f265da22b5445b31
正文到此结束
Loading...