这里只是列出了你能从RPC框架源码中能学到的东西,本文并不会每个知识点都点到,主要讲述如何手写一个RPC框架,更多细节需要读者阅读源码,文章的下方会提供源码链接哦。
Remote Procedure Call(RPC):远程过程调用。
过程是什么?
过程就是业务处理、计算任务,更直白理解,就是程序。(像调用本地方法一样调用远程的过程。)
RPC采用Client-Server结构,通过Request-Response消息模式实现。
调用 编组 发送 传递 解组 再调用
RPC调用过程中需要将参数编组为消息进行发送,接受方需要解组消息为参数,过程处理结果同样需要经编组、解组。消息由哪些部分构成及消息的表示形式就构成了消息协议。
RPC协议规定请求、响应消息的格式
在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互
我们可以选用通用的标准协议(如:http、https),也也可根据自身的需要定义自己的消息协议。
封装好参数编组、消息解组、底层网络通信的RPC程序开发框架,带来的便捷是可以直接在其基础上只需要专注于过程代码编写。
Java领域:
我们将会写一个简易的RPC框架,暂且叫它 leisure-rpc-spring-boot-starter
,通过在项目中引入该starter,并简单的配置一下,项目即拥有提供远程服务的能力。
编写自定义注解 @Service
,被它注解的类将会提供远程服务。
编写自定义注解 @InjectService
,使用它可注入远程服务。
客户端想要调用远程服务,必须具备 服务发现 的能力;在知道有哪些服务过后,还必须有 服务代理 来执行服务调用;客户端想要与服务端通信,必须要有相同的 消息协议 ;客户端想要调用远程服务,那么必须具备网络请求的能力,即 网络层 功能。
当然,这是客户端所需的最基本的能力,其实还可以扩展的能力,例如负载均衡。
我们先看看客户端的代码结构:
基于面向接口编程的理念,不同角色都实现了定义了相应规范的接口。这里面我们没有发现消息协议相关内容,那是因为服务端也需要消息协议,因此抽离了出来,放在公共层。
/** * 服务发现抽象类,定义服务发现规范 * * @author 东方雨倾 * @since 1.0.0 */ public interface ServiceDiscoverer { List<Service> getServices(String name); } /** * Zookeeper服务发现者,定义以Zookeeper为注册中心的服务发现细则 * * @author 东方雨倾 * @since 1.0.0 */ public class ZookeeperServiceDiscoverer implements ServiceDiscoverer { private ZkClient zkClient; public ZookeeperServiceDiscoverer(String zkAddress) { zkClient = new ZkClient(zkAddress); zkClient.setZkSerializer(new ZookeeperSerializer()); } /** * 使用Zookeeper客户端,通过服务名获取服务列表 * 服务名格式:接口全路径 * * @param name 服务名 * @return 服务列表 */ @Override public List<Service> getServices(String name) { String servicePath = LeisureConstant.ZK_SERVICE_PATH + LeisureConstant.PATH_DELIMITER + name + "/service"; List<String> children = zkClient.getChildren(servicePath); return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> { String deCh = null; try { deCh = URLDecoder.decode(str, LeisureConstant.UTF_8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return JSON.parseObject(deCh, Service.class); }).collect(Collectors.toList()); } }
服务发现者使用Zookeeper来实现,通过ZkClient我们很容易发现已经注册在ZK上的服务。当然我们也可以使用其他组件作为注册中心,例如Redis。
/** * 网络请求客户端,定义网络请求规范 * * @author 东方雨倾 * @since 1.0.0 */ public interface NetClient { byte[] sendRequest(byte[] data, Service service) throws InterruptedException; } /** * Netty网络请求客户端,定义通过Netty实现网络请求的细则。 * * @author 东方雨倾 * @since 1.0.0 */ public class NettyNetClient implements NetClient { private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class); /** * 发送请求 * * @param data 请求数据 * @param service 服务信息 * @return 响应数据 * @throws InterruptedException 异常 */ @Override public byte[] sendRequest(byte[] data, Service service) throws InterruptedException { String[] addInfoArray = service.getAddress().split(":"); String serverAddress = addInfoArray[0]; String serverPort = addInfoArray[1]; SendHandler sendHandler = new SendHandler(data); byte[] respData; // 配置客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(sendHandler); } }); // 启动客户端连接 b.connect(serverAddress, Integer.parseInt(serverPort)).sync(); respData = (byte[]) sendHandler.rspData(); logger.info("SendRequest get reply: {}", respData); } finally { // 释放线程组资源 group.shutdownGracefully(); } return respData; } } /** * 发送处理类,定义Netty入站处理细则 * * @author 东方雨倾 * @since 1.0.0 */ public class SendHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(SendHandler.class); private CountDownLatch cdl; private Object readMsg = null; private byte[] data; public SendHandler(byte[] data) { cdl = new CountDownLatch(1); this.data = data; } /** * 当连接服务端成功后,发送请求数据 * * @param ctx 通道上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { logger.info("Successful connection to server:{}", ctx); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); logger.info("Client sends message:{}", reqBuf); ctx.writeAndFlush(reqBuf); } /** * 读取数据,数据读取完毕释放CD锁 * * @param ctx 上下文 * @param msg ByteBuf */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { logger.info("Client reads message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(resp); readMsg = resp; cdl.countDown(); } /** * 等待读取数据完成 * * @return 响应数据 * @throws InterruptedException 异常 */ public Object rspData() throws InterruptedException { cdl.await(); return readMsg; } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("Exception occurred:{}", cause.getMessage()); ctx.close(); } }
在这里我们使用Netty来实现网络请求客户端,当然也可以使用Mina。网络请求客户端能连接远程服务端,并将编组好的请求数据发送给服务端,待服务端处理好后,又将服务端的响应数据返回给客户端。
/** * 客户端代理工厂:用于创建远程服务代理类 * 封装编组请求、请求发送、编组响应等操作。 * * @author 东方雨倾 * @since 1.0.0 */ public class ClientProxyFactory { private ServiceDiscoverer serviceDiscoverer; private Map<String, MessageProtocol> supportMessageProtocols; private NetClient netClient; private Map<Class<?>, Object> objectCache = new HashMap<>(); /** * 通过Java动态代理获取服务代理类 * * @param clazz 被代理类Class * @param <T> 泛型 * @return 服务代理类 */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T) this.objectCache.computeIfAbsent(clazz, cls -> newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, new ClientInvocationHandler(cls))); } // getter setter ... /** * 客户端服务代理类invoke函数细节实现 */ private class ClientInvocationHandler implements InvocationHandler { private Class<?> clazz; private Random random = new Random(); public ClientInvocationHandler(Class<?> clazz) { super(); this.clazz = clazz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Exception { if (method.getName().equals("toString")) { return proxy.getClass().toString(); } if (method.getName().equals("hashCode")) { return 0; } // 1、获得服务信息 String serviceName = this.clazz.getName(); List<Service> services = serviceDiscoverer.getServices(serviceName); if (services == null || services.isEmpty()) { throw new LeisureException("No provider available!"); } // 随机选择一个服务提供者(软负载均衡) Service service = services.get(random.nextInt(services.size())); // 2、构造request对象 LeisureRequest req = new LeisureRequest(); req.setServiceName(service.getName()); req.setMethod(method.getName()); req.setParameterTypes(method.getParameterTypes()); req.setParameters(args); // 3、协议层编组 // 获得该方法对应的协议 MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol()); // 编组请求 byte[] data = protocol.marshallingRequest(req); // 4、调用网络层发送请求 byte[] repData = netClient.sendRequest(data, service); // 5解组响应消息 LeisureResponse rsp = protocol.unmarshallingResponse(repData); // 6、结果处理 if (rsp.getException() != null) { throw rsp.getException(); } return rsp.getReturnValue(); } } }
服务代理类由客户端代理工厂类产生,代理方式是基于Java的动态代理。在处理类ClientInvocationHandler的invoke函数中,定义了一系列的操作,包括获取服务、选择服务提供者、构造请求对象、编组请求对象、网络请求客户端发送请求、解组响应消息、异常处理等。
/** * 消息协议,定义编组请求、解组请求、编组响应、解组响应规范 * * @author 东方雨倾 * @since 1.0.0 */ public interface MessageProtocol { /** * 编组请求 * * @param req 请求信息 * @return 请求字节数组 * @throws Exception 编组请求异常 */ byte[] marshallingRequest(LeisureRequest req) throws Exception; /** * 解组请求 * * @param data 请求字节数组 * @return 请求信息 * @throws Exception 解组请求异常 */ LeisureRequest unmarshallingRequest(byte[] data) throws Exception; /** * 编组响应 * * @param rsp 响应信息 * @return 响应字节数组 * @throws Exception 编组响应异常 */ byte[] marshallingResponse(LeisureResponse rsp) throws Exception; /** * 解组响应 * * @param data 响应字节数组 * @return 响应信息 * @throws Exception 解组响应异常 */ LeisureResponse unmarshallingResponse(byte[] data) throws Exception; } /** * Java序列化消息协议 * * @author 东方雨倾 * @since 1.0.0 */ public class JavaSerializeMessageProtocol implements MessageProtocol { private byte[] serialize(Object obj) throws Exception { ByteArrayOutputStream bout = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bout); out.writeObject(obj); return bout.toByteArray(); } @Override public byte[] marshallingRequest(LeisureRequest req) throws Exception { return this.serialize(req); } @Override public LeisureRequest unmarshallingRequest(byte[] data) throws Exception { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)); return (LeisureRequest) in.readObject(); } @Override public byte[] marshallingResponse(LeisureResponse rsp) throws Exception { return this.serialize(rsp); } @Override public LeisureResponse unmarshallingResponse(byte[] data) throws Exception { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)); return (LeisureResponse) in.readObject(); } }
消息协议主要是定义了客户端如何 编组请求 、 解组响应 ,服务端如何 解组请求 、 编组响应 这四个操作规范。本文提供了Java序列化与反序列化的实现,感兴趣的读者可以基于其他序列化技术实现其他消息协议(偷偷说一句:Java的序列化性能很不理想)。
首先,服务端要提供远程服务,必须具备 服务注册及暴露 的能力;在这之后,还需要开启 网络服务 ,供客户端连接。有些项目可能既是服务提供者,又是服务消费者,那什么时候开启服务,什么时候注入服务呢?这里我们引入一个 RPC处理者 的概念,由它来帮我们开启服务,以及注入服务。
先看看服务端的代码结构:
服务端做的事情也很简单,注册服务并暴露服务,然后开启网络服务;如果服务端也是消费者,则注入远程服务。
服务注册和服务注入依赖两个自定义注解来实现:
下面是他们的实现代码:
/** * 被该注解标记的服务可提供远程RPC访问的能力 * * @author 东方雨倾 * @since 1.0.0 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface Service { String value() default ""; } /** * 该注解用于注入远程服务 * * @author 东方雨倾 * @since 1.0.0 */ @Target({ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface InjectService { }
/** * 服务注册器,定义服务注册规范 * * @author 东方雨倾 * @since 1.0.0 */ public interface ServiceRegister { void register(ServiceObject so) throws Exception; ServiceObject getServiceObject(String name) throws Exception; } /** * 默认服务注册器 * * @author 东方雨倾 * @since 1.0.0 */ public class DefaultServiceRegister implements ServiceRegister { private Map<String, ServiceObject> serviceMap = new HashMap<>(); protected String protocol; protected Integer port; @Override public void register(ServiceObject so) throws Exception { if (so == null) { throw new IllegalArgumentException("Parameter cannot be empty."); } this.serviceMap.put(so.getName(), so); } @Override public ServiceObject getServiceObject(String name) { return this.serviceMap.get(name); } } /** * Zookeeper服务注册器,提供服务注册、服务暴露的能力 * * @author 东方雨倾 * @since 1.0.0 */ public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister { /** * Zk客户端 */ private ZkClient client; public ZookeeperExportServiceRegister(String zkAddress, Integer port, String protocol) { client = new ZkClient(zkAddress); client.setZkSerializer(new ZookeeperSerializer()); this.port = port; this.protocol = protocol; } /** * 服务注册 * * @param so 服务持有者 * @throws Exception 注册异常 */ @Override public void register(ServiceObject so) throws Exception { super.register(so); Service service = new Service(); String host = InetAddress.getLocalHost().getHostAddress(); String address = host + ":" + port; service.setAddress(address); service.setName(so.getClazz().getName()); service.setProtocol(protocol); this.exportService(service); } /** * 服务暴露 * * @param serviceResource 需要暴露的服务信息 */ private void exportService(Service serviceResource) { String serviceName = serviceResource.getName(); String uri = JSON.toJSONString(serviceResource); try { uri = URLEncoder.encode(uri, UTF_8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } String servicePath = ZK_SERVICE_PATH + PATH_DELIMITER + serviceName + "/service"; if (!client.exists(servicePath)) { client.createPersistent(servicePath, true); } String uriPath = servicePath + PATH_DELIMITER + uri; if (client.exists(uriPath)) { client.delete(uriPath); } client.createEphemeral(uriPath); } }
这个过程其实没啥好说的,就是将指定ServiceObject对象序列化后保存到ZK上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法。
/** * RPC服务端抽象类 * * @author 东方雨倾 * @since 1.0.0 */ public abstract class RpcServer { /** * 服务端口 */ protected int port; /** * 服务协议 */ protected String protocol; /** * 请求处理者 */ protected RequestHandler handler; public RpcServer(int port, String protocol, RequestHandler handler) { super(); this.port = port; this.protocol = protocol; this.handler = handler; } /** * 开启服务 */ public abstract void start(); /** * 停止服务 */ public abstract void stop(); // getter setter ... } /** * Netty RPC服务端,提供Netty网络服务开启、关闭的能力 * * @author 东方雨倾 * @since 1.0.0 */ public class NettyRpcServer extends RpcServer { private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class); private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler) { super(port, protocol, handler); } @Override public void start() { // 配置服务器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new ChannelRequestHandler()); } }); // 启动服务 ChannelFuture f = b.bind(port).sync(); logger.info("Server started successfully."); channel = f.channel(); // 等待服务通道关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 释放线程组资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void stop() { this.channel.close(); } private class ChannelRequestHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { logger.info("Channel active:{}", ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("The server receives a message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); logger.info("Send response:{}", msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("Exception occurred:{}", cause.getMessage()); ctx.close(); } } } /** * 请求处理者,提供解组请求、编组响应等操作 * * @author 东方雨倾 * @since 1.0.0 */ public class RequestHandler { private MessageProtocol protocol; private ServiceRegister serviceRegister; public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) { super(); this.protocol = protocol; this.serviceRegister = serviceRegister; } public byte[] handleRequest(byte[] data) throws Exception { // 1、解组消息 LeisureRequest req = this.protocol.unmarshallingRequest(data); // 2、查找服务对象 ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName()); LeisureResponse rsp = null; if (so == null) { rsp = new LeisureResponse(LeisureStatus.NOT_FOUND); } else { // 3、反射调用对应的过程方法 try { Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes()); Object returnValue = m.invoke(so.getObj(), req.getParameters()); rsp = new LeisureResponse(LeisureStatus.SUCCESS); rsp.setReturnValue(returnValue); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { rsp = new LeisureResponse(LeisureStatus.ERROR); rsp.setException(e); } } // 4、编组响应消息 return this.protocol.marshallingResponse(rsp); } // getter setter ... }
网络服务定义了启动服务的细则,以及如何处理客户端发来的请求。
/** * RPC处理者,支持服务启动暴露、自动注入Service * * @author 东方雨倾 * @since 1.0.0 */ public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> { @Resource private ClientProxyFactory clientProxyFactory; @Resource private ServiceRegister serviceRegister; @Resource private RpcServer rpcServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (Objects.isNull(event.getApplicationContext().getParent())) { ApplicationContext context = event.getApplicationContext(); // 开启服务 startServer(context); // 注入Service injectService(context); } } private void startServer(ApplicationContext context) { Map<String, Object> beans = context.getBeansWithAnnotation(Service.class); if (beans.size() != 0) { boolean startServerFlag = true; for (Object obj : beans.values()) { try { Class<?> clazz = obj.getClass(); Class<?>[] interfaces = clazz.getInterfaces(); ServiceObject so; if (interfaces.length != 1) { Service service = clazz.getAnnotation(Service.class); String value = service.value(); if (value.equals("")) { startServerFlag = false; throw new UnsupportedOperationException("The exposed interface is not specific with '" + obj.getClass().getName() + "'"); } so = new ServiceObject(value, Class.forName(value), obj); } else { Class<?> superClass = interfaces[0]; so = new ServiceObject(superClass.getName(), superClass, obj); } serviceRegister.register(so); } catch (Exception e) { e.printStackTrace(); } } if (startServerFlag) { rpcServer.start(); } } } private void injectService(ApplicationContext context) { String[] names = context.getBeanDefinitionNames(); for (String name : names) { Class<?> clazz = context.getType(name); if (Objects.isNull(clazz)) continue; Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { InjectService injectLeisure = field.getAnnotation(InjectService.class); if (Objects.isNull(injectLeisure)) continue; Class<?> fieldClass = field.getType(); Object object = context.getBean(name); field.setAccessible(true); try { field.set(object, clientProxyFactory.getProxy(fieldClass)); } catch (IllegalAccessException e) { e.printStackTrace(); } } } } }
DefaultRpcProcessor实现了ApplicationListener,并监听了ContextRefreshedEvent事件,其效果就是在Spring启动完毕过后会收到一个事件通知,基于这个机制,就可以在这里开启服务,以及注入服务。因为一切已经准备就绪了,所需要的资源都是OK的。
框架一个很重要的特性就是要使用简单,使用该框架只需要一个条件和四个步骤即可。
需要准备一个Zookeeper作为注册中心,单节点即可。
引入Maven依赖:
<dependency> <groupId>wang.leisure</groupId> <artifactId>leisure-rpc-spring-boot-starter</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
不知道如何获得依赖的读者,请在源码下载后,进入项目目录下(pom.xml文件所在位置),执行 mvn install命令,即可在本地仓库生成maven依赖。
在你的项目配置文件(application.properties)中配置注册中心地址,例如:
leisure.rpc.register-address=192.168.199.241:2181
将你的远程服务使用@Service注解,例如:
import wang.leisure.rpc.annotation.Service; @Service public class UserServiceImpl implements UserService { @Override public ApiResult<User> getUser(Long id) { User user = getFromDbOrCache(id); return ApiResult.success(user); } private User getFromDbOrCache(Long id) { return new User(id, "东方雨倾", 1, "https://leisure.wang"); } }
使用注解@InjectService注入远程服务,例如:
@RestController @RequestMapping("/index/") public class IndexController { @InjectService private UserService userService; /** * 获取用户信息 * http://localhost:8080/index/getUser?id=1 * * @param id 用户id * @return 用户信息 */ @GetMapping("getUser") public ApiResult<User> getUser(Long id) { return userService.getUser(id); } }
框架源码:leisure-rpc-spring-boot-starter
示例源码:leisure-rpc-example
为方便读者看到效果,笔者也简单的编写了一个示例项目,可以下载下来试试。如果源码对你有一丁点的帮助,希望点个小星星支持一下哦。
希望读者能够真正动手去试一试,只有实践了才能知道里面的运作逻辑。笔者也是花了两个星期才把代码跟文章整理好,并不是因为这个东西难,而是因为没时间,苦逼的程序 早上七点起床,晚上10点左右回家,确实没啥时间搞这些,哈哈哈。如果文章对你有帮助,希望多多支持。
原文地址: https://leisure.wang/procedural-framework/framework/704.html
很遗憾的说,推酷将在这个月底关闭。人生海海,几度秋凉,感谢那些有你的时光。
原文 http://www.cnblogs.com/itoak/p/13370031.html