 
 
关注 微信公众号:【芋道源码】 有福利:
本文主要分享 Hystrix 执行命令方法 。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:
| 方法 | ||
|---|---|---|
| #execute() | 同步 调用,返回 直接 结果 | |
| #queue() | 异步 调用,返回 java.util.concurrent.Future | |
| #observe() | 异步 调用,返回 rx.Observable。向 Observable 注册rx.Subscriber处理结果 | |
| #toObservable() | 未调用 ,返回 rx.Observable。向 Observable 注册rx.Subscriber处理结果 | 
#testToObservable()  查看笔者补充的示例。   
 
// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R>{
 
 // ... 省略无关属性与方法
 
 public Observable<R> toObservable(){
 
 return Observable.defer(new Func0<Observable<R>>() {
 @Override
 public Observable<R> call(){
 // ....
 }
 }
 
 }
 
 public Observable<R> observe(){
 // us a ReplaySubject to buffer the eagerly subscribed-to Observable
 ReplaySubject<R> subject = ReplaySubject.create();
 // eagerly kick off subscription
 final Subscription sourceSubscription = toObservable().subscribe(subject);
 // return the subject that can be subscribed to later while the execution has already started
 return subject.doOnUnsubscribe(new Action0() {
 @Override
 public void call(){
 sourceSubscription.unsubscribe();
 }
 });
 }
}
// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R>{
 // ... 省略无关属性与方法
 
 public Future<R> queue(){
 final Future<R> delegate = toObservable().toBlocking().toFuture();
 final Future<R> f = new Future<R>() {
 // ... 包装 delegate
 }
 // ...
 return f;
 }
 public R execute(){
 try {
 return queue().get();
 } catch (Exception e) {
 throw Exceptions.sneakyThrow(decomposeException(e));
 }
 }
 
 protected abstract R run() throws Exception;
}
 
  #toObservable() 方法 : 未 做订阅,返回干净的 Observable 。 这就是为什么上文说“未调用” 。 #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅 。 
   #queue() 方法 :调用 #toObservable() 方法的基础上,调用: 
   Observable#toBlocking() 方法 :将 Observable 转换成 阻塞 的 rx.observables.BlockingObservable 。 BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法 执行结果的 Future 。 
     #run() 方法 :子类实现该方法,执行 正常的业务逻辑 。 
       #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法, 同步 返回 #run() 的执行结果。 整理四种调用方式如下:
  
 
FROM 《【翻译】Hystrix文档-实现原理》
 
 
    本小节为 拓展内容 ,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择: 
《ReactiveX/RxJava文档中文版 —— 阻塞操作》
BlockingObservable 的方法不是将一个 Observable 变换为另一个,也不是过滤Observables,它们会打断 Observable 的调用链,会阻塞等待直到 Observable 发射了想要的数据,然后返回这个数据(而不是一个 Observable )。
 调用 Observable#toBlocking() 或 BlockingObservable#from(Observable) 方法,将 Observable 转换成 BlockingObservable 。代码如下: 
// BlockingObservable.java
public final class BlockingObservable<T>{
 // ... 省略无关代码
 private final Observable<? extends T> o;
 
 private BlockingObservable(Observable<? extends T> o){
 this.o = o;
 }
 public static <T> BlockingObservable<T> from(final Observable<? extends T> o){
 return new BlockingObservable<T>(o);
 }
}
// Observable.java
public class Observable<T>{
 // ... 省略无关代码
 public final BlockingObservable<T> toBlocking(){
 return BlockingObservable.from(this);
 }
}
 
  《ReactiveX/RxJava文档中文版 —— TO》
 
 
    #toFuture() 操作符也是只能用于 BlockingObservable 。这个操作符将Observable 转换为一个返回 单个数据项 的 Future 。 
 如果你想将发射多个数据项的 Observable 转换为 Future ,可以这样用: myObservable.toList().toBlocking().toFuture() 。 
 点击 链接 查看 #toFuture() 的代码实现: 
#onNext() 方法,设置执行的返回值( value )。 #onCompleted() 方法,CountDownLatch ( finished ) 减一。 #onError() 方法,设置执行时发生的异常( error ),并 CountDownLatch ( finished ) 减一。 error ) 判断是否执行完成;通过 value , error 获得执行的结果。 第一篇 Hystrix 正式的源码解析。
梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。
胖友,分享一波朋友圈可好!