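As a powerful tool for building resilient backend architectures, Hystrix is very widely used. I recently ran into the framework while writing code, so I spent some spare time skimming its source. There is a lot worth learning in it, so I plan to write a few articles to record what I found.
Hystrix has many features. This first article looks at one of the simpler ones: how fallback is actually implemented.
Let's start with a concrete usage scenario for fallback.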
    @HystrixCommand(
            fallbackMethod = "fallbackFunc",
            commandProperties = {
                    // After this timeout the HystrixCommand is marked as TIMEOUT and the fallback logic runs (default: 1000 ms)
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "500"),
                    // How long an open circuit keeps rejecting requests before a trial request decides whether it stays open (default: 5000 ms)
                    @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "60000"),
                    // Error percentage at which the circuit opens and the fallback logic kicks in (default: 50%)
                    @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50")
            },
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize", value = "200")
            })
    @Override
    public Map<String, Boolean> someFunc() {
        .......
    }
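As you can see, once a method is annotated with @HystrixCommand, Hystrix monitors it. The annotation has a parameter named fallbackMethod. As the name suggests, when the annotated method fails (it throws an exception or, as we will see later, times out or is rejected), Hystrix calls the method named by fallbackMethod, which is fallbackFunc in the example above. It is worth mentioning that @HystrixCommand takes many parameters, and they are where most of Hystrix's functionality is configured; for the details of each parameter, see this article.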
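To make the contract concrete, here is a minimal sketch of the "run the primary, fall back on failure" idea in plain Java. It does not use Hystrix at all; `FallbackSketch` and `callWithFallback` are names invented for this illustration, and it only covers the case where the primary throws.

```java
import java.util.function.Supplier;

/** Illustration only: a hand-rolled stand-in for the fallback contract, not Hystrix code. */
final class FallbackSketch {

    /** Runs the primary action; if it throws, returns the fallback's result instead. */
    static <T> T callWithFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            // Hystrix also falls back on timeouts, rejections and an open circuit;
            // this sketch only handles "the primary threw an exception".
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        String ok = FallbackSketch.<String>callWithFallback(() -> "primary", () -> "fallback");
        String failed = FallbackSketch.<String>callWithFallback(
                () -> { throw new IllegalStateException("boom"); },
                () -> "fallback");
        System.out.println(ok + " / " + failed);
    }
}
```

Everything the rest of this article walks through is, in effect, the machinery Hystrix uses to implement that `catch` branch asynchronously.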
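Since this is an annotation, and we are in a Spring MVC application, AOP naturally comes to mind. Digging through the source, sure enough, there is a HystrixCommandAspect class.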
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause() != null ? e.getCause() : e;
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
The pointcuts target two annotations; one of them is the @HystrixCommand we used.
For the actual weaving logic, look at this section:
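The "annotation plus around advice" idea can be sketched without Spring or AspectJ by using a JDK dynamic proxy. This is only an analogy under loud assumptions: `@Guarded`, `GreetingService` and `AroundAdviceSketch` are hypothetical names, and the fallback is declared on the interface purely to keep the demo small.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical marker annotation, playing the role of @HystrixCommand. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String fallbackMethod();
}

/** Hypothetical service used only for this demo. */
interface GreetingService {
    @Guarded(fallbackMethod = "greetFallback")
    String greet(String name);

    String greetFallback(String name);
}

final class FailingGreetingService implements GreetingService {
    @Override public String greet(String name) { throw new IllegalStateException("backend down"); }
    @Override public String greetFallback(String name) { return "hello, " + name + " (fallback)"; }
}

final class AroundAdviceSketch {

    /** Wraps target so calls to @Guarded methods are intercepted, much like an @Around advice. */
    static <T> T guard(Class<T> iface, T target) {
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (p, method, args) -> {
                    Guarded guarded = method.getAnnotation(Guarded.class);
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (guarded == null) {
                            throw e.getCause(); // not annotated: behave like the plain method
                        }
                        // Annotated: look up the fallback by name and call it with the same arguments.
                        Method fallback = iface.getMethod(guarded.fallbackMethod(), method.getParameterTypes());
                        return fallback.invoke(target, args);
                    }
                });
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        GreetingService service = guard(GreetingService.class, new FailingGreetingService());
        System.out.println(service.greet("world"));
    }
}
```

The real aspect does considerably more (it builds a MetaHolder and a command object instead of invoking the method directly), but the interception point is the same.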
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause() != null ? e.getCause() : e;
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
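One interesting detail: there is RxJava code in here (the Observable branch). Once you have read the source, you will see that Hystrix is deeply integrated with RxJava internally. As an Android developer I found that a pleasant surprise, and my blog already has a few earlier posts about RxJava.
We will follow the case where the return value is not an Observable, that is, this line: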
    result = CommandExecutor.execute(invokable, executionType, metaHolder);
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
                        ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
It dispatches on executionType. We will follow the synchronous case, which first goes through castToExecutable:
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName()
                + " interface to execute in: " + executionType + " mode");
    }
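It is really quite simple: it just calls the command's execute method.
The command is the core concept in Hystrix, and the base class is AbstractCommand. Let's look at the execute method of its subclass HystrixCommand: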
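The dispatch on execution type shown above boils down to "same command, different way of handing back the result". A rough sketch, assuming a hypothetical two-value `Mode` enum in place of javanica's ExecutionType and a `CompletableFuture` in place of Hystrix's own Future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ExecutionDispatchSketch {
    /** Hypothetical stand-in for the execution type; the observable mode is left out. */
    enum Mode { SYNCHRONOUS, ASYNCHRONOUS }

    /** Returns the value itself in synchronous mode, or a future of it in asynchronous mode. */
    static <T> Object execute(Supplier<T> command, Mode mode) {
        switch (mode) {
            case SYNCHRONOUS:
                return command.get();                          // block and return the value
            case ASYNCHRONOUS:
                return CompletableFuture.supplyAsync(command); // hand back a future right away
            default:
                throw new IllegalArgumentException("unsupported execution type: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> "done", Mode.SYNCHRONOUS));
        Object future = execute(() -> "done", Mode.ASYNCHRONOUS);
        System.out.println(((CompletableFuture<?>) future).join());
    }
}
```

Returning `Object` mirrors the aspect: the advice does not know at compile time whether the annotated method returns a plain value or a future.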
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
It calls the queue method, and the most important line in queue is:
    final Future<R> delegate = toObservable().toBlocking().toFuture();
RxJava again; the integration really does run deep.
The code of toObservable is very long, but if you know some RxJava the logic is quite clear. Here are the key parts:
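The pattern here, a synchronous execute() that is nothing more than a blocking wait on the asynchronous queue(), can be sketched with the JDK alone. `SyncOverAsyncSketch` is a made-up class, and `CompletableFuture` stands in for the RxJava pipeline; the exception wrapping is a simplification of what Hystrix does.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SyncOverAsyncSketch<R> {
    private final Callable<R> work;

    SyncOverAsyncSketch(Callable<R> work) {
        this.work = work;
    }

    /** Asynchronous entry point: start the work and hand back a Future. */
    Future<R> queue() {
        CompletableFuture<R> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                future.complete(work.call());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    /** Synchronous entry point: simply block on the asynchronous one. */
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure as the cause.
            throw new IllegalStateException("command failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new SyncOverAsyncSketch<Integer>(() -> 6 * 7).execute());
    }
}
```

One consequence of this design is that there is a single execution path to maintain: timeouts, fallbacks and metrics only need to be implemented on the asynchronous side.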
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            ......
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache =
                        (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
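defer is used for lazy binding, and the body also handles the cache logic. This article does not cover caching, so afterCache is simply hystrixObservable. Finally, doOnTerminate, doOnUnsubscribe and doOnCompleted take care of the cleanup work.
This shows how Hystrix combines with RxJava internally to do asynchronous work. The logic is very clear, and I have to give RxJava some credit here: it is really pleasant to use!
Next, let's look at the applyHystrixSemantics observable.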
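For readers who have not used RxJava, the point of defer is that the factory runs when someone subscribes, not when the pipeline is defined. A tiny sketch of that laziness, using `Supplier` as a very loose stand-in for Observable (`DeferSketch` and its `defer` method are invented here and are not the RxJava API):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DeferSketch {

    /** Returns a handle that only builds the real source when get() is called. */
    static <T> Supplier<T> defer(Supplier<Supplier<T>> factory) {
        return () -> factory.get().get();
    }

    public static void main(String[] args) {
        AtomicInteger built = new AtomicInteger();
        Supplier<String> lazy = DeferSketch.<String>defer(() -> {
            built.incrementAndGet(); // runs once per get(), never at definition time
            return () -> "value";
        });
        System.out.println("built before get: " + built.get());
        System.out.println(lazy.get() + ", built after get: " + built.get());
    }
}
```

This is why constructing a command is cheap: none of the circuit-breaker or semaphore checks happen until the command is actually executed.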
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
More RxJava code here. The key part is:
    return executeCommandAndObserve(_cmd)
            .doOnError(markExceptionThrown)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
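The semaphore handling in applyHystrixSemantics follows a reusable pattern: try to acquire a permit without blocking, fall back immediately if none is available, and make sure the permit is released exactly once even though two different callbacks may fire. A sketch with `java.util.concurrent.Semaphore` (Hystrix uses its own TryableSemaphore; `SemaphoreGuardSketch` is hypothetical):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class SemaphoreGuardSketch {
    private final Semaphore permits;

    SemaphoreGuardSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs work if a permit is free; otherwise returns the rejection fallback without waiting. */
    <T> T run(Supplier<T> work, Supplier<T> rejectedFallback) {
        if (!permits.tryAcquire()) {
            return rejectedFallback.get();
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            // compareAndSet guarantees only the first caller actually releases the permit.
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
        try {
            return work.get();
        } finally {
            releaseOnce.run(); // plays the role of the "terminate" callback
            releaseOnce.run(); // plays the role of the "unsubscribe" callback: a no-op here
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        SemaphoreGuardSketch guard = new SemaphoreGuardSketch(1);
        // The inner call runs while the only permit is held, so it is rejected.
        String nested = guard.<String>run(
                () -> guard.<String>run(() -> "inner", () -> "rejected"),
                () -> "outer rejected");
        System.out.println(nested + ", permits left: " + guard.availablePermits());
    }
}
```

Without the AtomicBoolean guard, a terminate followed by an unsubscribe would release two permits for one acquisition and slowly inflate the concurrency limit.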
We will not go through these one by one; the names mostly speak for themselves. executeCommandAndObserve performs the actual method execution:
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        .....
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
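The call chain on execution includes onErrorResumeNext, which is passed handleFallback, and handleFallback deals with the specific kinds of errors, such as timeouts. Speaking of timeouts, let's look at the related code. When execution is built, a HystrixObservableTimeoutOperator is applied to it: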
    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        // if the child unsubscribes we unsubscribe our parent as well
        child.add(s);

        //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                // otherwise it means we lost a race and the run() execution completed or did not start
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // report timeout failure
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                    // shut down the original request
                    s.unsubscribe();

                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });

                    timeoutRunnable.run();
                    //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                }
            }

            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };
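Its call method sets up the timer. If the command times out, it signals a HystrixTimeoutException to the downstream subscriber through child.onError.
Now let's look at handleTimeoutViaFallback, which handles HystrixTimeoutException.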
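The heart of the timeout operator is a race decided by a single compare-and-set: whichever side moves the status away from NOT_EXECUTED first wins, and the loser does nothing. Below is a sketch of that race using JDK classes only. `TimeoutRaceSketch`, its `Status` enum (including the `COMPLETED` value) and the use of `CompletableFuture` are all assumptions made for the illustration, not Hystrix's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class TimeoutRaceSketch {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    static <T> CompletableFuture<T> runWithTimeout(Supplier<T> work, long timeoutMillis) {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<T> result = new CompletableFuture<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // Timer "tick": it only wins if the work has not completed yet.
        timer.schedule(() -> {
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                result.completeExceptionally(new TimeoutException("timed out"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);

        // The work itself: it only publishes its outcome if the timer has not fired.
        CompletableFuture.supplyAsync(work).whenComplete((value, error) -> {
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            }
        });

        // Whoever wins, stop the timer thread so it does not keep the JVM alive.
        result.whenComplete((value, error) -> timer.shutdownNow());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> "fast", 1000).join());
    }
}
```

Note that, as in this sketch, a timeout does not by itself stop the underlying work; it only stops anyone from waiting for it. Hystrix additionally unsubscribes from the original execution (the s.unsubscribe() call above).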
    private Observable<R> handleTimeoutViaFallback() {
        return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
    }
It calls getFallbackOrThrowException, which in turn ends up calling getFallbackAction.
    protected CommandAction getFallbackAction() {
        return commandActions.getFallbackAction();
    }
How does commandActions get passed in? Back to the Aspect class:
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
The command is created by a factory class.
    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }
The last else branch creates a new command, a GenericCommand. Following its constructor chain leads to AbstractHystrixCommand:
    protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
        super(builder.getSetterBuilder().build());
        this.commandActions = builder.getCommandActions();
        this.collapsedRequests = builder.getCollapsedRequests();
        this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
        this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
        this.ignoreExceptions = builder.getIgnoreExceptions();
        this.executionType = builder.getExecutionType();
    }
commandActions is taken from the builder in this constructor. So how is the HystrixCommandBuilder created?
    public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
        validateMetaHolder(metaHolder);

        return HystrixCommandBuilder.builder()
                .setterBuilder(createGenericSetterBuilder(metaHolder))
                .commandActions(createCommandActions(metaHolder))
                .collapsedRequests(collapsedRequests)
                .cacheResultInvocationContext(createCacheResultInvocationContext(metaHolder))
                .cacheRemoveInvocationContext(createCacheRemoveInvocationContext(metaHolder))
                .ignoreExceptions(metaHolder.getCommandIgnoreExceptions())
                .executionType(metaHolder.getExecutionType())
                .build();
    }
The commandActions in the builder are created from the metaHolder:
    private CommandActions createCommandActions(MetaHolder metaHolder) {
        CommandAction commandAction = createCommandAction(metaHolder);
        CommandAction fallbackAction = createFallbackAction(metaHolder);
        return CommandActions.builder().commandAction(commandAction)
                .fallbackAction(fallbackAction).build();
    }
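createFallbackAction uses the metaHolder to check whether the annotated method declares a fallbackMethod (the fallbackFunc from the first example); if it does, the fallback is wrapped as an action and passed in.
That completes our walk through Hystrix's fallback mechanism. In short, Hystrix uses RxJava for the asynchronous execution, reads the fallbackMethod from the annotation, and calls that method once a failure has been caught.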
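Resolving a fallback from its name is ordinary reflection. The sketch below shows one way to do it, assuming the fallback lives in the same class and takes the same parameter types as the original method; `FallbackLookupSketch` and `DemoService` are hypothetical, and the real javanica lookup has more rules than this.

```java
import java.lang.reflect.Method;
import java.util.Optional;

final class FallbackLookupSketch {

    /** Looks for a method with the given name and parameter types declared on type. */
    static Optional<Method> findFallback(Class<?> type, String fallbackName, Class<?>... parameterTypes) {
        if (fallbackName == null || fallbackName.isEmpty()) {
            return Optional.empty(); // no fallbackMethod configured
        }
        try {
            return Optional.of(type.getDeclaredMethod(fallbackName, parameterTypes));
        } catch (NoSuchMethodException e) {
            return Optional.empty(); // configured, but no matching method exists
        }
    }

    public static void main(String[] args) {
        System.out.println(findFallback(DemoService.class, "loadFallback", String.class).isPresent());
        System.out.println(findFallback(DemoService.class, "missing", String.class).isPresent());
    }
}

/** Hypothetical service used only for this demo. */
final class DemoService {
    String load(String id) {
        throw new IllegalStateException("backend down");
    }

    String loadFallback(String id) {
        return "cached:" + id;
    }
}
```

Doing this lookup once, when the command is built, means the failure path only has to invoke an already-resolved action.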