转载

如何实现一个你自己的 RPC 框架(2)

前言

在上一篇 《如何实现一个你自己的 RPC 框架(1)》

中,我们通过 BIO 去实现了一个简单的 RPC 通信案例,今天我们将会基于上次的案例进行改造,将通信由 BIO 改造为 NIO。

1. 什么是 NIO

说到 NIO,就需要一起聊聊它的俩兄弟,BIO 以及 AIO。

BIO:Blocking IO,同步阻塞 IO。简单的说就是 服务端创建一个 ServerSocket,客户端通过 Socket 去连接 ServerSocket,服务端会通过 accept 去阻塞等待客户端的请求 。BIO 存在一个十分明显的问题,就是每当存在一个客户端连接服务端时,服务端都会启动一个线程,专门的去服务这个客户端,这将导致如果出现大量的客户端连接时,服务端将会产生大量的线程开销,导致崩溃。( 因此,在上一篇文章中,我们在服务端使用一个线程异步的去处理这些请求,尽量增大服务端每个线程可以处理的请求数,来达到略微优化的目的 。)

NIO:NonBlocking IO,同步非阻塞 IO。在 Java 1.4 中引入,NIO 是基于 Reactor 模型。NIO 中引入了 Selector、Channel、Buffer 等概念。Buffer(缓冲区)主要用来存、取数据,Channel(通道)一个 Channel 对应一个客户端,Selector(调度器)不断轮询 Channel,处理 Channel 发生的事件。在 NIO 模型中,只有当客户端发生一次请求时,才会创建一个线程,请求结束,线程销毁,相比于 BIO 模型一个线程一直阻塞等待一个客户端所有请求而言,减少了线程的开销。NIO 的核心是同步非阻塞,无论多少客户端都可以接入服务端,客户端接入并不会耗费一个线程,只会创建一个连接然后注册到 Selector 上去,一个 Selector 线程不断的轮询所有的 Socket 连接,发现有新的事件就触发通知,然后启动一个线程专门处理一个请求即可,处理完毕释放线程,但是这个处理的过程中,要先读取数据、处理数据、再返回数据,这是个同步的过程。

AIO:Asynchronous NonBlocking IO,异步非阻塞 IO。AIO 是基于 Proactor 模型的。在 NIO 模型中,工作线程从 Channel 中读写数据是同步的。但是在 AIO 中,每个连接请求会绑定一个 Buffer,然后通过操作系统异步去完成,等操作系统完成之后,会触发一个回调通知你完成了。

2. 选型

在 Java 1.4 就引入了 NIO,但是基于原生的 API 操作比较繁琐,因此本文不会使用原生 API 来重构之前的 RPC 案例。

Netty 是一款基于 NIO 开发的高性能网络通信框架,同时支持自定义通信协议。下图是 Netty 官网的一张 Netty 组件图。

如何实现一个你自己的 RPC 框架(2)

本文将使用 Netty 去重构之前的 RPC 案例。

3. 实战

本次代码是基于 《如何实现一个你自己的 RPC 框架(1)》 进行重构的,因此模块划分保持一致,可以在上一次的基础上进行修改。本文为了凸显区别,模块名均进行修改,由 bio 变更为 nio。

模块划分:

rpc-demo-2
    ├── rpc-client-nio // 客户端
    ├── rpc-common-nio // 通用模块,封装 RPC 请求参数
    └── rpc-server-nio // 服务端
        ├── rpc-server-nio-api //接口定义
        └── rpc-server-nio-provider // 接口实现

3.1. 修改 RpcServer

将之前的 ServerSocket 改造为 Netty 的 ServerBootstrap

public class RpcServer {

	public void start(Object service, int port) {
		ServerBootstrap serverBootstrap = new ServerBootstrap();

		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel socketChannel) throws Exception {
						socketChannel.pipeline()
								.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
								.addLast(new ObjectEncoder())
								// 自定义处理器
								.addLast(new RpcRequestServerHandler(service));
					}
				});

		try {
			serverBootstrap.bind(port).sync();
			System.out.println(service + " 服务发布在 " + port + " 端口");
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}

3.2. 修改 RpcRequestHandler

将原先的 RpcRequestHandler 重命名为 RpcRequestServerHandler,同时实现 SimpleChannelInboundHandler 接口

@AllArgsConstructor
public class RpcRequestServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

	private Object service;

	@Override
	protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
		// 反射调用
		Object invoke = invoke(rpcRequest);
		channelHandlerContext.writeAndFlush(invoke).addListener(ChannelFutureListener.CLOSE);
	}

	private Object invoke(RpcRequest rpcRequest)
			throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
		// 请求参数
		Object[] params = rpcRequest.getParams();
		// 请求参数类型
		Class<?>[] paramTypes = new Class[params.length];

		for (int i = 0; i < params.length; i++) {
			paramTypes[i] = params[i].getClass();
		}

		// 获取请求的类名
		Class<?> clazz = Class.forName(rpcRequest.getClassName());
		// 获取请求的方法名
		Method method = clazz.getMethod(rpcRequest.getMethodName(), paramTypes);

		// 调用
		Object result = method.invoke(service, params);
		return result;
	}

}

此时服务端重构就完成了~

3.3. 修改 RpcTransport

原先的 RpcTransport 是通过 Socket 去连接 ServerSocket,这里改造为 Netty 的 Bootstrap 形式

@AllArgsConstructor
public class RpcTransport {

   private String host;

   private int port;

   public Object call(RpcRequest rpcRequest) {
      Bootstrap bootstrap = new Bootstrap();
      EventLoopGroup eventGroup = new NioEventLoopGroup();

      RpcRequestClientHandler rpcRequestClientHandler = new RpcRequestClientHandler();
      bootstrap.group(eventGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                  .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                  .addLast(new ObjectEncoder())
                        // 添加自定义处理器
                        .addLast(rpcRequestClientHandler);
         }
      }).option(ChannelOption.TCP_NODELAY, true);

      try {
         ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
         // 发送 RPC 请求参数
         channelFuture.channel().writeAndFlush(rpcRequest).sync();
         // 等待连接关闭
         channelFuture.channel().closeFuture().sync();
      }
      catch (InterruptedException e) {
         e.printStackTrace();
      }
      finally {
         eventGroup.shutdownGracefully();
      }

      return rpcRequestClientHandler.getResult();
   }

}

3.4. 创建 RpcRequestClientHandler

实现 SimpleChannelInboundHandler 接口,获取服务端返回值

@Getter
public class RpcRequestClientHandler extends SimpleChannelInboundHandler<Object> {

   private Object result;

   @Override
   protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
      this.result = msg;
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      System.out.println("调用出现异常:" + cause);
      ctx.close();
   }

}

此时客户端也修改完毕~

3.5. 测试

可以和之前保持一致,这里为了看出区别,我将请求参数由 rpc simple demo 改为 rpc nio demo

先运行 ServerMain 类,查看控制台日志。

com.xkcoding.rpc.nio.HelloServiceImpl@5ab35abe 服务发布在 8080 端口
request is coming: rpc nio demo

再运行 ClientMain 类,查看控制台日志。

需要动态代理生成请求对象
content = hello rpc nio demo

4. 总结

本文主要是将原先的 BIO 实现改造为 NIO 实现,主要是基于 Netty 重构网络通信部分。

但是 Netty 的精髓远不止于此,本文只是粗浅的使用 Hello World 而已。

原文  https://xkcoding.com/2020/05/12/how-to-design-your-own-rpc-framework-2.html
正文到此结束
Loading...