转载

Spring Cloud Zuul 在传统路由模式下使用Hystrix

前言

最近这段时间在公司内部分享了Spring Cloud的一些功能,结合目前公司使用的框架,针对这段时间调研的SC(Spring Cloud)技术,对现有的架构中融入了一些自定义的功能.

目前公司使用的架构是Spring Cloud Zuul+dubbo ,使用docker容器,k8s对容器进行管理,我们主要看Zuul,后面考虑把Hystrix整合进dubbo.因为我们不是完全用的sc的一套功能,只是用了sc的网关,使用网关对请求进行路由到服务,虽然sc默认包含有ribbon和hystrix功能,但是只限于面向服务的路由,而我们服务没有整合到eureka中,所以我们用的是传统路由模式,路由转发地址是svc的地址,相当于默认已经有了负载均衡的功能,所以我不需要使用ribbon,只需要使用hystrix.

分析

对zuul比较了解的同学知道,zuul使用传统路由模式和使用面向服务的路由模式分别由 SimpleHostRoutingFilterRibbonRoutingFilter 两个route类型的过滤器进行路由的,所以我们首先需要分析这两者之间的差别

SimpleHostRoutingFilter

此过滤器是传统路由方式的route过滤器,其shouldFilter()方法如下:

@Override
public boolean shouldFilter() {
    return RequestContext.getCurrentContext().getRouteHost() != null
            && RequestContext.getCurrentContext().sendZuulResponse();
}

可以发现,决定是否过滤的关键条件是上下文中是否含有传统路由的URL.所以,此过滤器处理的是传统路由模式的请求过滤.

通过查看其构造器,不难发现,其初始化是在 ZuulProxyAutoConfiguration 中,传入了 ZuulProperties zuul的一些配置信息,来初始化这个filter,然后还有一堆http连接管理工厂以及一个 ProxyRequestHelper bean来处理一些request和response的请求和响应.

查看其核心 run() 方法,不难发现,其主要执行逻辑就是构建了一个httpClient,然后封装请求信息,执行请求获取响应然后返回,并且将请求信息设置到RequestContext中,返回请求结果到客户端,有兴趣的同学可以看下源码:

@Override
public Object run() {
    RequestContext context = RequestContext.getCurrentContext();
    HttpServletRequest request = context.getRequest();
    MultiValueMap<String, String> headers = this.helper
            .buildZuulRequestHeaders(request);
    MultiValueMap<String, String> params = this.helper
            .buildZuulRequestQueryParams(request);
    String verb = getVerb(request);
    InputStream requestEntity = getRequestBody(request);
    if (getContentLength(request) < 0) {
        context.setChunkedRequestBody();
    }

    String uri = this.helper.buildZuulRequestURI(request);
    this.helper.addIgnoredHeaders();

    try {
        //封装请求信息,执行请求获取响应
        CloseableHttpResponse response = forward(this.httpClient, verb, uri, request,
                headers, params, requestEntity);
        //设置请求结果
        setResponse(response);
    }
    catch (Exception ex) {
        throw new ZuulRuntimeException(ex);
    }
    return null;
}

所以,可以看出之所以传统路由模式没有hystrix和ribbon功能,是因为它直接是使用的apache的HttpClient来执行请求转发,返回结果直接响应,没有对请求做拦截,也没有对请求作熔断处理,简单粗暴.

RibbonRoutingFilter

此过滤器是面向服务路由的过滤器,老规矩, shouldFilter() :

@Override
public boolean shouldFilter() {
    RequestContext ctx = RequestContext.getCurrentContext();
    return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
            && ctx.sendZuulResponse());
}

判断此过滤器是否执行的关键条件是上下文中不含有传统路由的URL,并且服务id不能为空.

再看看其构造器,同样的,此构造器的初始化也是在 ZuulProxyAutoConfiguration 中,所不同的是,注入了一个 RibbonCommandFactory ,这个就是 RibbonReoutingFilter 的核心,看这个名字,用屁股都能想到肯定是和ribbon还有hystrix有关.

