摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-mode/ 「芋道源码」欢迎转载,保留摘要,谢谢!
关注 微信公众号:【芋道源码】 有福利:
本文主要分享 Hystrix 执行命令方法 。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:
方法 | ||
---|---|---|
#execute() |
同步 调用,返回 直接 结果 | |
#queue() |
异步 调用,返回 java.util.concurrent.Future |
|
#observe() |
异步 调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
|
#toObservable() |
未调用 ,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
#testToObservable()
查看笔者补充的示例。
// AbstractCommand.java abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R>{ // ... 省略无关属性与方法 public Observable<R> toObservable(){ return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call(){ // .... } } } public Observable<R> observe(){ // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject<R> subject = ReplaySubject.create(); // eagerly kick off subscription final Subscription sourceSubscription = toObservable().subscribe(subject); // return the subject that can be subscribed to later while the execution has already started return subject.doOnUnsubscribe(new Action0() { @Override public void call(){ sourceSubscription.unsubscribe(); } }); } } // HystrixCommand.java public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R>{ // ... 省略无关属性与方法 public Future<R> queue(){ final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { // ... 包装 delegate } // ... return f; } public R execute(){ try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } } protected abstract R run() throws Exception; }
#toObservable()
方法 : 未 做订阅,返回干净的 Observable 。 这就是为什么上文说“未调用” 。 #observe()
方法 :调用 #toObservable()
方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject
发起订阅 。
#queue()
方法 :调用 #toObservable()
方法的基础上,调用:
Observable#toBlocking()
方法 :将 Observable 转换成 阻塞 的 rx.observables.BlockingObservable
。 BlockingObservable#toFuture()
方法 :返回可获得 #run()
抽象方法 执行结果的 Future 。
#run()
方法 :子类实现该方法,执行 正常的业务逻辑 。
#execute()
方法 :调用 #queue()
方法的基础上,调用 Future#get()
方法, 同步 返回 #run()
的执行结果。 整理四种调用方式如下:
FROM 《【翻译】Hystrix文档-实现原理》
本小节为 拓展内容 ,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable
的实现,所以你可以选择:
《RxJava 源码解析 —— BlockingObservable》
第一篇 Hystrix 正式的源码解析。
梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。
胖友,分享一波朋友圈可好!