这里是 SpringCloud Gateway
实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway 是以 WebFlux
为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。
本篇将基于 spring-cloud-gateway 简介 基础环境进行改造。
Spring-Cloud-Gateway 基于过滤器实现,同 zuul 类似,有 pre 和 post 两种方式的 filter,分别处理 前置逻辑 和 后置逻辑 。客户端的请求先经过 pre 类型的 filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过 post 类型的 filter 处理,最后返回响应到客户端。
过滤器执行流程如下, order 越大,优先级越低
接下来我们来验证下 filter
执行顺序。
这里创建 3 个过滤器,分别配置不同的优先级
@Slf4j public class AFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("AFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("AFilter后置逻辑"); })); } } @Slf4j public class BFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("BFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("BFilter后置逻辑"); })); } } @Slf4j public class CFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("CFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("CFilter后置逻辑"); })); } } @Configuration public class FilterConfig { @Bean @Order(-1) public GlobalFilter a() { return new AFilter(); } @Bean @Order(0) public GlobalFilter b() { return new BFilter(); } @Bean @Order(1) public GlobalFilter c() { return new CFilter(); } }
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1 curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
查看网关输出日志
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter前置逻辑 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter前置逻辑 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter前置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter后置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter后置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter后置逻辑
现在假设我们要统计某个服务的响应时间,我们可以在代码中
long beginTime = System.currentTimeMillis(); // do something... long elapsed = System.currentTimeMillis() - beginTime; log.info("elapsed: {}ms", elapsed);
每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。
自定义过滤器需要实现 GatewayFilter
和 Ordered
。其中 GatewayFilter
中的这个方法就是用来实现你的自定义的逻辑的
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
而 Ordered
中的 int getOrder()
方法是来给过滤器设定优先级别的,值越大则优先级越低。
好了,让我们来撸代码吧.
/** * 此过滤器功能为计算请求完成时间 */ public class ElapsedFilter implements GatewayFilter, Ordered { private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms"); } }) ); } /* *过滤器存在优先级,order越大,优先级越低 */ @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } }
我们在请求刚刚到达时,往 ServerWebExchange
中放入了一个属性 elapsedTimeBegin
,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered
设为 Integer.MAX_VALUE
以降低优先级。
现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange)
之前的就是 “pre” 部分,之后的也就是 then
里边的是 “post” 部分。
创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边
@Configuration public class FilterConfig { /** * http://localhost:8100/filter/provider * @param builder * @return */ @Bean public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) { // @formatter:off // 可以对比application.yml中关于路由转发的配置 return builder.routes() .route(r -> r.path("/filter/**") .filters(f -> f.stripPrefix(1) .filter(new ElapsedFilter())) .uri("lb://idc-cloud-provider") .order(0) .id("filter") ) .build(); // @formatter:on } }
// AdaptCachedBodyGlobalFilter @Component public class LogFilter implements GlobalFilter, Ordered { private Logger log = LoggerFactory.getLogger(LogFilter.class); private final ObjectMapper objectMapper = new ObjectMapper(); private static final String START_TIME = "startTime"; private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders(); @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); // 请求路径 String path = request.getPath().pathWithinApplication().value(); // 请求schema: http/https String scheme = request.getURI().getScheme(); // 请求方法 HttpMethod method = request.getMethod(); // 路由服务地址 URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); // 请求头 HttpHeaders headers = request.getHeaders(); // 设置startTime exchange.getAttributes().put(START_TIME, System.currentTimeMillis()); // 获取请求地址 InetSocketAddress remoteAddress = request.getRemoteAddress(); MultiValueMap<String, String> formData = null; AccessRecord accessRecord = new AccessRecord(); accessRecord.setPath(path); accessRecord.setSchema(scheme); accessRecord.setMethod(method.name()); accessRecord.setTargetUri(targetUri.toString()); accessRecord.setRemoteAddress(remoteAddress.toString()); accessRecord.setHeaders(headers); if (method == HttpMethod.GET) { formData = request.getQueryParams(); accessRecord.setFormData(formData); writeAccessRecord(accessRecord); } if (method == HttpMethod.POST) { Mono<Void> voidMono = null; if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) { // JSON voidMono = readBody(exchange, chain, accessRecord); } if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) { // x-www-form-urlencoded voidMono = readFormData(exchange, chain, accessRecord); } if (voidMono != null) { return voidMono; } } return chain.filter(exchange); } private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return null; } private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); }); // 重写请求体,因为请求体数据只能被消费一次 ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } }; ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build(); return ServerRequest.create(mutatedExchange, messageReaders) .bodyToMono(String.class) .doOnNext(objectValue -> { accessRecord.setBody(objectValue); writeAccessRecord(accessRecord); }).then(chain.filter(mutatedExchange)); }); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } /** * TODO 异步日志 * @param accessRecord */ private void writeAccessRecord(AccessRecord accessRecord) { log.info("/n/n start------------------------------------------------- /n " + "请求路径:{}/n " + "scheme:{}/n " + "请求方法:{}/n " + "目标服务:{}/n " + "请求头:{}/n " + "远程IP地址:{}/n " + "表单参数:{}/n " + "请求体:{}/n " + "end------------------------------------------------- /n ", accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody()); } }
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1 curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
输出结果
start------------------------------------------------- 请求路径:/provider1 scheme:http 请求方法:POST 目标服务:http://192.168.124.5:2001/provider1 请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"] 远程IP地址:/192.168.124.5:49969 表单参数:null 请求体:{"name":"admin"} end-------------------------------------------------
接下来,我们来配置日志,方便日志系统提取日志。SpringBoot 默认的日志为 logback。
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" /> <appender name="Console" class="ch.qos.logback.core.ConsoleAppender"> <layout class="ch.qos.logback.classic.PatternLayout"> <Pattern> %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable </Pattern> </layout> </appender> <appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOGS}/spring-boot-logger.log</file> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <Pattern>%d %p %C{1.} [%t] %m%n</Pattern> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- rollover daily and when the file reaches 10 MegaBytes --> <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log </fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>10MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> </rollingPolicy> </appender> <!-- LOG everything at INFO level --> <root level="info"> <!--<appender-ref ref="RollingFile" />--> <appender-ref ref="Console" /> </root> <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上级loger传递打印信息。默认是true--> <logger name="cn.idea360.gateway" level="info" additivity="false"> <appender-ref ref="RollingFile" /> <appender-ref ref="Console" /> </logger> </configuration>
这样 console 和日志目录下就都有日志了。
如果你看过静态路由的配置,你应该对如下配置有印象。
filters: - StripPrefix=1 - AddResponseHeader=X-Response-Default-Foo, Default-Bar
StripPrefix
、 AddResponseHeader
这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。
我们就将之前的那个 ElapsedFilter
改造一下,让它能接收一个 boolean
类型的参数,来决定是否将请求参数也打印出来。
public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> { private static final Log log = LogFactory.getLog(GatewayFilter.class); private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; private static final String KEY = "withParams"; public List<String> shortcutFieldOrder() { return Arrays.asList(KEY); } public ElapsedGatewayFilterFactory() { super(Config.class); } public GatewayFilter apply(Config config) { return (exchange, chain) -> { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath()) .append(": ") .append(System.currentTimeMillis() - startTime) .append("ms"); if (config.isWithParams()) { sb.append(" params:").append(exchange.getRequest().getQueryParams()); } log.info(sb.toString()); } }) ); }; } public static class Config { private boolean withParams; public boolean isWithParams() { return withParams; } public void setWithParams(boolean withParams) { this.withParams = withParams; } } }
过滤器工厂的顶级接口是 GatewayFilterFactory
,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactory
和 AbstractNameValueGatewayFilterFactory
,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix
和我们创建的这种),后者接收两个参数(像 AddResponseHeader
)。
GatewayFilter apply(Config config)
方法内部实际上是创建了一个 GatewayFilter
的匿名类,具体实现和之前的几乎一样,就不解释了。
静态内部类 Config
就是为了接收那个 boolean
类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder()
这个方法。
这里注意一下,一定要调用一下父类的构造器把 Config
类型传过去,否则会报 ClassCastException
public ElapsedGatewayFilterFactory() { super(Config.class); }
工厂类我们有了,再把它注册到 Spring 当中
@Bean public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() { return new ElapsedGatewayFilterFactory(); }
然后添加配置(主要改动在 default-filters
配置)
server: port: 2000 spring: application: name: idc-gateway redis: host: localhost port: 6379 timeout: 6000ms # 连接超时时长(毫秒) jedis: pool: max-active: 1000 # 连接池最大连接数(使用负值表示没有限制) max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制) max-idle: 10 # 连接池中的最大空闲连接 min-idle: 5 # 连接池中的最小空闲连接 cloud: consul: host: localhost port: 8500 gateway: discovery: locator: enabled: true # 修改在这里。gateway可以通过开启以下配置来打开根据服务的serviceId来匹配路由,默认是大写 default-filters: - Elapsed=true routes: - id: provider # 路由 ID,保持唯一 uri: lb://idc-provider1 # uri指目标服务地址,lb代表从注册中心获取服务 predicates: # 路由条件。Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非) - Path=/p/** filters: - StripPrefix=1 # 过滤器StripPrefix,作用是去掉请求路径的最前面n个部分截取掉。StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view
本文到此结束。关于 Webflux
的学习刚入门,觉得可以像 Rxjava
那样在 onNext
中拿到异步数据,然而在 post
获取 body 中没生效。经测试可知 getBody
获得的数据输出为 null,而自己通过 Flux.create
创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。