再看看它的run()方法:

@Override
public Object run() {
    RequestContext context = RequestContext.getCurrentContext();
    this.helper.addIgnoredHeaders();
    try {
        //构建一些请求信息
        RibbonCommandContext commandContext = buildCommandContext(context);
        ClientHttpResponse response = forward(commandContext);
        setResponse(response);
        return response;
    }
    catch (ZuulException ex) {
        throw new ZuulRuntimeException(ex);
    }
    catch (Exception ex) {
        throw new ZuulRuntimeException(ex);
    }
}

这里看到,主要就是做了一个操作,构建了一个 RibbonCommandContext 上下文对象,存放了一些serviceId,meothod,uri,header...的一些请求信息然后拿这些信息去执行 forward(commandContext) ,那我们就看看 forward() 去解解毒:

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());

    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        ClientHttpResponse response = command.execute();
        this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
        return response;
    }
    catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }

}

这个方法就是用上面的请求信息和构造器传入的 RibbonCommandFactory 创建了一个command对象,然后通过此对象去执行操作然后返回response到客户端.

看起来貌似和之前的 SimpleHostRoutingFilter 流程差不了太多,都是构建请求信息,然后获取响应,然后响应到客户端,那么为么比这个 RibbonRoutingFilter 就是吊一些呢,又有熔断又有负载均衡的.所以这里就可以破案了,他的这些个功能就是因为这个command对象,下面我们就去看看这个command到底是什么.

public interface RibbonCommand extends HystrixExecutable<ClientHttpResponse> {
}

看到这里可能有的人就了然了,这个 HystrixExecutable 是为 HystrixCommand 设计的接口,主要提供执行命令的抽象方法,例如: execute() , queue() , observe() ,所以他才能有熔断的功能,那么负载均衡呢,我猜八成是和上面的 RibbonCommandContext 有关,因为这个context对象里面放了serviceId,说明了实例的选择都是这里面操作的.

下面我们去看看这段代码 ribbonCommandFactory.create(context) -> org.springframework.cloud.netflix.zuul.filters.route.RestClientRibbonCommandFactory#create ,

@Override
@SuppressWarnings("deprecation")
public RestClientRibbonCommand create(RibbonCommandContext context) {
    String serviceId = context.getServiceId();
    //根据serviceId 获取服务降级的provider,供后面调用服务发生异常的时候调用
    ZuulFallbackProvider fallbackProvider = getFallbackProvider(serviceId);
    RestClient restClient = this.clientFactory.getClient(serviceId,
            RestClient.class);
    return new RestClientRibbonCommand(context.getServiceId(), restClient, context,
            this.zuulProperties, fallbackProvider, clientFactory.getClientConfig(serviceId));
}

这里获取了服务降级的provider,创建了发送请求的 RestClient ,实例化了 RestClientRibbonCommand ,并且将配置注入进去了这个command.

//RestClientRibbonCommand的构造器
public RestClientRibbonCommand(String commandKey, RestClient client,
                               RibbonCommandContext context, ZuulProperties zuulProperties,
                               ZuulFallbackProvider zuulFallbackProvider, IClientConfig config) {
    super(commandKey, client, context, zuulProperties, zuulFallbackProvider, config);
}
    
    
//AbstractRibbonCommand的构造器,是RestClientRibbonCommand的构造器的抽象父类
protected AbstractRibbonCommand(Setter setter, LBC client,
                             RibbonCommandContext context,
                             ZuulFallbackProvider fallbackProvider, IClientConfig config) {
    super(setter);
    this.client = client;
    this.context = context;
    this.zuulFallbackProvider = fallbackProvider;
    this.config = config;
}

