hystrix是spring cloud的熔断降级组件,由netflix公司开源,通过命令模式结合rxjava框架实现,命令模式封装了用户具体业务,使用rxjava对命令的执行结果进行统计,根据统计结果按一定策略执行熔断降级,避免造成应用失败雪崩。
执行流程如下图:流程说明:
1.每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.
2:执行execute()/queue做同步或异步调用.
3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入后续步骤.
4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.
5:调用HystrixCommand的run方法.运行依赖逻辑
5a:依赖逻辑调用超时,进入步骤8.
6:判断逻辑是否调用成功
6a:返回成功调用结果
6b:调用出错,进入步骤8.
7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.
8:getFallback()降级逻辑.
以下四种情况将触发getFallback调用:
(1):run()方法抛出非HystrixBadRequestException异常。
(2):run()方法调用超时
(3):熔断器开启拦截调用
(4):线程池/队列/信号量是否跑满
8a:没有实现getFallback的Command将直接抛出异常
8b:fallback降级逻辑调用成功直接返回
8c:降级逻辑调用失败抛出异常
9:返回执行成功结果
官方文档有详细的使用示例:
https://github.com/Netflix/Hystrix/wiki/How-To-Use快递业务涉及很多外部对接,为了各个对接接口的隔离和失败的降级防止雪崩,所以引入了hystrix作为降级组件。 使用非常方便
1.引入依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> <version>1.4.5.RELEASE</version> </dependency>
2.启动了增加annotation @EnableCircuitBreaker
3.需要隔离降级的方法增加注解:
@HystrixCommand(groupKey= "yunda",commandKey="scanningOrder",threadPoolKey="scanningOrder-thread", threadPoolProperties = {@HystrixProperty(name="maximumSize",value = "20")}, commandProperties = {@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value = "3000")}, fallbackMethod="scanningOrderFallback", ignoreExceptions={BusinessException.class} )
spring cloud使用aop方式将包含@HystrixCommand注解的方法进行代理,包装了一层HystrixCommond并做了扩展。 该注解上可以配置hystrix各项参数
配置说明:
groupKey指定命令分组,同一个分组使用一个线程池。
commandKey指定命令名称。
threadPoolKey指定代码执行的具体线程名称。
commandProperties 可配置命令执行及指标统计相关参数:
execution.isolation.thread.timeoutInMilliseconds 该参数配置业务代码超时时间(默认1000ms),执行超过该时间会触发降级方法,没有降级方法将抛出HystrixRuntimeException
metrics.rollingStats.timeInMilliseconds参数可配置执行结果统计的滑动窗口时间 默认10000ms
metrics.rollingStats.numBuckets参数可配置滑动窗口包含多少段 默认10
circuitBreaker.requestVolumeThreshold参数可配置触发熔断的请求量阈值 默认20
circuitBreaker.errorThresholdPercentage可配置触发熔断的失败比率,默认50%
circuitBreaker.sleepWindowInMilliseconds可配置触发熔断到恢复的时间窗,默认5s
threadPoolProperties 可配置线程池相关参数,hystrix默认使用线程池进行业务隔离,核心线程数和最大线程数默认都是10个线程,并且使用SynchronizedQueue,即默认限制了最大并发数为10
fallbackMethod指定了降级的方法。
ignoreExceptions指定哪些异常是不需要降级的,比如我们需要给前端返回一个BusinessException,就不需要降级。
核心的配置上面都介绍了,更具体的可以看代码里的HystrixCommandProperties和HystrixThreadPoolProperties两个类。
使用过程中遇到一个问题: 当代码执行异常触发降级之后,降级的方法也是返回一个具体Exception,最终抛出的是对应的Exception 而当方法超时触发降级并且降级的方法也是返回一个具体Exception,最终抛出的却是HystrixRuntimeException 查看代码之后发现commond都是返回了HystrixRuntimeException,而实现aop的HystrixCommandAspect中对HystrixRuntimeException 做了处理,如果是命令执行失败类型(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION),将抛出具体异常 即降级中的Exception,失败类型不是命令执行失败类型(比如超时),将直接抛出HystrixRuntimeException,这样就不能返回我们需要返回的降级方法的异常了,只能通过修改具体实现来解决了。
hystrix熔断决策及dashboard展示都是依赖于指标的统计,基于不同的HystrixEventStream实现发射不同的事件流。
类图:HystrixCommandMetrics中的指标流如下:
//统计滚动窗口总请求数 失败请求数及失败比率用于熔断判断 private HealthCountsStream healthCountsStream; //统计滚动窗口时间内各分段时间的命令执行结果 private final RollingCommandEventCounterStream rollingCommandEventCounterStream; //汇总命令所有时间执行结果 private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream; //滚动窗口不同占比请求的命令耗时统计 private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream; //滚动窗口不同占比请求的用户代码耗时统计 private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream; //滚动窗口最大并发统计 private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;
以上的统计除了rollingCommandMaxConcurrencyStream是基于HystrixCommandStartStream,其他都是基于HystrixCommandCompletionStream,选择一个统计流看看具体实现,比如RollingCommandEventCounterStream。 类图:
看下基类的BucketedCounterStream的Observable实现
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } });
inputEventStream是上面的HystrixCommandCompletionStream命令完成事件流,通过rxjava的window api按每秒打开一个窗口做处理,
rxjava window api:
https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Window.html 处理方法是:
this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } };
appendRawEventToBucket在HystrixCommandMetrics定义:
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() { @Override public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) { ExecutionResult.EventCounts eventCounts = execution.getEventCounts(); for (HystrixEventType eventType: ALL_EVENT_TYPES) { switch (eventType) { case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here default: initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType); break; } } return initialCountArray; } };
做的处理就是对命令的执行结果转换为了Long数组,不同执行结果次数存于不同下标。 而BucketedRollingCounterStream继承于BucketedCounterStream,实现的Observable如下:
this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer
也是window api发射numBuckets(默认10)之后新开窗口并且skip了1,即默认窗口时间10s分10段,每次达到10段新开窗口并skip最后1s的窗口,从而达到了滚动的效果。对每秒的window Observable做reduceWindowToSummary操作,实现如下:
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() { @Override public Observable<Output> call(Observable<Bucket> window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } };
使用rxjava 的scan api对每一项应用了reduceBucket操作,scan api定义:
https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Scan.html reduceBucket定义在HystrixCommandMetrics的bucketAggregator:
public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() { @Override public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) { for (HystrixEventType eventType: ALL_EVENT_TYPES) { switch (eventType) { case EXCEPTION_THROWN: for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) { cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()]; } break; default: cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()]; break; } } return cumulativeEvents; } };
实现了bucket里面的不同执行结果次数的相加,做到了滚动窗口内每个桶内各项执行结果的统计。
总体指标流程如下图:实现涉及到很多rxjava api 可看文档:
https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html 由于本人能力有限,如有不对之处,欢迎指正。