13.jpg
具体参考example包下的案例
新建一个普通的SpringBoot项目ServerA, 然后导入服务注册依赖
<!-- 服务注册 --> <dependency> <groupId>com.burukeyou.BoomRpc</groupId> <artifactId>BoomRpc.register</artifactId> <version>1.0-SNAPSHOT</version> </dependency> 复制代码
然后编写application.properties文件,配置服务发布的信息
#发布服务的端口 server.port=8090 # 发布服务的应用名 spring.application.name=ServerA # 服务注册中心的地址,即Zookeeper的连接地址 boomRpc.register.address=192.168.1.19:2181 复制代码
在SpringBoot程序入口类添加@EnableServerRegister注解开启服务注册
@EnableServerRegister @SpringBootApplication public class ServerA_Application { public static void main(String[] args) { SpringApplication.run(ServerA_Application.class,args); } } 复制代码
编写需要公开被调用的服务并用@BoomService注解标注, 并填写服务名与接口ProductService保持一致
public interface ProductService { List<String> getAllProductByUserId(String id); void buyOne(Integer productId); } @BoomService("ProductService") public class ProductServiceImpl implements ProductService { @Override public List<String> getAllProductByUserId(String id) { return Arrays.asList("苹果","西瓜","饮料"); } @Override public void buyOne(Integer productId) { System.out.println("购买商品:" + productId); } } 复制代码
连续启动3个此项目, 依次修改application.properties的server.port为8090,8091,8092 启动项目,服务发布成功,在Zookeer可观察到, 服务应用ServerA下有三个服务器节点提供者
新建一个普通SpringBoot项目Client, 导入依赖
<!-- 服务调用 --> <dependency> <groupId>com.burukeyou.BoomRpc</groupId> <artifactId>BoomRpc.rpc</artifactId> <version>1.0-SNAPSHOT</version> </dependency> 复制代码
编写application.properties配置文件
server.port=8080 spring.application.name=client # 配置负载均衡策略: 轮训 boomRpc.rpc.loadblacne=roundRobin # 服务注册中心地址 boomRpc.register.address=192.168.1.19:2181 复制代码
复制需要远端调用的服务接口,比如ServerA项目中编写的ProductService接口类, 并用
@BoomRpc(name = "ServerA",callback = ProductServiceCallback.class)// public interface ProductService { List<String> getAllProductByUserId(String id); void buyOne(Integer productId); } // 容错处理类 public class ProductServiceCallback extends Callback implements ProductService { @Override public List<String> getAllProductByUserId(String id) { Throwable throwable = getThrowable(); // 获得失败异常信息 System.err.println("getAllProductByUserId服务调用失败: "+throwable.getMessage()); return Arrays.asList("获取商品失败"); } @Override public void buyOne(Integer productId) { Throwable throwable = getThrowable(); System.err.println("buyOne服务调用失败: "+throwable.getMessage()); } } 复制代码
在Cleint应用程序启动类添加开启服务调用功能注解@EnableBoomRpc, 而provider属性填写需要远端调用的接口的所在的包的位置 (即之前编写的ProductService所在包的位置)
@EnableBoomRpc(provider = {"burukeyou.client.rpc"}) @SpringBootApplication public class Client01Application { public static void main(String[] args) { SpringApplication.run(Client01Application.class,args); } } 复制代码
编写Controller测试远程调用效果
@RestController public class TestController { @Autowired private ProductService productService; @RequestMapping("/b") public void testProductServiceBuyOne(){ productService.buyOne(47289384); } @RequestMapping("/c") public List<String> testProductServiceGetAll(){ return productService.getAllProductByUserId("5324534"); } } 复制代码
启动Client程序,并调用接口http://localhost:8080/c ,发现与期待结果一致远程调用了ServerA的实现类
此次请求被负载均衡到了 ServerA的 8090节点
连续发送三个 http://localhost:8080/c 请求, 由于使用轮训策略, 服务器节点依次被调用
此时若再启动一个ServerA项目端口为8093, 测Client应用能动态感知到更新本地服务提供者缓存,保证数据一致性.
这时再发请求发现请求被负载到了新的服务器节点
同里把8093节点停掉也能动态感知到, 之后如何请求也不会被负载到8093上
如果把所有服务提供者Server停掉, 则Client远程调用将会失败,但配置了降级策略会回调到callback类进行处理, 如果不配置callbakc属性默认会抛出异常
再请求http://localhost:8080/c接口返回,发现代码输出与ProductServiceCallback类被回调了
为了模拟请求超时,把ServerRequesthandler类的睡眠代码 Thread.sleep(30000);解开注释
之后再次发送http://localhost:8080/c请求, 会请求超时被降级处理类回调
每次请求发送的超时时间默认是2秒,默认会重试3次发送, 在第4次发现重试次数用完就会抛出异常被降级处理类ProductServiceCallback处理回调显示
抛开所有不谈,只要能远程调用其他服务上的方法就算是实现了RPC,再说白就是服务于服务之间能进行通讯,所以只能我们之间能进行通讯我就能进行远程调用
private Object handleRequest(RpcRequest request) { //获得要调用哪个类 String className = request.getClassName(); className = className.substring(className.lastIndexOf(".")+1); // 获得要调用的这个类的哪个方法 String methodName = request.getMethodName(); // 获得要调用的这个类的这个方法的参数类型 Class<?>[] parameterTypes = request.getParameterTypes(); // 获得要调用的这个类的这个方法的参数类型的具体值 Object[] parameters = request.getParameters(); // 获得这个类的具体实现类的class对象 Object obj = ServerRegisterBoot.impClassMap.get(className); Class<?> clazz = obj.getClass(); // 根据这个类的具体实现类的class对象获取到方法对象 Method method = clazz.getMethod(methodName, parameterTypes); // 执行方法对象的到结果 return method.invoke(obj, parameters); } 复制代码
另一个关键就是如何像调用本地方法一样调用远程方法, 不可能说你每次要远程调用就重新写一大堆的代码,什么建立通讯连接,发送请求数据, 处理请求数据,把请求数据返回吧
public interface ProductService { List<String> get(String id); } @Autowired private ProductService productService; // 2 productService.getAllProductByUserId(id); // 3 复制代码
动态代理的远程调用核心实现如下:
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 1 - 封装请求对象,用于发送到另一段 //就是要告诉另一端你要调用哪个类的哪个方法 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setParameters(args); // 2- 获得服务serverName的服务提供者列表 List<String> providerList = RpcCacheHolder.SERVER_PROVIDERS.get(serverName); if (providerList == null || providerList.size() < 1 ){ // 另一段压根没提供服务的机器存在,肯定无法远程调用直接降级处理 return Callbacker.Builder(rpcRequest) .IfNotCallback(() -> {throw new RuntimeException(serverName+"服务不存在,调用失败");}) .orElseSet(new RuntimeException(serverName+"服务不存在,调用失败")); } // 4. 负载均衡,具体要把请求发送到哪个机器处理 LoadBalanceContext loadBalanceContext = RpcCacheHolder.APPLICATION_CONTEXT.getBean(LoadBalanceContext.class); String serverIp = loadBalanceContext.executeLoadBalance(providerList); System.out.println("负载均衡: 调用服务" + serverName +"的" + serverIp + " 服务器节点"); // String[] host = serverIp.split(":"); RpcClient rpcClient = new RpcClient(host[0].trim(), Integer.parseInt(host[1].trim()),serverName); RpcResponse rpcResponse = null; try { // 5 - 最后把请求消息通过Netty发送到另一端即可, 等待接受另一端的响应 rpcResponse = rpcClient.sendRequest(rpcRequest); } catch (Exception e) { // 如果因为某些原因发送失败,直接降级处理 return Callbacker.Builder(rpcRequest).IfNotCallback(e::printStackTrace).orElseSet(e); } // 最后把实际远程调用的那个方法的返回值返回即可 return rpcResponse != null ? rpcResponse.getResult() : null; } 复制代码
public String register(String nodePath, String nodeData) { isConnenct(); String path = null; try { path = zkClient.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) //设置为临时节点 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //设置权限 .forPath(nodePath, nodeData.getBytes()); }catch (KeeperException.NodeExistsException e){ logger.error("NodeExistsException ----服务注册失败,该服务器节点 {} 已经注册,请修改",e.getPath()); e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return path; } 复制代码
Reflections reflections = new Reflections("@BoomRpc注解所在的包"); Set<Class<?>> rpcClazz = reflections.getTypesAnnotatedWith(BoomRpc.class, true); for (Class<?> e : rpcClazz) { BoomRpc annotation = e.getAnnotation(BoomRpc.class); String serverName = "".equals(annotation.name()) ? annotation.value() : annotation.name(); //把要订阅的服务serverName添加到Set集合即可 RpcCacheHolder.SUBSCRIBE_SERVICE.add(serverName); } 复制代码
然后就可以用curator的api去订阅这些服务.代码实现如下
public static CuratorFramework zkClient = null; private static ReentrantLock updateProviderLock = new ReentrantLock(); // 服务发现 public List<String> discover(String serverName){ isConnenct(); List<String> serverList = null; try { PathChildrenCache childrenCache = new PathChildrenCache(zkClient, "/" + serverName,true); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); // 订阅服务 addListener(childrenCache,serverName); } catch (Exception e) { e.printStackTrace(); } return serverList; } private void addListener(PathChildrenCache childrenCache,String serverName){ childrenCache.getListenable().addListener((curatorFramework, event) -> { // 创建子节点 if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ String path = event.getData().getPath(); String host = path.substring(path.lastIndexOf("/") + 1, path.length()); System.out.println("服务器上线:" + path); try { // 更新本地服务提供者缓存 //SERVER_PROVIDERS就是一个 Map<String, List<String>>集合 updateProviderLock.lock(); List<String> list = RpcCacheHolder.SERVER_PROVIDERS.getOrDefault(serverName,new ArrayList<>()); list.add(host); RpcCacheHolder.SERVER_PROVIDERS.put(serverName,list); } finally { updateProviderLock.unlock(); } } // 删除子节点 else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ String path = event.getData().getPath(); String host = path.substring(path.lastIndexOf("/") + 1, path.length()); System.out.println("服务器下线:" + event.getData().getPath()); try { updateProviderLock.lock(); List<String> list = RpcCacheHolder.SERVER_PROVIDERS.get(serverName); list.remove(host); RpcCacheHolder.SERVER_PROVIDERS.put(serverName,list); } finally { updateProviderLock.unlock(); } } }); } 复制代码
public class RpcRequest extends RpcProtocol { private String requestId;// 请求id private String className; //调用类名 private String methodName; //调用方法名 private Class<?>[] parameterTypes; // 方法参数类型 private Object[] parameters;//方法参数 } 复制代码
public class RpcResponse extends RpcProtocol { private String requestId; //对应的请求id, 这个响应对象对应哪个请求 private Exception exception; //请求异常信息 private Object result; // 实际远程方法调用返回的数据 复制代码
一般都可以在配置文件中配置使用哪种负载策略,然后用Spring读取动态生成对应的负载均衡实现类然后设置到LoadBalanceContext上下文种, 最后再把LoadBalanceContext注入到Spring容器中即可. 之后通过LoadBalanceContext执行负载策略.代码实现
@Configuration public class RpcBeanConfiguration { private final BoomRpcProperties properties; public RpcBeanConfiguration(BoomRpcProperties properties) { this.properties = properties; } // Spring启动时执行这段代码,把LoadBalanceContext注入到容器中 @Bean public LoadBalanceContext loadBalanceContext(){ LoadBalanceContext loadBalanceContext = new LoadBalanceContext(); if ("roundRobin".equalsIgnoreCase(properties.getLoadblacne().trim())){ loadBalanceContext.setLoadBalanceStrategy(new RoundRobinStrategy()); }else if("random".equalsIgnoreCase(properties.getLoadblacne().trim())){ loadBalanceContext.setLoadBalanceStrategy(new RandomStrategy()); }else if("hash".equalsIgnoreCase(properties.getLoadblacne().trim())){ // todo }else { loadBalanceContext.setLoadBalanceStrategy(new RandomStrategy()); } return loadBalanceContext; } } 复制代码
一般是否需要处理回调只有一个逻辑即是否配置了callback类,没有则直接抛出异常,伪代码如下:
//假设已经拿到@BoomRpc注解的callback属性的值 if(callback != null) { // 调用callback的类对象进行回调处理 }else { // 直接抛出异常 } 复制代码
为了方便调用写了一个链式调用链Callbacker处理: 调用方式如下:
RpcResponse rpcResponse = null; try { // 这是远程服务调用返回的响应对象 rpcResponse = rpcClient.sendRequest(rpcRequest); //根据响应对象是否包含异常信息判断是否远程调用异常 if (rpcResponse != null && rpcResponse.getException() != null){ Exception e = rpcResponse.getException(); //如果存在异常则交给容错处理类Callbacker进行处理 return Callbacker.Builder(rpcRequest).IfNotCallback(e::printStackTrace).orElseSet(e); } } catch (Exception e) { // 捕获异常,容错处理 return Callbacker.Builder(rpcRequest).IfNotCallback(e::printStackTrace).orElseSet(e); } 复制代码
具体容错处理类Callbacker实现:
public class Callbacker { private RpcRequest rpcRequest; private Class<?> callBackClass; private Class<?> callback; public static Callbacker Builder(RpcRequest rpcRequest){ return new Callbacker(rpcRequest); } // 创建Callbacker的同时拿到 BoomRpc注解配置的callback类 private Callbacker(RpcRequest rpcRequest) { this.rpcRequest = rpcRequest; try { System.out.println(rpcRequest); callBackClass = Class.forName(rpcRequest.getClassName()); callback = callBackClass.getAnnotation(BoomRpc.class).callback(); } catch (Exception e) { //e.printStackTrace(); System.out.println("1"); } } // 如果不需要回调处理直接执行自定义的处理方法Process,一般都会直接传抛出异常 public Callbacker IfNotCallback(Process process){ if (!shouldCallback()) process.doSomething(); return this; } // 如果要进行回调处理, 直接把异常信息传入去处理即可 public Object orElseSet(Throwable throwable) throws Exception { if (shouldCallback()){ // 创建callback的实例对象 Object obj = callback.newInstance(); //从callback上获得此次请求的方法的方法对象 Method method = callback.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes()); // 由于callback类都需要继承Callback抽象类拿到异常信息,这里就可以动态注入异常信息 Method setThrowable = callback.getMethod("setThrowable", Throwable.class); setThrowable.invoke(obj,throwable); // 之后执行此次请求的方法的方法对象即可 return method.invoke(obj,rpcRequest.getParameters()); }else return null; } // 判断callback是否为默认值void.class来进行回调处理 private boolean shouldCallback(){ return callback != void.class; } public interface Process { void doSomething(); } } 复制代码