转载

Hystrix fallback机制浅析

Hystrix作为后端弹性架构的一把利器,用处可以说非常的广泛,最近在写代码的时候接触到了这个框架,所以趁着业余时间粗粗的看了下其中的源码,发现有很多地方值得学习,于是准备写几篇文章记录一下。

Hystrix的功能比较多,这一篇文章先探讨其中一个比较简单的功能——fallback的具体实现。

怎么用

下面我们先来看一下fallback的一个具体使用场景。

@HystrixCommand(
        fallbackMethod = "fallbackFunc",
        commandProperties = {
                //超过此时间,HystrixCommand被标记为TIMEOUT,并执行回退逻辑,默认1000ms
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "500"),
                //设置在回路被打开,拒绝请求到再次尝试请求并决定回路是否继续打开的时间,默认5000ms
                @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "60000"),
                //设置打开回路并启动回退逻辑的错误比率,默认值50%
                @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50")
        },
        threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "200")
        })
@Override
public Map<String, Boolean> someFunc() {

		.......
}

可以看到只要一个方法被注解上了HystrixCommand注解,那么这个方法就会被Hystrix监控,注解中有一个参数叫fallbackMethod,很显然, 就是当被注解的方法发生异常之后,会调用fallbackMethod ,在上文中也就是fallbackFunc。值得一提的是HystrixCommand注解有很多的参数,这也是Hystrix功能核心所在,具体的参数可以参考 这篇文章 。

源码分析

既然是注解,又是在SpringMVC中,自然而然的就联想到了AOP,翻一翻源码,果不其然有一个HystrixCommandAspect类。

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")

public void hystrixCommandAnnotationPointcut() {
}

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause() != null ? e.getCause() : e;
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

Pointcut指向2个注解,其中一个就是我们用到的HystrixCommand。

具体的编织方法,我们来看下面这一段:

MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
        metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

Object result;
try {
    if (!metaHolder.isObservable()) {
        result = CommandExecutor.execute(invokable, executionType, metaHolder);
    } else {
        result = executeObservable(invokable, executionType, metaHolder);
    }
} catch (HystrixBadRequestException e) {
    throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
    throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
  1. 用于joinPoint生成一个metaHolder,这个metaHolder中存储了和被注解方法相关的信息。
  1. 根据metaHolder生成一个invokable,而这个invokable就是我们的command。
  2. 根据注解的类型获取一个executionType。
  3. 判断该方法的返回类型是否是Observable,根绝返回类型进行执行。

很有意思的第四点,竟然有RxJava的代码在其中,其实看过源码你就会知道,Hystrix内部已经深度集成了RxJava了,这对做Android的我来说还是有点惊喜的,我之前的博客中也有几篇关于RxJava的文章哦~

我们看返回值不是Observable的,也就是下面这一行执行代码:

result = CommandExecutor.execute(invokable, executionType, metaHolder);
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS: {
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

根据executionType去执行,我们看其中的同步执行:

private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
    if (invokable instanceof HystrixExecutable) {
        return (HystrixExecutable) invokable;
    }
    throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}

其实很简单啦,就是调用command的execute方法。

command是Hystrix的核心理念,其中的基类是AbstractCommand,我们来看其子类HystrixCommand的execute方法:

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

其中调用了queue方法,而queue方法中最核心的一行代码是:

final Future<R> delegate = toObservable().toBlocking().toFuture();

又见Rxjava的影子,可见其真的是深度集成啊~

toObservable的代码灰常的长,但是如果你对RxJava有所了解的话,其实逻辑是非常清晰的,我们提取其中关键的代码:

return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        ......
        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);

        Observable<R> afterCache;

        // put in cache
        if (requestCacheEnabled && cacheKey != null) {
            // wrap it for caching
            HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
            if (fromCache != null) {
                // another thread beat us so we'll use the cached value instead
                toCache.unsubscribe();
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            } else {
                // we just created an ObservableCommand so we cast and return it
                afterCache = toCache.toObservable();
            }
        } else {
            afterCache = hystrixObservable;
        }

        return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
    }
});

