摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-result-cache/ 「芋道源码」欢迎转载,保留摘要,谢谢!
关注**微信公众号:【芋道源码】**有福利:
本文主要分享 Hystrix 执行命令的结果缓存 。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 执行命令整体流程如下图:
FROM 《【翻译】Hystrix文档-实现原理》「流程图」
#toObservable()
方法里,如果请求结果缓存这个特性被 启用 ,并且 缓存命中 ,则缓存的回应会立即通过一个 Observable 对象的形式返回;如果 缓存未命中 ,则返回【 订阅了执行命令的 Observable 】的 ReplySubject 对象缓存执行结果。
在官方提供的示例中,我们使用 CommandUsingRequestCache 进行调试 。
点击 《【翻译】Hystrix文档-实现原理》「请求缓存」 ,查看对 请求缓存 的好处分享,写的真的很赞。
本小节为 拓展内容 ,源码解析 RxJava ( 非 Hystrix ) 的 Observable#defer(...)
的方法实现。考虑到 Hystrix 大量使用,为了更好的理解,解析下源码。
《RxJava 源码解析 —— Observable#defer(...)》
AbstractCommand#toObservavle(...)
方法,代码如下 :
1: public Observable<R> toObservable(){ 2: final AbstractCommand<R> _cmd = this; 3: 4: //doOnCompleted handler already did all of the SUCCESS work 5: //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work 6: final Action0 terminateCommandCleanup = new Action0() {} // ... 省略 7: 8: //mark the command as CANCELLED and store the latency (in addition to standard cleanup) 9: final Action0 unsubscribeCommandCleanup = new Action0() {} // ... 省略 10: 11: final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { 12: @Override 13: public Observable<R> call(){ 14: if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { 15: return Observable.never(); 16: } 17: return applyHystrixSemantics(_cmd); 18: } 19: }; 20: 21: final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {} // ... 省略 22: 23: final Action0 fireOnCompletedHook = new Action0() {} // ... 省略 24: 25: return Observable.defer(new Func0<Observable<R>>() { 26: @Override 27: public Observable<R> call(){ 28: /* this is a stateful object so can only be used once */ 29: if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { 30: IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); 31: //TODO make a new error type for this 32: throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); 33: } 34: 35: // 命令开始时间戳 36: commandStartTimestamp = System.currentTimeMillis(); 37: 38: // TODO【2001】【打印日志】 39: if (properties.requestLogEnabled().get()) { 40: // log this command execution regardless of what happened 41: if (currentRequestLog != null) { 42: currentRequestLog.addExecutedCommand(_cmd); 43: } 44: } 45: 46: // 缓存开关、缓存KEY 47: final boolean requestCacheEnabled = isRequestCachingEnabled(); 48: final String cacheKey = getCacheKey(); 49: 50: // 优先从缓存中获取 51: /* try from cache first */ 52: if (requestCacheEnabled) { 53: HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); 54: if (fromCache != null) { 55: isResponseFromCache = true; // 标记 从缓存中结果 56: return handleRequestCacheHitAndEmitValues(fromCache, _cmd); 57: } 58: } 59: 60: // 获得 执行命令Observable 61: Observable<R> hystrixObservable = 62: Observable.defer(applyHystrixSemantics) 63: .map(wrapWithAllOnNextHooks); 64: 65: // 获得 缓存Observable 66: Observable<R> afterCache; 67: // put in cache 68: if (requestCacheEnabled && cacheKey != null) { 69: // wrap it for caching 70: HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); 71: // 并发若不存在 72: HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); 73: if (fromCache != null) { // 添加失败 74: // another thread beat us so we'll use the cached value instead 75: toCache.unsubscribe(); 76: isResponseFromCache = true; // 标记 从缓存中结果 77: return handleRequestCacheHitAndEmitValues(fromCache, _cmd); 78: } else { // 添加成功 79: // we just created an ObservableCommand so we cast and return it 80: afterCache = toCache.toObservable(); 81: } 82: } else { 83: afterCache = hystrixObservable; 84: } 85: 86: // 87: return afterCache 88: .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) 89: .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once 90: .doOnCompleted(fireOnCompletedHook); 91: } 92: }); 93: }
第 2 行 : _cmd
指向当前命令对象,用于下面实现 FuncX ,ActionX 内部类使用。
第 11 至 19 行 :当缓存特性 未开启 ,或者缓存 未命中 时,使用 applyHystrixSemantics
传入 Observable#defer(...)
方法,声明 执行命令 的 Observable。
第 25 行 :声明缓存 Observable 。Hystrix 执行命令的 Observable 声明关系如下:
第 29 至 33 行 : 一条命令只能执行一次 。
第 36 行 :记录命令 开始 时间戳。
第 38 至 44 行 :TODO【2001】【打印日志】
第 47 至 48 行 :缓存存开关、KEY 。
第 52 至 58 行 :如果请求结果缓存这个特性被 启用 ,并且 缓存命中 ,则缓存的回应会立即通过一个 Observable 对象的形式返回。
requestCache
缓存,在TODO 【2008】【请求缓存】详细解析。 #handleRequestCacheHitAndEmitValues(...)
方法,在 第 78 行 详细解析。 第 61 至 63 行 :获取 执行命令 的 Observable 。在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 详细解析。
第 68 至 81 行 :当缓存特性 开启 ,并且缓存 未命中 时,创建【 订阅了执行命令的 Observable 】的 HystrixCommandResponseFromCache 。
requestCache
。哟, HystrixRequestCache#putIfAbsent(...)
方法, 多个线程 添加时,只有一个线程添加成功。 HystrixCommandResponseFromCache#unsubscribe()
方法,取消 HystrixCommandResponseFromCache 的订阅。这一步很关键,因为我们 不希望缓存不存在时,多个线程去执行命令,最好有且只有一个线程执行命令 。在「5. HystrixCachedObservable」详细解析。 HystrixCommandResponseFromCachetoObservable()
方法,获得缓存 Observable 。 第 82 至 84 行 :当缓存特性 未开启 ,使用执行命令 Observable 。
第 87 至 91 行 :在返回的 Observable 上,订阅一些清理的处理逻辑。对这几个方法有疑惑的同学,可以阅读 《给 Android 开发者的 RxJava 详解》「 3) Subscribe (订阅) 」 。
com.netflix.hystrix.HystrixCachedObservable
,缓存 Observable 。
HystrixCachedObservable 构造方法 ,代码如下 :
1: public class HystrixCachedObservable<R>{ 2: /** 3: * 订阅 4: */ 5: protected final Subscription originalSubscription; 6: /** 7: * 缓存 cachedObservable 8: */ 9: protected final Observable<R> cachedObservable; 10: /** 11: * TODO 【2006】【outstandingSubscriptions】 12: */ 13: private volatile int outstandingSubscriptions = 0; 14: //private AtomicInteger outstandingSubscriptions2 = new AtomicInteger(0); 15: 16: protected HystrixCachedObservable(final Observable<R> originalObservable){ 17: ReplaySubject<R> replaySubject = ReplaySubject.create(); 18: this.originalSubscription = originalObservable 19: .subscribe(replaySubject); 20: 21: this.cachedObservable = replaySubject 22: .doOnUnsubscribe(new Action0() { 23: @Override 24: public void call(){ 25: outstandingSubscriptions--; 26: if (outstandingSubscriptions == 0) { 27: originalSubscription.unsubscribe(); 28: } 29: } 30: }) 31: .doOnSubscribe(new Action0() { 32: @Override 33: public void call(){ 34: outstandingSubscriptions++; 35: } 36: }); 37: }
第 17 至 19 行 :实际上, HystrixCachedObservable 不是一个 Observable 的子类 ,而是对传入的 Observable 封装 :使用 ReplaySubject 向传入的 Observable 发起订阅,通过 ReplaySubject 能够 重放 执行结果,从而实现缓存的功效。这里有几个卡到笔者的并且很有趣的点,我们一一道来 :从上文中,我们可以看到,传入的 originalObservable
为 hystrixObservable
执行命令 Observable 。在 Hystrix 里,提供了两种执行命令的隔离方式 :线程池( THREAD
) 和信号量( SEMAPHORE
)。
当使用 THREAD
隔离时, #subscribe(replaySubject)
调用完成时, 实际命令并未开始执行 ,或者说,这是一个 异步 的执行命令的过程。那么, 会不会影响返回执行结果呢 ?答案当然是不会,BlockingObservable 在得到执行完成才会 结束阻塞 ,此时已经有执行结果。
当使用 SEMAPHORE
隔离时, #subscribe(replaySubject)
调用完成时, 实际命令已经执行完成 ,所以即使 AbstractCommand#toObservavle(...)
的第 75 行 :调用 HystrixCommandResponseFromCache#unsubscribe()
方法,也会浪费, 重复 执行命令。而对于 THREAD
隔离的情况,通过取消订阅的方式,只会执行 一次 命令。当然,如果“恶搞” THREAD
隔离的情况,增加 sleep
的调用如下,就能达到 重复 执行命令的效果。
第 21 至 36 行 :TODO 【2006】【outstandingSubscriptions】原子性没问题么?历史版本使用的是 AtomicInteger 。
HystrixCachedObservable 的其他方法,点击 链接 查看。
com.netflix.hystrix.HystrixCommandResponseFromCache
,是 HystrixCachedObservable 的子类。在父类的基础上,增加了对 AbstractCommand.executionResult
的关注。
HystrixCachedObservable#from(Observable, AbstractCommand)
方法,创建 HystrixCommandResponseFromCache 对象,点击 链接 查看。
HystrixCommandResponseFromCache#toObservableWithStateCopiedInto(...)
方法,点击 链接 查看。
completionLogicRun
属性,保证 #doOnError()
, #doOnCompleted()
, #doOnUnsubscribe()
方法有且只有一个方法执行具体逻辑。
#doOnError()
, #doOnCompleted()
执行时,调用 #commandCompleted()
方法,从缓存命令( HystrixCommandResponseFromCache.originalCommand
) 复制 executionResult
属性给当前命令( commandToCopyStateInto
) 。 #doOnUnsubscribe()
执行时,调用 #commandUnsubscribed()
方法,使用当前命令( commandToCopyStateInto
) 自己 的 executionResult
,不进行复制。 如鲠在喉的感觉,从周六开始磨了四天多,一直没写到一个比较舒服的状态。
先发现发,如果有不清晰的地方,烦请指出,谢谢。
胖友,分享一波朋友圈可好!