SpringCloud框架,没有特殊的实现。即,请求到达Zuul网关后,由Ribbon负载均衡到目标组件节点,由Hystrix转发请求。
hystrix.command.default.execution.isolation.strategy=THREAD
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000
一次调用中发现,请求过程超过10s后,后台已经打印HystrixTimeoutException且进入了自定义的FallbackProvider,前端仍然没有收到响应,直到请求链路处理完成,前端才返回FallbackProvider中返回的异常响应。
看文档(Hystrix官方文档):THREAD隔离模式下,请求超时后会取消调用线程从而立即返回;SEMAPHORE模式下则会等待响应回来再判断是否超时。而上述配置已将所有Route的默认隔离模式设为THREAD(线程隔离),遂认为配置没问题;
跟踪源码,RxJava实现的,响应式编程不熟悉,初期调试时没头苍蝇一样到处打断点,看得不明所以(现在也是)。网上的资料大多是翻译上述文档,只知道是HystrixCommand.execute()发送请求,AbstractCommand.handleTimeoutViaFallback()触发FallbackProvider,中间超时如何处理没有说清;
开启DEBUG级别日志,用日志跟踪一个请求的全部过程。发现打印的配置是 executionIsolationStrategy=SEMAPHORE !!!查阅SpringCloud相关资料,发现用Hystrix+Ribbon的时候,发送请求用的是HystrixCommand的AbstractRibbonCommand实现,而后者的部分配置会覆盖掉HystrixCommandProperties中的配置,其中就有隔离模式这项配置,用的是ZuulProperties中的默认值SEMAPHORE:
/**
 * Creates the command. The given {@code setter} is handed to the superclass
 * constructor; the remaining arguments are stored on this instance.
 */
protected AbstractRibbonCommand(Setter setter, LBC client, RibbonCommandContext context, ZuulFallbackProvider fallbackProvider, IClientConfig config) {
    // Pass the setter on to the HystrixCommand constructor.
    super(setter);
    this.client = client;
    this.context = context;
    this.zuulFallbackProvider = fallbackProvider;
    this.config = config;
}

/**
 * Creates the {@code HystrixCommandProperties.Setter} used for the command.
 * The isolation strategy is read from {@code zuulProperties} and the execution
 * timeout from {@code getHystrixTimeout(config, commandKey)}.
 */
protected static HystrixCommandProperties.Setter createSetter(IClientConfig config, String commandKey, ZuulProperties zuulProperties) {
    int hystrixTimeout = getHystrixTimeout(config, commandKey);
    return HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
            // executionIsolationStrategy uses the value held by ZuulProperties,
            // not the hystrix.command.default.* property.
            zuulProperties.getRibbonIsolationStrategy()).withExecutionTimeoutInMilliseconds(hystrixTimeout);
}
//默认是SEMAPHORE private ExecutionIsolationStrategy ribbonIsolationStrategy = SEMAPHORE;
//最终setter作为参数builder传入 protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) { this.key = key; // 省略其它配置 this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
a. 指定commandKey的方式
hystrix.command.aService.execution.isolation.strategy=THREAD
b. 修改Zuul配置的方式,注意useSeparateThreadPools默认为false,此时所有组件共用一个线程池(HystrixThreadPoolKey=RibbonCommand)
# 指定ribbon的隔离模式
zuul.ribbonIsolationStrategy=THREAD
# 每个commandKey一个线程池
zuul.threadPool.useSeparateThreadPools=true
在对RxJava有了大概了解之后,梳理Hystrix关键请求流程如下:
a. 发送请求,进入Zuul过滤器RibbonRoutingFilter,通过工厂类创建AbstractRibbonCommand,调用其execute方法
/**
 * Forwards the request described by {@code context}: records debug info,
 * creates a {@code RibbonCommand} through the command factory and executes it.
 *
 * @return the response produced by the command, or the result of
 *         {@code handleException} when a {@code HystrixRuntimeException} is caught
 */
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(), context.getUri(), context.getHeaders(), context.getParams(), context.getRequestEntity());
    // Create the command (an AbstractRibbonCommand) via the factory.
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        // Invoke execute() on the command.
        ClientHttpResponse response = command.execute();
        this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
        return response;
    } catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }
}
b. 进入HystrixCommand.execute(),实际是调用Future.get()来阻塞等待异步方法HystrixCommand.queue()的结果
/**
 * Executes the command and returns its result to the calling thread.
 * Any exception is passed through {@code decomposeException} and rethrown.
 */
public R execute() {
    try {
        // queue() returns a Future; get() waits for its result.
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
c. 通过AbstractCommand.toObservable()创建一个待订阅的被观察对象(即Observable),创建过程:
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // put in cache if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { //没有缓存使用applyHystrixSemantics afterCache = hystrixObservable; }
// 获取信号量,THREAD模式下 if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); }
// Maps a Throwable to the Observable that should replace the failed execution.
// It first marks the circuit breaker as non-success and records the exception
// on executionResult, then dispatches on the exception type.
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
    @Override
    public Observable<R> call(Throwable t) {
        circuitBreaker.markNonSuccess();
        Exception e = getExceptionFromThrowable(t);
        executionResult = executionResult.setExecutionException(e);
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
            // Timeouts are handled here.
            return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e);
        } else {
            /*
             * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
             */
            if (e instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(e);
            }

            // Everything else goes through the failure fallback.
            return handleFailureViaFallback(e);
        }
    }
};
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { ... }).doOnTerminate(new Action0() { ... }).doOnUnsubscribe(new Action0() { ... }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { //原始HystrixCommand的状态为TIMED_OUT时 return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); }
/**
 * Returns a new {@code HystrixContextScheduler} bound to this thread pool.
 *
 * @param shouldInterruptThread passed through to the scheduler unchanged
 */
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // Refresh dynamically updatable configuration before building the scheduler.
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
/**
 * Stores the concurrency strategy and thread pool, and builds the underlying
 * scheduler from the thread pool and the {@code shouldInterruptThread} callback.
 */
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
    this.concurrencyStrategy = concurrencyStrategy;
    this.threadPool = threadPool;
    // actualScheduler is a ThreadPoolScheduler.
    this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}

/**
 * Creates a worker that wraps the worker produced by {@code actualScheduler}.
 */
@Override
public Worker createWorker() {
    // Create a HystrixContextSchedulerWorker; its argument is the ThreadPoolWorker
    // obtained from the ThreadPoolScheduler above.
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
/**
 * Submits {@code action} to the thread pool's executor and returns a
 * subscription for the submitted work. If this worker is already unsubscribed
 * nothing is submitted and an unsubscribed subscription is returned.
 */
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }

    // This is internal RxJava API but it is too useful.
    // Wrap the action.
    ScheduledAction sa = new ScheduledAction(action);

    // Link the wrapped action and this worker's subscription in both directions.
    // NOTE(review): presumably this lets unsubscribing the worker also unsubscribe
    // the action, and lets a finished action drop itself from the parent; confirm
    // against RxJava's ScheduledAction.
    subscription.add(sa);
    sa.addParent(subscription);

    // Obtain the executor.
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the action for execution.
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // Attach the subscription that is responsible for interrupting the thread.
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}
/**
 * Removes the future from the executor and cancels it. Whether the cancel call
 * may interrupt the running thread is decided by {@code shouldInterruptThread}.
 */
@Override
public void unsubscribe() {
    // Remove the previously submitted action from the executor.
    executor.remove(f);
    if (shouldInterruptThread.call()) {
        // true: cancel the future and allow interruption.
        f.cancel(true);
    } else {
        f.cancel(false);
    }
}