关注 微信公众号:【芋道源码】 有福利:
本文主要分享 HystrixGatewayFilterFactory 的代码实现 。
在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.2) 之 GatewayFilterFactory 过滤器工厂》 一文中,我们看到 Spring Cloud Gateway 提供了多种 GatewayFilterFactory 的实现,而 HystrixGatewayFilterFactory 也是其中的一种。
通过 HystrixGatewayFilterFactory ,可以创建 HystrixGatewayFilter ( 实际是内部匿名类,为了表述方便,下面继续这么称呼 ) 。
HystrixGatewayFilter 使用 Hystrix ,实现基于 Route 级别的熔断功能。
这里,笔者一本正经的推荐下自己分享的 《Hystrix 源码解析系列》 ,简直业界良心。
第一步,以 spring-cloud-gateway-sample
项目为基础,在 pom.xml
文件添加依赖库。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
第二步,在 application.yml
配置 一个 RouteDefinition 。
spring: cloud: gateway: routes: # ===================================== - id: default_path_to_httpbin uri: http://127.0.0.1:8081 order: 10000 predicates: - Path=/** filters: - Hystrix=myCommandName
- Hystrix=myCommandName
,配置 HystrixGatewayFilterFactory ,并以 myCommandName
为 Hystrix Command 名字 。 第三步,配置完成,启动 spring-cloud-gateway-sample
项目。
org.springframework.cloud.gateway.filter.factory.HystrixGatewayFilterFactory
,熔断网关过滤器 工厂 。代码如下 :
1: public class HystrixGatewayFilterFactory implements GatewayFilterFactory{ 2: 3: @Override 4: public List<String> argNames(){ 5: return Arrays.asList(NAME_KEY); 6: } 7: 8: @Override 9: public GatewayFilter apply(Tuple args){ 10: //TODO: if no name is supplied, generate one from command id (useful for default filter) 11: final String commandName = args.getString(NAME_KEY); 12: final HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(getClass().getSimpleName()); 13: final HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandName); 14: 15: final HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter 16: .withGroupKey(groupKey) 17: .andCommandKey(commandKey); 18: 19: return (exchange, chain) -> { 20: RouteHystrixCommand command = new RouteHystrixCommand(setter, exchange, chain); 21: 22: return Mono.create(s -> { 23: // 使用 Hystrix Command Observable 订阅 24: Subscription sub = command.toObservable().subscribe(s::success, s::error, s::success); 25: // Mono 取消时,取消 Hystrix Command Observable 的订阅,结束 Hystrix Command 的执行 26: s.onCancel(sub::unsubscribe); 27: }).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> { 28: if (throwable instanceof HystrixRuntimeException) { 29: HystrixRuntimeException e = (HystrixRuntimeException) throwable; 30: if (e.getFailureType() == TIMEOUT) { //TODO: optionally set status 31: setResponseStatus(exchange, HttpStatus.GATEWAY_TIMEOUT); 32: return exchange.getResponse().setComplete(); 33: } 34: } 35: return Mono.empty(); 36: }).then(); 37: }; 38: } 39: }
#argNames()
方法,定义了 Tuple 参数的 Key 为 name
。 #apply()
方法,创建 HystrixGatewayFilter 对象。 commandName = myCommandName
。 HystrixGatewayFilterFactory
。 commandName
。 第 20 行 :创建 RouteHystrixCommand 对象。代码如下 :
private class RouteHystrixCommand extends HystrixObservableCommand<Void>{ private final ServerWebExchange exchange; private final GatewayFilterChain chain; RouteHystrixCommand(Setter setter, ServerWebExchange exchange, GatewayFilterChain chain) { super(setter); this.exchange = exchange; this.chain = chain; } @Override protected Observable<Void> construct(){ return RxReactiveStreams.toObservable(this.chain.filter(this.exchange)); } }
第 22 至 26 行 :调用 Mono#create(Consumer<MonoSink<T>>)
方法,创建 Mono 对象。点击 传送门 查看该方法详细说明。因为 Hystrix 基于 RxJava ,而 GatewayFilter 基于 Reactor ( Mono 是其内部的一个类 ),通过这个方法,实现订阅的适配。 未来,会实现 HystrixMonoCommand 替换 HystrixObservableCommand ,从而统一订阅,去除适配代码 。
RouteHystrixCommand#toObservable()
方法,内部会调用 RouteHystrixCommand#construct()
方法,获得执行 this.chain.filter(this.exchange)
的 Observable 。2)订阅 Observable :成功或完成时,调用 Mono#success(Object)
方法,目前创建的 Mono 上没有相关的订阅; 异常时 ,调用 Mono#error(Object)
方法,目前创建的 Mono 上调用 Mongo#onErrorResume(Function<Throwable, Mono<Void>>))
方法,进行订阅。 exchange.getResponse().setComplete()
) 。 Mono.empty()
,最终返回客户端 200 状态码,内容为空 。 Mono#then()
方法, 参数为空 ,返回空 Mono ,不再向后发射数据。 Hystrix 配置参数,目前只能 全局 配置,例如说 Hystrix 执行超时时间,配置如下 :
hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 10000
当 Hystrix 熔断时,最终返回客户端 200 状态码,内容为空,此处建议该 HystrixGatewayFilter 的代码实现。
嘿嘿嘿,写完熔断,准备限流过滤器走起。鸡冻!
胖友,分享一波朋友圈可好!