//初始化HystrixCommand的配置信息
protected static Setter getSetter(final String commandKey,ZuulProperties zuulProperties, IClientConfig config) {

    // @formatter:off
    Setter commandSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RibbonCommand"))
                            .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
    final HystrixCommandProperties.Setter setter = createSetter(config, commandKey, zuulProperties);
    if (zuulProperties.getRibbonIsolationStrategy() == ExecutionIsolationStrategy.SEMAPHORE){
        final String name = ZuulConstants.ZUUL_EUREKA + commandKey + ".semaphore.maxSemaphores";
        // we want to default to semaphore-isolation since this wraps
        // 2 others commands that are already thread isolated
        final DynamicIntProperty value = DynamicPropertyFactory.getInstance()
                .getIntProperty(name, zuulProperties.getSemaphore().getMaxSemaphores());
        setter.withExecutionIsolationSemaphoreMaxConcurrentRequests(value.get());
    } else if (zuulProperties.getThreadPool().isUseSeparateThreadPools()) {
        final String threadPoolKey = zuulProperties.getThreadPool().getThreadPoolKeyPrefix() + commandKey;
        commandSetter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
    }

return commandSetter.andCommandPropertiesDefaults(setter);
// @formatter:on

}

这里可以看到,在初始化 RestClientRibbonCommand 时,调用了父类的构造器,然后将 HystrixCommand 初始化,将zuul的配置信息配置到 HystrixCommand 中,这里看到,在Zuul中,Hystrix默认的隔离模式就是信号量隔离,而且 commandKey 就是 serviceId .至此,Hystrix初始化完成,然后通过上文的 command.execute() 方法来进行调用,也就是调用 run() 方法,所以,这里我们看看:

@Override
protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();

    RQ request = createRequest();
    RS response;
    
    boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
            && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
    
    if (retryableClient) {
        response = this.client.execute(request, config);
    } else {
        response = this.client.executeWithLoadBalancer(request, config);
    }
    context.set("ribbonResponse", response);

    // Explicitly close the HttpResponse if the Hystrix command timed out to
    // release the underlying HTTP connection held by the response.
    //
    if (this.isResponseTimedOut()) {
        if (response != null) {
            response.close();
        }
    }

    return new RibbonHttpResponse(response);
}

这里 run() 方法就是核心的调用逻辑了,这里首先会判断client是否支持重试,如果请求可以重试的话就执行重试逻辑,具体重试的机制可以看这篇文章: http://blog.didispace.com/spring-cloud-zuul-retry-detail/ ,同时不是重试的话就会执行 executeWithLoadBalancer() 从而达到负载均衡的目的.

总结:

至此关于route过滤器的两种路由模式已经梳理完毕:

  • SimpleHostRoutingFilter:本质就是通过apache的httpClient执行请求,然后返回响应信息到客户端,简单粗暴,所以不存在负载均衡,也不存在熔断,降级操作;
  • RibbonRoutingFilter:和 SimpleHostRoutingFilter 不同的是,他创建了一个 RibbonCommand 对象,这个command本质就是HystrixCommand,在HystrixCommand的 run() 方法内发送请求,并在此方法内通过serviceId和client进行负载均衡调用.

细心的同学可能会发现: RibbonRoutingFilter 其实没有对返回的response作任何处理,也就是说,就算被路由的服务端发生了错误,也是返回response,错误都在response里面包着,正常情况下不会抛出异常,当然也不会降级熔断了

传统路由模式改造

通过上面的分析,想要对传统路由模式进行改造添加熔断功能的话,就需要重写 SimpleHostRoutingFilter ,在其作http调用的时候添加熔断功能,然后将返回调用的结果set到 RequestContext 中,这样就完成了路由模式的改造

过滤器改造:SimpleHostRoutingFilter

首先,我们创建一个bean,继承自 SimpleHostRoutingFilter ,查看 SimpleHostRoutingFilter 发现他的成员变量都是私有的,子类无法共享这些变量,所以,我们只能再定义这些变量,这些变量大多都是 httpClient 相关的一些配置类,当然我们也可以选择不使用 httpClient 使用其他的http调用Client比如 RestTemplate ,但是为了做到尽量少的改动,而且让之前的配置改造之后也可以使用,所以我选择没有动他的这套逻辑