通过defer去做延时绑定,并且判断缓存的逻辑,我们这篇文章不关注缓存,而已afterCache就是hystrixObservable,最后通过doOnTerminate,doOnUnsubscribe和doOnComplete去做收尾的一些工作。

可以看到Hystrix内部是如何结合Rxjava去做异步操作的,逻辑很清晰,这里不得不吹一下RxJava了,真是好用啊!!

下面我们来看applyHystrixSemantics这个observable。

rivate Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    if (circuitBreaker.attemptExecution()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}

其中又是RxJava的代码,最关键的代码是:

return executeCommandAndObserve(_cmd)
        .doOnError(markExceptionThrown)
        .doOnTerminate(singleSemaphoreRelease)
        .doOnUnsubscribe(singleSemaphoreRelease);

这里我们就不一一分析的,其实光看名字就知道了,executeCommandAndObserve做具体的方法执行:

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    .....
   
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

execution的链式调用中有onErrorResumeNext,并传入了handlerFallback,而handlerFallback中处理了具体的一些错误,入超时等。说道超时,我们可以看一下和它相关的代码,execution在生成的时候,有一个HystrixObservableTimeoutOperator操作符:

@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    // if the child unsubscribes we unsubscribe our parent as well
    child.add(s);

    //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
    final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

    TimerListener listener = new TimerListener() {

        @Override
        public void tick() {
            // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
            // otherwise it means we lost a race and the run() execution completed or did not start
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // report timeout failure
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                // shut down the original request
                s.unsubscribe();

                final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                    @Override
                    public void run() {
                        child.onError(new HystrixTimeoutException());
                    }
                });


                timeoutRunnable.run();
                //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
            }
        }

        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };

它的call方法中去做了计时,如果超时,就抛出HystrixTimeoutException这个exception。

我们来看和HystrixTimeoutException相关的handleTimeoutViaFallback。

private Observable<R> handleTimeoutViaFallback() {
    return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}

调用了getFallbackOrThrowException方法,而其中则会调用getFallbackAction方法。

protected CommandAction getFallbackAction() {
    return commandActions.getFallbackAction();
}

commandActions是怎么传入的呢?回到Aspect类中:

HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

command是通过工厂类生成的。

public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}

看最后一个else里的代码是去new了一个command。

protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
    super(builder.getSetterBuilder().build());
    this.commandActions = builder.getCommandActions();
    this.collapsedRequests = builder.getCollapsedRequests();
    this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
    this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
    this.ignoreExceptions = builder.getIgnoreExceptions();
    this.executionType = builder.getExecutionType();
}

在构造函数中传入了commandActions。那HystrixCommandBuilder又是怎么生成呢?

public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
    validateMetaHolder(metaHolder);

    return HystrixCommandBuilder.builder()
            .setterBuilder(createGenericSetterBuilder(metaHolder))
            .commandActions(createCommandActions(metaHolder))
            .collapsedRequests(collapsedRequests)
            .cacheResultInvocationContext(createCacheResultInvocationContext(metaHolder))
            .cacheRemoveInvocationContext(createCacheRemoveInvocationContext(metaHolder))
            .ignoreExceptions(metaHolder.getCommandIgnoreExceptions())
            .executionType(metaHolder.getExecutionType())
            .build();
}

builder中的commandActions是通过metaHolder生成的:

private CommandActions createCommandActions(MetaHolder metaHolder) {
    CommandAction commandAction = createCommandAction(metaHolder);
    CommandAction fallbackAction = createFallbackAction(metaHolder);
    return CommandActions.builder().commandAction(commandAction)
            .fallbackAction(fallbackAction).build();
}

其中createFallbackAction会通过metaHolder找到被注解方法中有没有注解上fallbackMethod(第一章节中的fallbackFunc),如果有,则传入其中。

总结

至此,我们已经分析完了Hystrix中的fallback机制,总得来说就是通过RxJava去做异步操作,并获取注解中的fallbackMethod,在捕获异常之后调用具体的方法。

原文  http://zjutkz.net/2018/07/12/Hystrix-fallback机制浅析/
正文到此结束
Loading...