在微服务大行其道的今天,分布式系统越来越重要,实现服务化首先就要考虑服务之间的通信问题。这里面涉及序列化、反序列化、寻址、连接等等问题。。不过,有了RPC框架,我们就无需苦恼。
RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。
值得注意是,两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。
RPC框架有很多,比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift。当然了,还有Spring Cloud,不过对于Spring Cloud来说,RPC只是它的一个功能模块。
复杂的先不讲,如果要实现一个基本功能、简单的RPC,要涉及哪些东西呢?
下面我们一起通过代码来分析,怎么把这些技术点串到一起,实现我们自己的RPC。
在开始之前,笔者先介绍一下所用到的软件环境。
SpringBoot、Netty、zookeeper、zkclient、fastjson
整个RPC,我们分为生产者和消费者。首先它们有一个共同的服务接口API。在这里,我们搞一个操作用户信息的service接口。
public interface InfoUserService { List<InfoUser> insertInfoUser(InfoUser infoUser); InfoUser getInfoUserById(String id); void deleteInfoUserById(String id); String getNameById(String id); Map<String,InfoUser> getAllUser(); } 复制代码
作为生产者,它当然要有实现类,我们创建InfoUserServiceImpl实现类,并用注解把它标注为RPC的服务,然后注册到Spring的Bean容器中。在这里,我们把infoUserMap当做数据库,存储用户信息。
package com.viewscenes.netsupervisor.service.impl; @RpcService public class InfoUserServiceImpl implements InfoUserService { Logger logger = LoggerFactory.getLogger(this.getClass()); //当做数据库,存储用户信息 Map<String,InfoUser> infoUserMap = new HashMap<>(); public List<InfoUser> insertInfoUser(InfoUser infoUser) { logger.info("新增用户信息:{}", JSONObject.toJSONString(infoUser)); infoUserMap.put(infoUser.getId(),infoUser); return getInfoUserList(); } public InfoUser getInfoUserById(String id) { InfoUser infoUser = infoUserMap.get(id); logger.info("查询用户ID:{}",id); return infoUser; } public List<InfoUser> getInfoUserList() { List<InfoUser> userList = new ArrayList<>(); Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator(); while (iterator.hasNext()){ Map.Entry<String, InfoUser> next = iterator.next(); userList.add(next.getValue()); } logger.info("返回用户信息记录数:{}",userList.size()); return userList; } public void deleteInfoUserById(String id) { logger.info("删除用户信息:{}",JSONObject.toJSONString(infoUserMap.remove(id))); } public String getNameById(String id){ logger.info("根据ID查询用户名称:{}",id); return infoUserMap.get(id).getName(); } public Map<String,InfoUser> getAllUser(){ logger.info("查询所有用户信息{}",infoUserMap.keySet().size()); return infoUserMap; } } 复制代码
元注解定义如下:
package com.viewscenes.netsupervisor.annotation; @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService {} 复制代码
所有的请求信息和返回信息,我们用两个JavaBean来表示。其中的重点是,返回信息要带有请求信息的ID。
package com.viewscenes.netsupervisor.entity; public class Request { private String id; private String className;// 类名 private String methodName;// 函数名称 private Class<?>[] parameterTypes;// 参数类型 private Object[] parameters;// 参数列表 get/set ... } 复制代码
package com.viewscenes.netsupervisor.entity; public class Response { private String requestId; private int code; private String error_msg; private Object data; get/set ... } 复制代码
Netty作为高性能的NIO通信框架,在很多RPC框架中都有它的身影。我们也采用它当做通信服务器。说到这,我们先看个配置文件,重点有两个,zookeeper的注册地址和Netty通信服务器的地址。
TOMCAT端口 server.port=8001 #zookeeper注册地址 registry.address=192.168.245.131:2181,192.168.245.131:2182,192.168.245.131:2183 #RPC服务提供者地址 rpc.server.address=192.168.197.1:18868 复制代码
为了方便管理,我们把它也注册成Bean,同时实现ApplicationContextAware接口,把上面@RpcService注解的服务类捞出来,缓存起来,供消费者调用。同时,作为服务器,还要对客户端的链路进行心跳检测,超过60秒未读写数据,关闭此连接。
package com.viewscenes.netsupervisor.netty.server; @Component public class NettyServer implements ApplicationContextAware,InitializingBean{ private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1); private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4); private Map<String, Object> serviceMap = new HashMap<>(); @Value("${rpc.server.address}") private String serverAddress; @Autowired ServiceRegistry registry; public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class); for(Object serviceBean:beans.values()){ Class<?> clazz = serviceBean.getClass(); Class<?>[] interfaces = clazz.getInterfaces(); for (Class<?> inter : interfaces){ String interfaceName = inter.getName(); logger.info("加载服务类: {}", interfaceName); serviceMap.put(interfaceName, serviceBean); } } logger.info("已加载全部服务接口:{}", serviceMap); } public void afterPropertiesSet() throws Exception { start(); } public void start(){ final NettyServerHandler handler = new NettyServerHandler(serviceMap); new Thread(() -> { try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup). channel(NioServerSocketChannel.class). option(ChannelOption.SO_BACKLOG,1024). childOption(ChannelOption.SO_KEEPALIVE,true). childOption(ChannelOption.TCP_NODELAY,true). childHandler(new ChannelInitializer<SocketChannel>() { //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件 protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JSONEncoder()); pipeline.addLast(new JSONDecoder()); pipeline.addLast(handler); } }); String[] array = serverAddress.split(":"); String host = array[0]; int port = Integer.parseInt(array[1]); ChannelFuture cf = bootstrap.bind(host,port).sync(); logger.info("RPC 服务器启动.监听端口:"+port); registry.register(serverAddress); //等待服务端监听端口关闭 cf.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }).start(); } } 复制代码
上面的代码就把Netty服务器启动了,在处理器中的构造函数中,我们先把服务Bean的Map传进来,所有的处理要基于这个Map才能找到对应的实现类。在channelRead中,获取请求方法的信息,然后通过反射调用方法获取返回值。
package com.viewscenes.netsupervisor.netty.server; @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private final Map<String, Object> serviceMap; public NettyServerHandler(Map<String, Object> serviceMap) { this.serviceMap = serviceMap; } public void channelActive(ChannelHandlerContext ctx) { logger.info("客户端连接成功!"+ctx.channel().remoteAddress()); } public void channelInactive(ChannelHandlerContext ctx) { logger.info("客户端断开连接!{}",ctx.channel().remoteAddress()); ctx.channel().close(); } public void channelRead(ChannelHandlerContext ctx, Object msg) { Request request = JSON.parseObject(msg.toString(),Request.class); if ("heartBeat".equals(request.getMethodName())) { logger.info("客户端心跳信息..."+ctx.channel().remoteAddress()); }else{ logger.info("RPC客户端请求接口:"+request.getClassName()+" 方法名:"+request.getMethodName()); Response response = new Response(); response.setRequestId(request.getId()); try { Object result = this.handler(request); response.setData(result); } catch (Throwable e) { e.printStackTrace(); response.setCode(1); response.setError_msg(e.toString()); logger.error("RPC Server handle request error",e); } ctx.writeAndFlush(response); } } /** * 通过反射,执行本地方法 * @param request * @return * @throws Throwable */ private Object handler(Request request) throws Throwable{ String className = request.getClassName(); Object serviceBean = serviceMap.get(className); if (serviceBean!=null){ Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); return method.invoke(serviceBean, getParameters(parameterTypes,parameters)); }else{ throw new Exception("未找到服务接口,请检查配置!:"+className+"#"+request.getMethodName()); } } /** * 获取参数列表 * @param parameterTypes * @param parameters * @return */ private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){ if (parameters==null || parameters.length==0){ return parameters; }else{ Object[] new_parameters = new Object[parameters.length]; for(int i=0;i<parameters.length;i++){ new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]); } return new_parameters; } } public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.ALL_IDLE){ logger.info("客户端已超过60秒未读写数据,关闭连接.{}",ctx.channel().remoteAddress()); ctx.channel().close(); } }else{ super.userEventTriggered(ctx,evt); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.info(cause.getMessage()); ctx.close(); } } 复制代码
我们启动了Netty通信服务器,并且把服务实现类加载到缓存,等待请求时调用。这一步,我们要进行服务注册。为了简单化处理,我们只注册通信服务器的监听地址即可。 在上面代码中,bind之后我们执行了 registry.register(serverAddress);
它的作用就是,将Netty监听的IP端口注册到zookeeper。
package com.viewscenes.netsupervisor.registry; @Component public class ServiceRegistry { Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${registry.address}") private String registryAddress; private static final String ZK_REGISTRY_PATH = "/rpc"; public void register(String data) { if (data != null) { ZkClient client = connectServer(); if (client != null) { AddRootNode(client); createNode(client, data); } } } //连接zookeeper private ZkClient connectServer() { ZkClient client = new ZkClient(registryAddress,20000,20000); return client; } //创建根目录/rpc private void AddRootNode(ZkClient client){ boolean exists = client.exists(ZK_REGISTRY_PATH); if (!exists){ client.createPersistent(ZK_REGISTRY_PATH); logger.info("创建zookeeper主节点 {}",ZK_REGISTRY_PATH); } } //在/rpc根目录下,创建临时顺序子节点 private void createNode(ZkClient client, String data) { String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); logger.info("创建zookeeper数据节点 ({} => {})", path, data); } } 复制代码
有一点需要注意,子节点必须是临时节点。这样,生产者端停掉之后,才能通知到消费者,把此服务从服务列表中剔除。到此为止,生产者端已经完成。我们看一下它的启动日志:
加载服务类: com.viewscenes.netsupervisor.service.InfoUserService 已加载全部服务接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b} Initializing ExecutorService 'applicationTaskExecutor' Tomcat started on port(s): 8001 (http) with context path '' Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1) RPC 服务器启动.监听端口:18868 Starting ZkClient event thread. Socket connection established to 192.168.245.131/192.168.245.131:2183, initiating session Session establishment complete on server 192.168.245.131/192.168.245.131:2183, sessionid = 0x367835b48970010, negotiated timeout = 4000 zookeeper state changed (SyncConnected) 创建zookeeper主节点 /rpc 创建zookeeper数据节点 (/rpc/provider0000000000 => 192.168.197.1:28868) 复制代码
首先,我们需要把生产者端的服务接口API,即InfoUserService。以相同的目录放到消费者端。路径不同,调用会找不到的哦。
RPC的目标其中有一条,《程序员无需额外地为这个交互作用编程。》所以,我们在调用的时候,就像调用本地方法一样。就像下面这样:
@Controller public class IndexController { @Autowired InfoUserService userService; @RequestMapping("getById") @ResponseBody public InfoUser getById(String id){ logger.info("根据ID查询用户信息:{}",id); return userService.getInfoUserById(id); } } 复制代码
那么,问题来了。消费者端并没有此接口的实现,怎么调用到的呢?这里,首先就是代理。笔者这里用的是Spring的工厂Bean机制创建的代理对象,涉及的代码较多,就不在文章中体现了,如果有不懂的同学,请想象一下,MyBatis中的Mapper接口怎么被调用的。可以参考笔者文章: Mybatis源码分析(四)mapper接口方法是怎样被调用到的
总之,在调用userService方法的时候,会调用到代理对象的invoke方法。在这里,封装请求信息,然后调用Netty的客户端方法发送消息。然后根据方法返回值类型,转成相应的对象返回。
package com.viewscenes.netsupervisor.configurer.rpc; @Component public class RpcFactory<T> implements InvocationHandler { @Autowired NettyClient client; Logger logger = LoggerFactory.getLogger(this.getClass()); public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Request request = new Request(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setParameterTypes(method.getParameterTypes()); request.setId(IdUtil.getId()); Object result = client.send(request); Class<?> returnType = method.getReturnType(); Response response = JSON.parseObject(result.toString(), Response.class); if (response.getCode()==1){ throw new Exception(response.getError_msg()); } if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){ return response.getData(); }else if (Collection.class.isAssignableFrom(returnType)){ return JSONArray.parseArray(response.getData().toString(),Object.class); }else if(Map.class.isAssignableFrom(returnType)){ return JSON.parseObject(response.getData().toString(),Map.class); }else{ Object data = response.getData(); return JSONObject.parseObject(data.toString(), returnType); } } } 复制代码
在生产者端,我们把服务IP端口都注册到zookeeper中,所以这里,我们要去拿到服务地址,然后通过Netty连接。重要的是,还要对根目录进行监听子节点变化,这样随着生产者的上线和下线,消费者端可以及时感知。
package com.viewscenes.netsupervisor.connection; @Component public class ServiceDiscovery { @Value("${registry.address}") private String registryAddress; @Autowired ConnectManage connectManage; // 服务地址列表 private volatile List<String> addressList = new ArrayList<>(); private static final String ZK_REGISTRY_PATH = "/rpc"; private ZkClient client; Logger logger = LoggerFactory.getLogger(this.getClass()); @PostConstruct public void init(){ client = connectServer(); if (client != null) { watchNode(client); } } //连接zookeeper private ZkClient connectServer() { ZkClient client = new ZkClient(registryAddress,30000,30000); return client; } //监听子节点数据变化 private void watchNode(final ZkClient client) { List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> { logger.info("监听到子节点数据变化{}",JSONObject.toJSONString(nodes)); addressList.clear(); getNodeData(nodes); updateConnectedServer(); }); getNodeData(nodeList); logger.info("已发现服务列表...{}", JSONObject.toJSONString(addressList)); updateConnectedServer(); } //连接生产者端服务 private void updateConnectedServer(){ connectManage.updateConnectServer(addressList); } private void getNodeData(List<String> nodes){ logger.info("/rpc子节点数据为:{}", JSONObject.toJSONString(nodes)); for(String node:nodes){ String address = client.readData(ZK_REGISTRY_PATH+"/"+node); addressList.add(address); } } } 复制代码
其中, connectManage.updateConnectServer(addressList);
就是根据服务地址,去连接生产者端的Netty服务。然后创建一个Channel列表,在发送消息的时候,从中选取一个Channel和生产者端进行通信。
Netty客户端有两个方法比较重要,一个是根据IP端口连接服务器,返回Channel,加入到连接管理器;一个是用Channel发送请求数据。同时,作为客户端,空闲的时候还要往服务端发送心跳信息。
package com.viewscenes.netsupervisor.netty.client; @Component public class NettyClient { Logger logger = LoggerFactory.getLogger(this.getClass()); private EventLoopGroup group = new NioEventLoopGroup(1); private Bootstrap bootstrap = new Bootstrap(); @Autowired NettyClientHandler clientHandler; @Autowired ConnectManage connectManage; public Object send(Request request) throws InterruptedException{ Channel channel = connectManage.chooseChannel(); if (channel!=null && channel.isActive()) { SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel); Object result = queue.take(); return JSONArray.toJSONString(result); }else{ Response res = new Response(); res.setCode(1); res.setError_msg("未正确连接到服务器.请检查相关配置信息!"); return JSONArray.toJSONString(res); } } public Channel doConnect(SocketAddress address) throws InterruptedException { ChannelFuture future = bootstrap.connect(address); Channel channel = future.sync().channel(); return channel; } ....其他方法略 } 复制代码
我们必须重点关注send方法,它是在代理对象invoke方法调用到的。首先从连接器中轮询选择一个Channel,然后发送数据。但是,Netty是异步操作,我们还要转为同步,就是说要等待生产者端返回数据才往下执行。笔者在这里用的是同步队列SynchronousQueue,它的take方法会阻塞在这里,直到里面有数据可读。然后在处理器中,拿到返回信息写到队列中,take方法返回。
package com.viewscenes.netsupervisor.netty.client; @Component @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Autowired NettyClient client; @Autowired ConnectManage connectManage; Logger logger = LoggerFactory.getLogger(this.getClass()); private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>(); public void channelActive(ChannelHandlerContext ctx) { logger.info("已连接到RPC服务器.{}",ctx.channel().remoteAddress()); } public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress(); logger.info("与RPC服务器断开连接."+address); ctx.channel().close(); connectManage.removeChannel(ctx.channel()); } public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception { Response response = JSON.parseObject(msg.toString(),Response.class); String requestId = response.getRequestId(); SynchronousQueue<Object> queue = queueMap.get(requestId); queue.put(response); queueMap.remove(requestId); } public SynchronousQueue<Object> sendRequest(Request request,Channel channel) { SynchronousQueue<Object> queue = new SynchronousQueue<>(); queueMap.put(request.getId(), queue); channel.writeAndFlush(request); return queue; } public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception { logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息..."); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.ALL_IDLE){ Request request = new Request(); request.setMethodName("heartBeat"); ctx.channel().writeAndFlush(request); } }else{ super.userEventTriggered(ctx,evt); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ logger.info("RPC通信服务器发生异常.{}",cause); ctx.channel().close(); } } 复制代码
至此,消费者端也基本完成。同样的,我们先看一下启动日志:
Waiting for keeper state SyncConnected Opening socket connection to server 192.168.139.129/192.168.139.129:2181. Will not attempt to authenticate using SASL (unknown error) Socket connection established to 192.168.139.129/192.168.139.129:2181, initiating session Session establishment complete on server 192.168.139.129/192.168.139.129:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000 zookeeper state changed (SyncConnected) /rpc子节点数据为:["provider0000000015"] 已发现服务列表...["192.168.100.74:18868"] 加入Channel到连接管理器./192.168.100.74:18868 已连接到RPC服务器./192.168.100.74:18868 Initializing ExecutorService 'applicationTaskExecutor' Tomcat started on port(s): 7002 (http) with context path '' Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569) 复制代码
我们以Controller里面的两个方法为例,先开启100个线程调用insertInfoUser方法,然后开启1000个线程调用查询方法getAllUser。
public class IndexController { Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired InfoUserService userService; @RequestMapping("insert") @ResponseBody public List<InfoUser> getUserList() throws InterruptedException { long start = System.currentTimeMillis(); int thread_count = 100; CountDownLatch countDownLatch = new CountDownLatch(thread_count); for (int i=0;i<thread_count;i++){ new Thread(() -> { InfoUser infoUser = new InfoUser(IdUtil.getId(),"Jeen","BeiJing"); List<InfoUser> users = userService.insertInfoUser(infoUser); logger.info("返回用户信息记录:{}", JSON.toJSONString(users)); countDownLatch.countDown(); }).start(); } countDownLatch.await(); long end = System.currentTimeMillis(); logger.info("线程数:{},执行时间:{}",thread_count,(end-start)); return null; } @RequestMapping("getAllUser") @ResponseBody public Map<String,InfoUser> getAllUser() throws InterruptedException { long start = System.currentTimeMillis(); int thread_count = 1000; CountDownLatch countDownLatch = new CountDownLatch(thread_count); for (int i=0;i<thread_count;i++){ new Thread(() -> { Map<String, InfoUser> allUser = userService.getAllUser(); logger.info("查询所有用户信息:{}",JSONObject.toJSONString(allUser)); countDownLatch.countDown(); }).start(); } countDownLatch.await(); long end = System.currentTimeMillis(); logger.info("线程数:{},执行时间:{}",thread_count,(end-start)); return null; } } 复制代码
结果如下:
本文简单介绍了RPC的整个流程,如果你正在学习RPC的相关知识,可以根据文中的例子,自己实现一遍。相信写完之后,你会对RPC会有更深一些的认识。
生产者端流程:
消费者端流程:
限于篇幅,本文代码并不完整,如有需要,访问: github.com/taoxun/simp… 或者添加笔者微信公众号:<清幽之地的博客>),获取完整项目。