那么我们只需要重写一些私有的成员变量和一些私有方法就可以了,当然run()方法也是要重写的,我就暴力一点,直接重写所有的方法,然后改动一些地方就好了,改动后的代码如下:

@Component
public class SimpleHostHystrixRoutingFilter extends SimpleHostRoutingFilter {

    private static final Log log = LogFactory.getLog(SimpleHostHystrixRoutingFilter.class);

    private final Timer connectionManagerTimer = new Timer(
            "SimpleHostRoutingFilter.connectionManagerTimer", true);

    private boolean sslHostnameValidationEnabled;

    private ProxyRequestHelper helper;
    private ZuulProperties.Host hostProperties;
    private ApacheHttpClientConnectionManagerFactory connectionManagerFactory;
    private HttpClientConnectionManager connectionManager;
    private CloseableHttpClient httpClient;
    private boolean customHttpClient = false;
    private RestClientHystrixCommandFactory restClientHystrixCommandFactory;

    @EventListener
    public void onPropertyChange(EnvironmentChangeEvent event) {
        if (!customHttpClient) {
            boolean createNewClient = false;

            for (String key : event.getKeys()) {
                if (key.startsWith("zuul.host.")) {
                    createNewClient = true;
                    break;
                }
            }

            if (createNewClient) {
                try {
                    SimpleHostHystrixRoutingFilter.this.httpClient.close();
                } catch (IOException ex) {
                    log.error("error closing client", ex);
                }
                SimpleHostHystrixRoutingFilter.this.httpClient = newClient();
            }
        }
    }

    public SimpleHostHystrixRoutingFilter(ProxyRequestHelper helper, ZuulProperties properties,
                                          ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
                                          ApacheHttpClientFactory httpClientFactory, RestClientHystrixCommandFactory restClientHystrixCommandFactory) {
        super(helper, properties, connectionManagerFactory, httpClientFactory);
        this.helper = helper;
        this.hostProperties = properties.getHost();
        this.sslHostnameValidationEnabled = properties.isSslHostnameValidationEnabled();
        this.connectionManagerFactory = connectionManagerFactory;
        this.restClientHystrixCommandFactory = restClientHystrixCommandFactory;
        super.checkServletVersion();
    }

