中,我们通过 BIO 去实现了一个简单的 RPC 通信案例,今天我们将会基于上次的案例进行改造,将通信由 BIO 改造为 NIO。
说到 NIO,就需要一起聊聊它的俩兄弟,BIO 以及 AIO。
BIO:Blocking IO,同步阻塞 IO。简单的说就是 服务端创建一个 ServerSocket,客户端通过 Socket 去连接 ServerSocket,服务端会通过 accept 去阻塞等待客户端的请求
。BIO 存在一个十分明显的问题,就是每当存在一个客户端连接服务端时,服务端都会启动一个线程,专门的去服务这个客户端,这将导致如果出现大量的客户端连接时,服务端将会产生大量的线程开销,导致崩溃。( 因此,在上一篇文章中,我们在服务端使用一个线程异步的去处理这些请求,尽量增大服务端每个线程可以处理的请求数,来达到略微优化的目的 。)
NIO:NonBlocking IO,同步非阻塞 IO。在 Java 1.4 中引入,NIO 是基于 Reactor 模型。NIO 中引入了 Selector、Channel、Buffer 等概念。Buffer(缓冲区)主要用来存、取数据,Channel(通道)一个 Channel 对应一个客户端,Selector(调度器)不断轮询 Channel,处理 Channel 发生的事件。在 NIO 模型中,只有当客户端发生一次请求时,才会创建一个线程,请求结束,线程销毁,相比于 BIO 模型一个线程一直阻塞等待一个客户端所有请求而言,减少了线程的开销。NIO 的核心是同步非阻塞,无论多少客户端都可以接入服务端,客户端接入并不会耗费一个线程,只会创建一个连接然后注册到 Selector 上去,一个 Selector 线程不断的轮询所有的 Socket 连接,发现有新的事件就触发通知,然后启动一个线程专门处理一个请求即可,处理完毕释放线程,但是这个处理的过程中,要先读取数据、处理数据、再返回数据,这是个同步的过程。
AIO:Asynchronous NonBlocking IO,异步非阻塞 IO。AIO 是基于 Proactor 模型的。在 NIO 模型中,工作线程从 Channel 中读写数据是同步的。但是在 AIO 中,每个连接请求会绑定一个 Buffer,然后通过操作系统异步去完成,等操作系统完成之后,会触发一个回调通知你完成了。
在 Java 1.4 就引入了 NIO,但是基于原生的 API 操作比较繁琐,因此本文不会使用原生 API 来重构之前的 RPC 案例。
Netty 是一款基于 NIO 开发的高性能网络通信框架,同时支持自定义通信协议。下图是 Netty 官网的一张 Netty 组件图。
本文将使用 Netty 去重构之前的 RPC 案例。
本次代码是基于 《如何实现一个你自己的 RPC 框架(1)》 进行重构的,因此模块划分保持一致,可以在上一次的基础上进行修改。本文为了凸显区别,模块名均进行修改,由 bio 变更为 nio。
模块划分:
rpc-demo-2 ├── rpc-client-nio // 客户端 ├── rpc-common-nio // 通用模块,封装 RPC 请求参数 └── rpc-server-nio // 服务端 ├── rpc-server-nio-api //接口定义 └── rpc-server-nio-provider // 接口实现
将之前的 ServerSocket 改造为 Netty 的 ServerBootstrap
public class RpcServer { public void start(Object service, int port) { ServerBootstrap serverBootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) .addLast(new ObjectEncoder()) // 自定义处理器 .addLast(new RpcRequestServerHandler(service)); } }); try { serverBootstrap.bind(port).sync(); System.out.println(service + " 服务发布在 " + port + " 端口"); } catch (InterruptedException e) { e.printStackTrace(); } } }
将原先的 RpcRequestHandler 重命名为 RpcRequestServerHandler,同时实现 SimpleChannelInboundHandler
接口
@AllArgsConstructor public class RpcRequestServerHandler extends SimpleChannelInboundHandler<RpcRequest> { private Object service; @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception { // 反射调用 Object invoke = invoke(rpcRequest); channelHandlerContext.writeAndFlush(invoke).addListener(ChannelFutureListener.CLOSE); } private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { // 请求参数 Object[] params = rpcRequest.getParams(); // 请求参数类型 Class<?>[] paramTypes = new Class[params.length]; for (int i = 0; i < params.length; i++) { paramTypes[i] = params[i].getClass(); } // 获取请求的类名 Class<?> clazz = Class.forName(rpcRequest.getClassName()); // 获取请求的方法名 Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes); // 调用 Object result = method.invoke(service, params); return result; } }
此时服务端重构就完成了~
原先的 RpcTransport 是通过 Socket 去连接 ServerSocket,这里改造为 Netty 的 Bootstrap 形式
@AllArgsConstructor public class RpcTransport { private String host; private int port; public Object call(RpcRequest rpcRequest) { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventGroup = new NioEventLoopGroup(); RpcRequestClientHandler rpcRequestClientHandler = new RpcRequestClientHandler(); bootstrap.group(eventGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) .addLast(new ObjectEncoder()) // 添加自定义处理器 .addLast(rpcRequestClientHandler); } }).option(ChannelOption.TCP_NODELAY, true); try { ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); // 发送 RPC 请求参数 channelFuture.channel().writeAndFlush(rpcRequest).sync(); // 等待连接关闭 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventGroup.shutdownGracefully(); } return rpcRequestClientHandler.getResult(); } }
实现 SimpleChannelInboundHandler
接口,获取服务端返回值
@Getter public class RpcRequestClientHandler extends SimpleChannelInboundHandler<Object> { private Object result; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { this.result = msg; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("调用出现异常:" + cause); ctx.close(); } }
此时客户端也修改完毕~
可以和之前保持一致,这里为了看出区别,我将请求参数由 rpc simple demo
改为 rpc nio demo
先运行 ServerMain 类,查看控制台日志。
com.xkcoding.rpc.nio.HelloServiceImpl@5ab35abe 服务发布在 8080 端口 request is coming: rpc nio demo
再运行 ClientMain 类,查看控制台日志。
需要动态代理生成请求对象 content = hello rpc nio demo
本文主要是将原先的 BIO 实现改造为 NIO 实现,主要是基于 Netty 重构网络通信部分。
但是 Netty 的精髓远不止于此,本文只是粗浅的使用 Hello World 而已。