= 29
本文出自:【InTheWorld的博客】 (欢迎留言、交流)
Spring Cloud“全家桶”风头正劲,Hystrix作为服务容错保护组件也是挺有名气。最近我有在看一些Spring Cloud的内容,其中就包括Hystrix。这里我打算从宏观理论和微观实现两个部分来分析Hystrix。
首先是宏观理论了,先抛出两个问题。Hystrix的设计目的是什么?应该怎么完成这些目标?针对第一个问题,我们首先需要明确的是微服务架构应该是有一定的容错性的,而服务不可用的问题是客观存在的。而且这些服务错误常常会恶化和扩散,结果造成更严重的负面影响。所以在无法绝对保证服务可用性的前提下,我们需要一种机制来保护服务错误。
Hystrix的作用主要体现在一下几个方面,
Hystrix是怎么完成这些需求呢?个人觉得Hystrix的实现有以下几个关键点;
Hystrix使用了“命令模式”,每一个依赖请求都是HystrixCommand或者HystrixObservableCommand。Hystrix的命令执行流程图如下:
HystrixCommand和HystrixObservableCommand的区别主要体现在异步执行的返回值句柄不同。它们分别对应传统型的Future、和RxJava中的Observable。虽然看似用法略有不同,但是内部实现都是通过RxJava的形式完成的。HystrixCommand的简单使用方法如下:
@HystrixCommand(fallbackMethod = "findByIdFallback") @GetMapping("/user/{id}") public User findById(@PathVariable Long id) { return this.restTemplate.getForObject("http://microservice-provider-user/" + id, User.class); } User findByIdFallback(Long id, Throwable throwable) { LOGGER.error("进入回退方法,异常:", throwable); User user = new User(); user.setId(-1L); user.setName("默认用户"); return user; }
HystrixCommand这个注解相当于把findById这个函数包装成了一个HystrixCommand。对这个api端点的请求都会引发一个对应的HystrixCommand执行。HystrixCommand注解除了fallbackMethod,还有很多其他的参数,这些参数会用来构造HystrixCommand命令。
HystrixCommand命令是如何执行的呢?接下来,我们就通过对Hystrix源代码的分析来理解它!无论是HystrixCommand还是HystrixObservableCommand,都是AbstractCommand的子类。所以它包含了很多Hystrix命令执行的细节。以HystrixCommand类为例,它的执行其实是通过调用execute()方法完成的。这个方法的实现如下:
public R execute() { try { return this.queue().get(); } catch (Exception var2) { throw this.decomposeException(var2); } } public Future<R> queue() { final Future<R> delegate = this.toObservable().toBlocking().toFuture(); Future<R> f = new Future<R>() { public R get() throws InterruptedException, ExecutionException { return delegate.get(); } public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; if(f.isDone()) { /* */ } else { return f; } }
可以看到execute()方法实际上是通过调用queue()方法实现的,而queue()方法的目的就是构造出一个Future f。这个f其实仅仅是封装了delegate这个Futrue。所以一番分析之后,我们知道了问题的关键所在,其实就是这行this.toObservable().toBlocking().toFuture()。那就从toObservable()开始分析!
public Observable<R> toObservable() { final Action0 terminateCommandCleanup = new Action0() { /* */ }; final Action0 unsubscribeCommandCleanup = new Action0() { /* */ }; final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { public Observable<R> call() { return AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this); } }; final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { /* */ }; final Action0 fireOnCompletedHook = new Action0() { /* */ }; return Observable.defer(new Func0<Observable<R>>() { public Observable<R> call() { if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.NOT_STARTED, AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED)) { } else { Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable afterCache; if(requestCacheEnabled && cacheKey != null) { /* */ afterCache = toCache.toObservable(); } else { afterCache = hystrixObservable; } return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook); } } }); }
虽然这个方法的代码不少,但还是可以很快的定位到关键点——applyHystrixSemantics(); 这个函数完成了Hystrix的基本语义。首先是熔断器的语义,如果熔断器允许请求就去执行请求,反之则直接执行fallback函数。在正常的请求通路下,最终会调用executeCommandWithSpecifiedIsolation()方法来完成请求的执行。从这个函数名就可以看出,这个方法完成了Hystrix隔离的语义。由于这个方法代码比较长,就不贴完整代码了。但是方法中的一个subscribeOn()运算符非常显眼,如下所示:
subscribeOn(this.threadPool.getScheduler(new Func0<Boolean>() { public Boolean call() { return Boolean.valueOf(((Boolean)AbstractCommand.this.properties.executionIsolationThreadInterruptOnTimeout().get()).booleanValue() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT); } }));
这个threadPool就是构造HystrixCommand的时候设定的线程池,现在大家应该可以理解Hystrix如何实现隔离的。回到那行代码this.toObservable().toBlocking().toFuture()。toBlocking()仅仅是为了构造一个BlockingObservable,然后通过toFuture()构造出一个Future。其实构造这个future的过程也很简单,只是订阅上游的Observable,然后通知和更新Future,具体过程如下:
public final class BlockingOperatorToFuture { public static <T> Future<T> toFuture(Observable<? extends T> that) { final CountDownLatch finished = new CountDownLatch(1); final AtomicReference<T> value = new AtomicReference(); final AtomicReference<Throwable> error = new AtomicReference(); final Subscription s = that.single().subscribe(new Subscriber<T>() { public void onCompleted() { finished.countDown(); } public void onError(Throwable e) { error.compareAndSet((Object)null, e); finished.countDown(); } public void onNext(T v) { value.set(v); } }); return new Future<T>() { private volatile boolean cancelled; public boolean isDone() { return finished.getCount() == 0L; } public T get() throws InterruptedException, ExecutionException { finished.await(); return this.getValue(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if(finished.await(timeout, unit)) { return this.getValue(); } else { throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable."); } } private T getValue() throws ExecutionException { Throwable throwable = (Throwable)error.get(); if(throwable != null) { throw new ExecutionException("Observable onError", throwable); } else if(this.cancelled) { throw new CancellationException("Subscription unsubscribed"); } else { return value.get(); } } }; } }
这个返回的Future就是前面的delegate。讲到这里,Hystrix command的大致回路,算是跑马观花的看了一遍。具体的一些细节实现,这里就先略过了!
对于Hystrix的具体使用,可以参考这个 https://github.com/eacdy/spring-cloud-study 。我也有看这个demo项目学习Spring Cloud。