    @PostConstruct
    private void initialize() {
        if (!customHttpClient) {
            this.connectionManager = connectionManagerFactory.newConnectionManager(
                    !this.sslHostnameValidationEnabled,
                    this.hostProperties.getMaxTotalConnections(),
                    this.hostProperties.getMaxPerRouteConnections(),
                    this.hostProperties.getTimeToLive(), this.hostProperties.getTimeUnit(),
                    null);
            this.httpClient = newClient();
            this.connectionManagerTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    if (SimpleHostHystrixRoutingFilter.this.connectionManager == null) {
                        return;
                    }
                    SimpleHostHystrixRoutingFilter.this.connectionManager.closeExpiredConnections();
                }
            }, 30000, 5000);
        }
    }

    @PreDestroy
    public void stop() {
        this.connectionManagerTimer.cancel();
    }


    @Override
    public Object run() {
        RequestContext context = RequestContext.getCurrentContext();

        log.info("======>custom hystrix host routing filter start ,context :{}"+ context);

        HttpServletRequest request = context.getRequest();
        MultiValueMap<String, String> headers = this.helper
                .buildZuulRequestHeaders(request);
        MultiValueMap<String, String> params = this.helper
                .buildZuulRequestQueryParams(request);
        String verb = getVerb(request);
        InputStream requestEntity = getRequestBody(request);

        int contentLength = request.getContentLength();
        ContentType contentType = null;
        if (request.getContentType() != null) {
            contentType = ContentType.parse(request.getContentType());
        }

        InputStreamEntity entity = new InputStreamEntity(requestEntity, contentLength,
                contentType);
        if (getContentLength(request) < 0) {
            context.setChunkedRequestBody();
        }

        String uri = this.helper.buildZuulRequestURI(request);
        this.helper.addIgnoredHeaders();

        HttpRequest httpRequest = super.buildHttpRequest(verb, uri, entity, headers, params, request);

        //routeId作为Hystrix 的 commandKey,
        // eg. zuul.routes.hystrix.path=/hystrix-provider/**  的routeId是hystrix
        String routeId = (String) context.get("proxy");

        try {
            //SEMAPHORE 隔离模式共享的是一个线程的requestContext ,但是thread 模式使用的是另外一个独立的线程池里面的线程,requestContext信息不共享,需要将context 通过构造器传入
            RestClientHystrixCommand command = restClientHystrixCommandFactory.create(routeId, httpClient,httpRequest,getHttpHost(RequestContext.getCurrentContext().getRouteHost()));
            HttpResponse response = command.execute();

            setResponse(response);
        } catch (Exception ex) {
            if((ex instanceof HystrixRuntimeException) && (ex.getCause() instanceof CircuitBreakerResponseException)){
                HttpResponse httpResponse = ((CircuitBreakerResponseException) ex.getCause()).getHttpResponse();
                try {
                    setResponse(httpResponse);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            }
            throw new ZuulRuntimeException(ex);
        }
        return null;
    }


    private MultiValueMap<String, String> revertHeaders(Header[] headers) {
        MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
        for (Header header : headers) {
            String name = header.getName();
            if (!map.containsKey(name)) {
                map.put(name, new ArrayList<>());
            }
            map.get(name).add(header.getValue());
        }
        return map;
    }


    private InputStream getRequestBody(HttpServletRequest request) {
        InputStream requestEntity = null;
        try {
            requestEntity = request.getInputStream();
        } catch (IOException ex) {
            // no requestBody is ok.
        }
        return requestEntity;
    }

    private String getVerb(HttpServletRequest request) {
        String sMethod = request.getMethod();
        return sMethod.toUpperCase();
    }

    private void setResponse(HttpResponse response) throws IOException {
        RequestContext.getCurrentContext().set("zuulResponse", response);
        this.helper.setResponse(response.getStatusLine().getStatusCode(),
                response.getEntity() == null ? null : response.getEntity().getContent(),
                revertHeaders(response.getAllHeaders()));
    }

    private HttpHost getHttpHost(URL host) {
        HttpHost httpHost = new HttpHost(host.getHost(), host.getPort(),
                host.getProtocol());
        return httpHost;
    }

}

这里的构造器我比父类多传了一个 RestClientHystrixCommandFactory ,这个Factory就是创建 HystrixCommand 的工厂bean,里面实例化了一个自定义的 HystrixCommand , run() 方法就是通过这个工厂bean创建了一个 HystrixCommand ,然后进行调用的.

断路器创建工厂: RestClientHystrixCommandFactory

再看 RestClientHystrixCommandFactory :

@Component
public class RestClientHystrixCommandFactory {

    @Autowired(required = false)
    private List<HostFallbackProvider> fallbackList = Collections.emptyList();

    //TODO:暂时只用默认的errorDecode
    @Autowired
    DefaultHttpClientErrorDecoder errorDecoder;

    private Map<String, HostFallbackProvider> fallbackProviderMap;

    @PostConstruct
    public void init() {
        this.fallbackProviderMap = fallbackList.stream().collect(Collectors.toMap(HostFallbackProvider::getRoute, x -> x, (x, y) -> x));
    }


