在之前的系列里面 Spring Cloud升级之路 - Hoxton - 5. 实现微服务调用重试 ,我们针对 OpenFeign 和 Spring Cloud Gateway 都设置了重试。
对于 OpenFeign:
对于 Spring Cloud Gateway:
现在,我们需要实现针对于 Spring Cloud Gateway 的非 Get 请求的任何IOException(除了SocketTimeOutException,这个是read time out 导致的),还有 redilience 断路器异常进行重试,Get因为请求并没有真正发出去。
目前在 Spring Cloud Gateway 的 RetryFilterFactory,无法实现针对 Get 和非 Get 对于不同的异常进行不同的重试:
org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory
public class RetryGatewayFilterFactory extends AbstractGatewayFilterFactory<RetryGatewayFilterFactory.RetryConfig> { /** * Retry iteration key.ServerWebExchange的某个Attribute的key * 这个Attribute用来在每次调用的时候,+1,看是否超过了重试次数 */ public static final String RETRY_ITERATION_KEY = "retry_iteration"; public RetryGatewayFilterFactory() { super(RetryConfig.class); } @Override public GatewayFilter apply(RetryConfig retryConfig) { //检验配置 retryConfig.validate(); Repeat<ServerWebExchange> statusCodeRepeat = null; //如果配置了可重试的HTTP响应状态码,则检查响应码是否可以重试 if (!retryConfig.getStatuses().isEmpty() || !retryConfig.getSeries().isEmpty()) { Predicate<RepeatContext<ServerWebExchange>> repeatPredicate = context -> { ServerWebExchange exchange = context.applicationContext(); //检查是否超过了重试次数 if (exceedsMaxIterations(exchange, retryConfig)) { return false; } //判断是否可以重试 HttpStatus statusCode = exchange.getResponse().getStatusCode(); boolean retryableStatusCode = retryConfig.getStatuses() .contains(statusCode); if (!retryableStatusCode && statusCode != null) { // try the series retryableStatusCode = retryConfig.getSeries().stream() .anyMatch(series -> statusCode.series().equals(series)); } final boolean finalRetryableStatusCode = retryableStatusCode; //判断是否是可以重试的HTTP方法 HttpMethod httpMethod = exchange.getRequest().getMethod(); boolean retryableMethod = retryConfig.getMethods().contains(httpMethod); //返回是否是可以重试的方法以及是否可以重试的HTTP响应状态码 return retryableMethod && finalRetryableStatusCode; }; //每次重试,都要重置路由,重新解析路由 statusCodeRepeat = Repeat.onlyIf(repeatPredicate) .doOnRepeat(context -> reset(context.applicationContext())); //设置Backoff BackoffConfig backoff = retryConfig.getBackoff(); if (backoff != null) { statusCodeRepeat = statusCodeRepeat.backoff(getBackoff(backoff)); } } // TODO: support timeout, backoff, jitter, etc... in Builder //判断异常是否可以重试 Retry<ServerWebExchange> exceptionRetry = null; if (!retryConfig.getExceptions().isEmpty()) { Predicate<RetryContext<ServerWebExchange>> retryContextPredicate = context -> { ServerWebExchange exchange = context.applicationContext(); if (exceedsMaxIterations(exchange, retryConfig)) { return false; } Throwable exception = context.exception(); for (Class<? extends Throwable> retryableClass : retryConfig .getExceptions()) { if (retryableClass.isInstance(exception) || (exception != null && retryableClass.isInstance(exception.getCause()))) { trace("exception or its cause is retryable %s, configured exceptions %s", () -> getExceptionNameWithCause(exception), retryConfig::getExceptions); HttpMethod httpMethod = exchange.getRequest().getMethod(); boolean retryableMethod = retryConfig.getMethods() .contains(httpMethod); trace("retryableMethod: %b, httpMethod %s, configured methods %s", () -> retryableMethod, () -> httpMethod, retryConfig::getMethods); return retryableMethod; } } trace("exception or its cause is not retryable %s, configured exceptions %s", () -> getExceptionNameWithCause(exception), retryConfig::getExceptions); return false; }; exceptionRetry = Retry.onlyIf(retryContextPredicate) .doOnRetry(context -> reset(context.applicationContext())) .retryMax(retryConfig.getRetries()); BackoffConfig backoff = retryConfig.getBackoff(); if (backoff != null) { exceptionRetry = exceptionRetry.backoff(getBackoff(backoff)); } } GatewayFilter gatewayFilter = apply(retryConfig.getRouteId(), statusCodeRepeat, exceptionRetry); return new GatewayFilter() { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return gatewayFilter.filter(exchange, chain); } @Override public String toString() { return filterToStringCreator(RetryGatewayFilterFactory.this) .append("retries", retryConfig.getRetries()) .append("series", retryConfig.getSeries()) .append("statuses", retryConfig.getStatuses()) .append("methods", retryConfig.getMethods()) .append("exceptions", retryConfig.getExceptions()).toString(); } }; } private String getExceptionNameWithCause(Throwable exception) { if (exception != null) { StringBuilder builder = new StringBuilder(exception.getClass().getName()); Throwable cause = exception.getCause(); if (cause != null) { builder.append("{cause=").append(cause.getClass().getName()).append("}"); } return builder.toString(); } else { return "null"; } } private Backoff getBackoff(BackoffConfig backoff) { return Backoff.exponential(backoff.firstBackoff, backoff.maxBackoff, backoff.factor, backoff.basedOnPreviousValue); } public boolean exceedsMaxIterations(ServerWebExchange exchange, RetryConfig retryConfig) { Integer iteration = exchange.getAttribute(RETRY_ITERATION_KEY); //是否超过了可重试次数 boolean exceeds = iteration != null && iteration >= retryConfig.getRetries(); return exceeds; } public void reset(ServerWebExchange exchange) { //这个方法主要是 Set<String> addedHeaders = exchange.getAttributeOrDefault( CLIENT_RESPONSE_HEADER_NAMES, Collections.emptySet()); addedHeaders .forEach(header -> exchange.getResponse().getHeaders().remove(header)); removeAlreadyRouted(exchange); } public GatewayFilter apply(String routeId, Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) { if (routeId != null && getPublisher() != null) { // send an event to enable caching getPublisher().publishEvent(new EnableBodyCachingEvent(this, routeId)); } return (exchange, chain) -> { trace("Entering retry-filter"); // chain.filter returns a Mono<Void> Publisher<Void> publisher = chain.filter(exchange) // .log("retry-filter", Level.INFO) .doOnSuccessOrError((aVoid, throwable) -> { int iteration = exchange .getAttributeOrDefault(RETRY_ITERATION_KEY, -1); int newIteration = iteration + 1; trace("setting new iteration in attr %d", () -> newIteration); exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration); }); if (retry != null) { // retryWhen returns a Mono<Void> // retry needs to go before repeat publisher = ((Mono<Void>) publisher) .retryWhen(retry.withApplicationContext(exchange)); } if (repeat != null) { // repeatWhen returns a Flux<Void> // so this needs to be last and the variable a Publisher<Void> publisher = ((Mono<Void>) publisher) .repeatWhen(repeat.withApplicationContext(exchange)); } return Mono.fromDirect(publisher); }; } @SuppressWarnings("unchecked") public static class RetryConfig implements HasRouteId { //路由id private String routeId; //重试次数,不包括调用的第一次,默认为3,也就是可能会调用4次 private int retries = 3; //针对哪些HTTP状态码重试,一个Series对应一组HttpStatus private List<Series> series = toList(Series.SERVER_ERROR); //针对哪些HTTP状态码重试,一个HttpStatus就是一个HTTP状态码 private List<HttpStatus> statuses = new ArrayList<>(); //针对哪些HTTP方法重试 private List<HttpMethod> methods = toList(HttpMethod.GET); //针对的哪些异常重试 private List<Class<? extends Throwable>> exceptions = toList(IOException.class, TimeoutException.class); //重试间隔策略 private BackoffConfig backoff; public void validate() { //重试次数必须大于10 Assert.isTrue(this.retries > 0, "retries must be greater than 0"); //可重试的series,可重试的状态码还有可重试的异常不能都为空,否则没有可以重试的场景了 Assert.isTrue( !this.series.isEmpty() || !this.statuses.isEmpty() || !this.exceptions.isEmpty(), "series, status and exceptions may not all be empty"); //重试的Http方法不能为空 Assert.notEmpty(this.methods, "methods may not be empty"); if (this.backoff != null) { this.backoff.validate(); } } //省略构造器,getter,setter还有一些工具方法 } public static class BackoffConfig { //第一次重试时间间隔 private Duration firstBackoff = Duration.ofMillis(5); //最大等待间隔 private Duration maxBackoff; //增长比例 private int factor = 2; //是否保留上一次请求的重试间隔时间,下次从这个时间间隔开始重试 private boolean basedOnPreviousValue = true; //省略构造器,getter,setter public void validate() { //第一次重试间隔不能为空 Assert.notNull(this.firstBackoff, "firstBackoff must be present"); } } } 复制代码
总结起来,流程简化如下:
配置的时候,HTTP 方法如果包含所有方法,那么没办法区分 GET 请求或者是 非 GET 请求;如果建立两个 Filter 一个拦截 GET 另一个拦截 非GET,那么他们共用的 Attribute 每次就会 +2,重试次数就不准确了。
所以,最后使用了这样一个不优雅的设计,就是 GET 和非 GET 使用不同的 RetryConfig,GET 的还是根据 application.properties
配置来,针对非 GET 请求,强制重试下面这些异常:
RetryGatewayFilter
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); //获取微服务名称 String serviceName = request.getHeaders().getFirst(CommonConstant.SERVICE_NAME); HttpMethod method = exchange.getRequest().getMethod(); //生成 GatewayFilter,保存到 gatewayFilterMap GatewayFilter gatewayFilter = gatewayFilterMap.computeIfAbsent(serviceName + ":" + method, k -> { Map<String, RetryConfig> retryConfigMap = apiGatewayRetryConfig.getRetry(); //通过微服务名称,获取重试配置 RetryConfig retryConfig = retryConfigMap.containsKey(serviceName) ? retryConfigMap.get(serviceName) : apiGatewayRetryConfig.getDefault(); //重试次数为0,则不重试 if (retryConfig.getRetries() == 0) { return null; } //针对非GET请求,强制限制重试并且只能重试下面的异常b if (!HttpMethod.GET.equals(method)) { RetryConfig newConfig = new RetryConfig(); BeanUtils.copyProperties(retryConfig, newConfig); //限制所有方法都可以重试,由于外层限制了不为GET,这里相当于不为GET的所有方法 newConfig.setMethods(HttpMethod.values()); newConfig.setSeries(); newConfig.setStatuses(); newConfig.setExceptions(//链接超时 io.netty.channel.ConnectTimeoutException.class, //No route to host java.net.ConnectException.class, //针对Resilience4j的异常 CallNotPermittedException.class); retryConfig = newConfig; } return this.apply(retryConfig); }); return gatewayFilter != null ? gatewayFilter.filter(exchange, chain) : chain.filter(exchange); }复制代码