“ 为了提升Linkis的高并发能力,Linkis基于微服务架构,在Feign的基础之上,实现了一套自己的底层RPC通信方案,用以提升微服务间的通信性能和并发能力。 ”
基于Feign的微服务之间HTTP接口调用,只能满足简单的A微服务实例根据简单的规则随机访问B微服务之中的某个服务实例,而这个B微服务实例如果想异步回传信息给调用方,是根本无法实现的。
同时,由于Feign只支持简单的服务选取规则,无法做到将请求转发给指定的微服务实例,无法做到将一个请求广播给接收方微服务的所有实例。
Linkis RPC是基于Spring Cloud + Feign实现的一套微服务间的异步请求和广播通信服务,可以独立于Linkis而使用。
01
—
简介
Linkis RPC作为底层的通信方案,将提供SDK集成到有需要的微服务之中。
一个微服务既可以作为请求调用方,也可以作为请求接收方。
作为请求调用方时,将通过Sender请求目标接收方微服务的Receiver,作为请求接收方时,将提供Receiver用来处理请求接收方Sender发送过来的请求,以便完成同步响应或异步响应。
02
—
实现
基于请求调用方的Sender体系和请求接收方的Receiver体系,构成了Linkis RPC的全部架构。
Linkis RPC作为底层的通信层,发送端无需使用者写任何的实际代码。
使用者通过调用Linkis RPC提供的SDK,通过微服务名(Service Name)或指定微服务实例(微服务名+微服务实例的IP和端口),获取一个Sender发送器。
Sender提供的可使用方法,见如下伪代码:
abstract class Sender { Object ask(Object message); Object ask(Object message, Duration timeout); void send(Object message); void deliver(Object message); }
其中:
1. ask方法为同步请求响应方法,要求接收端必须同步返回响应; 2. send方法为同步请求方法,只负责同步将请求发送给接收端,不要求接收端给出答复; 3. deliver则为异步请求方法,只要发送端的进程不异常退出,在稍后会通过其它线程将请求发送给接收端。
使用者作为发送端,通过Sender发送器提供的请求方法,发送请求给接收端。
Sender发送器会将使用者的请求传递给拦截器。拦截器拦截请求,开始对请求做额外的功能性处理:
广播拦截器
广播拦截器只对需要进行广播的请求生效。
广播拦截器会提供特殊的广播接口,如果本次请求实现了该广播接口,且该请求不是正在广播中,广播拦截器则认为本次请求需要进行广播,这时会触发广播操作。
重试拦截器
重试拦截器会对接下来的所有步骤提供重试功能。
如果接收端要求重试,或者发送请求时出现了ConnectException(连接异常),或者使用者指定某些异常需要重试,这时重试拦截器会自动进行重试。
缓存拦截器
缓存拦截器是针对一些响应内容不大可能经常变动的同步请求而设定的。
缓存拦截器也会提供特殊的缓存接口,如果本次请求实现了缓存接口,会首先在缓存拦截器中寻找缓存,不存在缓存才会继续请求,并在拿到响应后,先将响应缓存起来,再将响应返回。
默认拦截器
默认拦截器用于调用接下来的处理步骤。
自定义拦截器
使用者也可以自己实现自定义拦截器,用于实现一些特定的功能。
请求编码器会先将用户请求的数据(实体Bean)转换成序列化的JSON字符串,然后传递给Feign客户端生成器。
Feign客户端生成器,生成可访问接收端Restful请求接收器的Feign客户端。
生成的Feign客户端,会调用Eureka客户端,获取所有微服务列表,通过服务选择器,如果使用者指定微服务名,则通过Feign的负载均衡策略,选择一个合适的接收方微服务实例进行请求转发,否则服务选择器会重写Spring Cloud Feign的FeignLoadBalancer(Feign负载均衡器),在创建LoadBalancerCommand时,请求使用者指定的微服务实例(微服务名+微服务实例address)。
调用Feign客户端,开始请求接收端的Restful请求接收器。
接收端需要使用者实现Receiver接口,用于处理真正的业务逻辑。
Restful请求接收器作为Linkis RPC内嵌的HTTP请求Web Service服务,负责接收发送端的请求
Restful请求接收器接收到请求后,先调用请求解码器对请求进行解码,解析出实际的请求信息和发送端微服务信息(微服务名+微服务实例的IP和端口),如果解析失败,会直接响应解析请求失败。
将解析后的请求信息和发送端微服务信息放入请求消息队列;
请求消费器会消费请求消息队列里,已经解码的发送端请求。
通过调用Receiver管理器获取一个合适的Receiver;同时通过发送端微服务信息,使用Sender生成器生成一个指向发送端的Sender。
然后请求消费器将实际的请求信息和生成的发送端Sender,传给Receiver进行处理。
Receiver作为用户请求的实际处理单元,要求使用者必须实现Receiver接口,完成对调用端请求的实际处理逻辑。
Receiver的伪代码如下:
public interface Receiver { void receive(Object message, Sender sender); Object receiveAndReply(Object message, Sender sender); Object receiveAndReply(Object message, Duration duration, Sender sender); }
Receiver提供了处理同步请求和异步请求的方法。
如果本次请求是异步请求,则调用Receiver的receive方法,由上层业务决定是否需要通过发送端的Sender回传响应。
如果本次请求是同步请求,则调用 Receiver的receiveAndReply方法,将返回值作为响应结果,回传发送端。
03
—
Linkis RPC是两个微服务实例之间的相互访问,所以这里需要启动两个微服务,一个用于接收请求的TestReceiver微服务,一个用于发送请求的TestSender微服务。
引入maven依赖
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-cloudRPC</artifactId>
<version>0.9.0</version>
</dependency>
配置application.yml
在resources下新建application.yml文件,以下为所有的配置信息:
请注意: 其中Eureka的URL地址必须与实际Eureka的首页地址相同。
server:
port: 9001 #该微服务提供服务的端口(必须)
spring:
application:
name: TestReceiver #微服务名,用于做微服务高可用(必须)
eureka:
client:
serviceUrl:
defaultZone: http://localhost:20303 #Eureka的URL地址
registry-fetch-interval-seconds: 5
#(必须)
management:
endpoints:
web:
exposure:
include: refresh,info
logging:
config: classpath:log4j2.xml
配置linkis.properties
# 是否开启测试模式
#wds.linkis.test.mode=true
# 指定restful的包路径
wds.linkis.server.restful.scan.packages=
# 指定restful接口的版本
wds.linkis.server.version=v1
实现Receiver接口
请注意: Receiver实现类必须以注解@Component或@Bean的方式,声明为一个spring bean。
@Component
public class TestReceiver implements Receiver {
/**
* 这是一个异步请求。请求方只是发送信息过来,不要求给任何答复。
* 如果希望给请求方一个答复,可以使用sender向请求方发送一个结果。
* 请求方能否收到sender发送的结果,取决于请求方是否也实现了Receiver接口。
* @param message 是Sender端传过来的信息对象,可以是实现了Protocol接口的对象,
* 也可以是一个Java类。
* @param sender 指向请求方的sender,凭此sender可以回传信息给请求方
*/
public void receive(Object message, Sender sender) {
System.out.println("received: " + message);
}
/**
* 这是一个同步请求。请求方发送请求,并要求立马返回一个处理结果。
* @param message 是Sender端传过来的信息对象,可以是实现了Protocol接口的对象,
* 也可以是一个Java类。
* @param sender 指向请求方的sender,凭此sender可以回传信息给请求方
*/
public Object receiveAndReply(Object message, Sender sender) {
System.out.println("received: " + message);
return "received: " + message;
}
public Object receiveAndReply(Object message, Duration duration, Sender sender) {
return receiveAndReply(message);
}
}
启动TestReceiver
至此,TestReceiver模块已成功启动,等待TestSender发送请求。
TestSender引入maven依赖、配置application.yml和linkis.properties的方式与TestReceiver大体一致,唯一的区别是application.yml的微服务名和端口需做修改,如下:
server:
port: 9003 #该微服务提供服务的端口(必须)
spring:
application:
name: TestSender #微服务名,用于做微服务高可用(必须)
public class TestSender {
public static void main(String[] args) {
Sender sender = Sender.getSender("TestReceiver"); //指定需要请求的微服务
sender.send("I ask a Async request!"); //向TestReceiver发送一个异步请求
Object response = sender.ask("I ask a sync request!");
System.out.println("response: " + response);
}
}
启动TestSender,即可看到TestReceiver接收到了TestSender的两个请求,且TestSender拿到了TestReceiver的第二个同步返回结果。
04
—
Linkis作为计算中间件,为了提升微服务间的通信能力和并发能力,做了很多的尝试和努力。
Linkis RPC作为Linkis底层的核心模块之一,为实现Linkis的全异步访问和调用,重构了一套全新的通信机制。
欢迎大家试用和使用Linkis!