    public RestClientHystrixCommand create(String routeId, CloseableHttpClient httpClient, HttpRequest httpRequest, HttpHost httpHost) {
        HostFallbackProvider fallbackProvider = null;
        if (!fallbackProviderMap.isEmpty()) {
            if (Objects.isNull(fallbackProvider = fallbackProviderMap.get("*"))) {
                fallbackProvider = fallbackProviderMap.get(routeId);
            }
        }
        return new RestClientHystrixCommand(routeId, httpClient,httpRequest,httpHost,fallbackProvider,errorDecoder);
    }
}

这个Factory就是做了两个操作:

  1. 对Hystrix的fallBack进行了拓展,可以让使用者根据自己不同的需求定制不同的fallBack,在发生错误时直接调用fallBack;
  2. 定义了一个Response的 ErrorDecoder ,对请求调用返回的结果进行decode,判断是否需要抛出异常触发熔断/降级,目前没有考虑把这个decoder做成定制化,所以现在默认都是用的我定义的一套 ErrorDecoder .

降级处理: HostFallbackProvider

public interface HostFallbackProvider {

    /**
     * 
     * @Description: 返回routId,表明为哪个rout提供回退,* 表示 为所有route提供回退 
     * @Param:  
     * @returns:  
     * @Date: 2019/12/4 
    */
    String getRoute();

    HttpResponse fallbackResponse(Throwable cause);

    HttpResponse fallbackResponse();

}

这个 HostFallbackProvider 是我参考 RibbonRoutingFilter 提供的 FallbackProviderZuulFallbackProvider 实现的一个接口,只不过 FallbackProvider 返回的是 ClientHttpResponse ,而我这里返回的是 HttpResponse ,功能都是一致的

响应解码: DefaultHttpClientErrorDecoder

再看 DefaultHttpClientErrorDecoder ,这个是我默认实现的一套逻辑:针对4系列的异常不做处理,也不触发熔断和降级,5系列的异常我们就直接触发抛出一个自定义的异常,并且将response装在异常里面返回出去

/**
 * Created by liuwen on 2019/12/3
 * httpClient 响应错误处理接口,可以通过实现此方法配置httpClient发生错误(4xx/5xx)异常的错误解析,
 * 从而决定是否走熔断处理,也可以用于异常信息的传递
 *
 */
public interface HttpClientErrorDecoder {

    boolean hashError(HttpResponse httpResponse);


    void handlerError(HttpResponse httpResponse) throws Exception;

}




@Component
public class DefaultHttpClientErrorDecoder implements HttpClientErrorDecoder {

    @Override
    public boolean hashError(HttpResponse httpResponse) {
        int stateSeries = getStateSeries(httpResponse);
        return stateSeries == HttpStatus.Series.SERVER_ERROR.value() || stateSeries == HttpStatus.Series.CLIENT_ERROR.value();
    }

    private int getStateSeries(HttpResponse httpResponse) {
        return httpResponse.getStatusLine().getStatusCode() / 100;
    }

    @Override
    public void handlerError(HttpResponse httpResponse) throws Exception {
        int stateSeries = getStateSeries(httpResponse);
        //4系列的异常不走熔断
        if (stateSeries == HttpStatus.Series.SERVER_ERROR.value()) {
            throw new CircuitBreakerResponseException(httpResponse, "internal server error");
        }
    }
}

自定义的异常很简单:

@Data
public class CircuitBreakerResponseException extends RuntimeException {

  private HttpResponse httpResponse;

    public CircuitBreakerResponseException(HttpResponse response, String message) {
        super(message);
        this.httpResponse = response;
    }
}

请求处理Command: RestClientHystrixCommand

接下来就是核心了,我们的自定义的 RestClientHystrixCommand ,仔细看这个Command,在构造器里面初始化了父类的构造参数,和 RestClientRibbonCommand 类似,也是定义了使用 HystrixCommand 时的一些默认配置,以及引用了一些zuul的配置bean去初始化 HystrixCommand .

他的 run() 方法就是执行了http请求,然后对响应结果进行了decode,同时,由于我们使用的原生的 HttpClient ,所以,一定要关闭 HttpClient ,防止它占用太多的资源,导致连接一直无法释放,从而造成后续的请求由于没有连接使用而一直阻塞,导致系统无法使用,虽然在 SimpleHostHystrixRoutingFilter 里有一个定时器去定时去关闭连接,但是这种没有被使用的连接是无法关闭的,所以这里一定要在调用超时的情况下关闭连接

fallBack() ,这里的 fallBack() 就是使用了之前在 RestClientHystrixCommandFactory 里面传入的 HostFallbackProvider bean,然后将fallBack逻辑交给 HostFallbackProvider 提供的方法中处理,从而达到定制化的降级的目的

@CommonsLog
public class RestClientHystrixCommand extends HystrixCommand<HttpResponse> {

    private final HttpRequest httpRequest;
    private final HttpHost httpHost;
    private CloseableHttpClient httpclient;
    private HostFallbackProvider fallbackProvider;
    private HttpClientErrorDecoder errorDecoder;


    public RestClientHystrixCommand(String commandKey, CloseableHttpClient httpClient, HttpRequest httpRequest, HttpHost httpHost, HostFallbackProvider fallbackProvider, DefaultHttpClientErrorDecoder errorDecoder) {
        //初始化父类默认构造参数
        super(getSetter(commandKey));
        this.httpclient = httpClient;
        this.httpRequest = httpRequest;
        this.httpHost = httpHost;
        this.fallbackProvider = fallbackProvider;
        this.errorDecoder = errorDecoder;
    }


    /**
     * @Description: 使用信号量隔离
     * @Param:
     * @returns:
     * @Date: 2019/11/20
     */
    protected static HystrixCommand.Setter getSetter(String commandKey) {
        //使用信号量,并且信号量默认最大信号量是100(路由级别信号量)
        String name = ZuulConstants.ZUUL_EUREKA + commandKey + ".semaphore.maxSemaphores";
        DynamicIntProperty value = DynamicPropertyFactory.getInstance().getIntProperty(
                name, 100);
        HystrixCommandProperties.Setter setter = HystrixCommandProperties.Setter()
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                .withExecutionIsolationSemaphoreMaxConcurrentRequests(value.get());
        //所有hystrix的线程均在一个线程池 RestClientCommand 中
        return Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RestClientCommand"))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey))
                .andCommandPropertiesDefaults(setter);
    }

    @Override
    protected HttpResponse run() throws Exception {
        return forward();
    }

    protected HttpResponse forward() throws Exception {
        CloseableHttpResponse response = httpclient.execute(httpHost, httpRequest);

        if(isResponseTimedOut()){
            response.close();
        }

        //处理返回响应,触发熔断或者传递异常
        if (this.errorDecoder.hashError(response)) {
            this.errorDecoder.handlerError(response);
        }

        return response;
    }

    /**
     * @Description: 重写getFallback方法,
     * @Param:
     * @returns:
     * @Date: 2019/11/21
     */
    @Override
    protected HttpResponse getFallback() {
        log.error("====>some error was happened in this request,execute fallback method");

        Throwable throwable = super.getExecutionException();
        if (!Objects.isNull(fallbackProvider)) {
            if ((throwable == null ? super.getExecutionException() : throwable) == null) {
                return fallbackProvider.fallbackResponse();
            } else {
                return fallbackProvider.fallbackResponse(throwable);
            }
        }
        return super.getFallback();
    }
}

至此,我们的一套定制化的传统路由模式增加熔断的功能已经实现完毕,除了可以使用到之前的host的一套配置之外,也可以使用ribbon的一些关于断路器的配置,而且弥补了之前 RibbonRoutingFilter 的请求触发降级/熔断的缺陷,使用者可以定制化的针对不同的路由id进行定制化的开发自己的模块的解码和降级策略.

原文  https://segmentfault.com/a/1190000021214123
正文到此结束
Loading...