回到第一章节讲到的几个问题 :
第一个问题是因为:springboot-tomcat在系统停止时会粗暴的 shutdownNow
线程池。
第二和第三个问题的原因是一样的,因为ribbon缓存和eureka缓存导致:
所以解决问题的核心在于,在服务停止之前(严格按照顺序) :
在服务启动且eureka完成注册之后(严格按照顺序):
从《项目优雅重启(三):eureka-client》可以看到, DiscoveryClient
有个 shutdown
方法,并且这个方法是 public
,而 shutdown
方法是在接口 EurekaClient
定义的,所以我们只要 EurekaClient.shutdown
即可。
springboot的 EurekaClientAutoConfiguration
类里会生产 EurekaClient-Bean
,所以只需要注入 EurekaClient
即可。
@Autowired private EurekaClient client;
这里需要特别注意的是: eureka-client
会调用 eureka-server
接口来注销信息,假如网络出了问题,或者 eureka-client
出了问题了,可能会导致请求非常慢,所以我们需要加一层保障,保证在指定的时间内还没完成注销操作,就强行中断并继续下一步。
private void unregisterEurekaData() { Thread shutDownEurakaThread = new Thread(() -> { try { getClient().shutdown(); logger.info(" ============================== shutdown local eureka data success!"); } catch (Exception e) { logger.error(" ============================== shutdown local eureka data error!", e); } }); try { shutDownEurakaThread.start(); shutDownEurakaThread.join(unregisterEurekaShutdownWaitSeconds * 1000); // 不能无限制等待 if (shutDownEurakaThread.isAlive()) { logger.error(" ============================== shutdown local eureka doesn't compelete in allow time!"); shutDownEurakaThread.interrupt(); } } catch (Exception e) { logger.error(" ============================== shutdown local eureka data error!", e); } }
可以通过gateway提供接口,client调用接口的方式来刷新缓存,gateway接口内容后面会讲到,client这边需要做的是:
为了加快速度,用了线程池来并行调用,同样的如果超时未完成任务,就强行停止任务。
protected void refreshGatewayEurekaData() { if (!ifRegisterWithEureka()) { logger.error(" ============================== not registerWithEureka to refreshGatewayEurekaData!"); return; } try { // gatewayServiceName是gateway在eureka注册的服务名 Application application = client.getApplication(gatewayServiceName); if (null == application) { return; } List<InstanceInfo> instanceInfos = application.getInstances(); if (CollectionUtils.isEmpty(instanceInfos)) { return; } SimpleClientHttpRequestFactory clientHttpRequestFactory = new SimpleClientHttpRequestFactory(); clientHttpRequestFactory.setConnectTimeout(3000); clientHttpRequestFactory.setReadTimeout(3000); RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory); int poolSize = instanceInfos.size(); if(poolSize > maxPoolSize) { poolSize = maxPoolSize; } ExecutorService executorService = Executors.newFixedThreadPool(instanceInfos.size()); instanceInfos.forEach(i -> executorService.submit(() -> { String url = getRefreshGatewayUrl(i); try { ResponseEntity<String> response = restTemplate.getForEntity(url, String.class); if (!HttpStatus.OK.equals(response.getStatusCode())) { logger.error(" ============================== refresh remote gateway[{}] eureka data error:[{}]!", url, response.getBody()); } else { logger.info(" ============================== refresh remote gateway[{}] eureka data success!", url); } } catch (Exception e) { logger.error(" ============================== refresh remote gateway[{}] eureka data error!", url, e); } }) ); executorService.shutdown(); if (!executorService.awaitTermination(refreshGatewauEurekaShutdownWaitSeconds, TimeUnit.SECONDS)) { logger.error(" ============================== refresh remote gateway eureka data thread pool did not shut down gracefully within {} seconds. Proceeding with forceful shutdown", refreshGatewauEurekaShutdownWaitSeconds); if (!executorService.isShutdown()) { executorService.shutdownNow(); } } } catch (Exception e) { logger.error(" ============================== refresh remote gateway eureka data!", e); } } protected String getRefreshGatewayUrl(InstanceInfo instanceInfo) { return instanceInfo.getHomePageUrl() + gatewayRefreshEurekaUrl + (gatewayRefreshEurekaUrl.indexOf('?') == -1 ? "?" : "&") + "serviceId=" + applicationName.toUpperCase(); }
从《 springcloud项目优雅重启(五):tomcat关闭流程 》可以看到,tomcat的关闭是个很复杂很长的流程,并且tomcat 暴力shutdown
是因为对线程池用了 shutdownNow
,我们只是想在项目停止时,**在“上两步完成之后-tomcat停止之前”让tomcat不再接收请求。
ContextClosedEvent
事件,在 onClose
之前做我们想要做的事。 TomcatContextCustomizer
可以获取到 Connector
。 Connector
可以获取到线程池。 获取到线程池之后,由于前面的步骤正常情况下已经保证不会再接收到请求,这时候只需要等线程池内所有的请求都处理之后,再让流程继续往下走。
@Configuration @ConditionalOnProperty(name = "server.container.custom-config", havingValue = "true", matchIfMissing = true) @AutoConfigureAfter(EurekaClientAutoConfiguration.class) public class ContainerConfiguration { @Bean public ServletWebServerFactory servletContainer(@Autowired GracefulShutdown gracefulShutdown) { // springboot是在ServletWebServerFactoryConfiguration类里生成TomcatServletWebServerFactory,方式也是直接new一个对象。 // 我们可以像这样自己new(ServletWebServerFactoryConfiguration里用了ConditionalOnMissingBean),也可以注入springboot生成。 TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory(); tomcat.addConnectorCustomizers(gracefulShutdown); return tomcat; } } public class DefaultGracefulShutdown implements TomcatConnectorCustomizer, ApplicationListener<ContextClosedEvent> { @Override public void customize(Connector connector) { this.connector = connector; } @Override public void onApplicationEvent(ContextClosedEvent event) { shutdownTomcatConnector(); } private void shutdownTomcatConnector() { try { this.connector.pause(); logger.info(" ============================== after Tomcat connector pause!"); Executor executor = this.connector.getProtocolHandler().getExecutor(); if (executor instanceof ThreadPoolExecutor) { logger.info(" ============================== Tomcat thread pool begin to shut down gracefully!"); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; threadPoolExecutor.shutdown(); if (!threadPoolExecutor.awaitTermination(tomcatShutdownWaitSeconds, TimeUnit.SECONDS)) { logger.error(" ============================== Tomcat thread pool did not shut down gracefully within {} seconds. Proceeding with forceful shutdown", tomcatShutdownWaitSeconds); if (!threadPoolExecutor.isShutdown()) { threadPoolExecutor.shutdownNow(); } } } } catch (Exception e) { logger.error(" ============================== shutdown Tomcat connector error!", e); } } }
4. 结合
以上三步结合起来需要考虑几个细节:
eureka-server
接收到请求之后,是异步传播给集群其他节点。所以我们要等 eureka-server
都刷新了数据之后,再去让 gateway
刷新,否则 gateway
可能从一个还未更新数据的 server
节点取到未刷新的数据。但是我们并不知道 server
什么时候会刷新完,所以我们只能给个我们能容忍的等待时间。 connector.pause()
,会导致调用失败。整个流程如下: public void onApplicationEvent(ContextClosedEvent event) { logger.info(" ============================== begin gracefully shutdown service!"); // 本地调试时可能会关闭eureka或者不注册eureka if (ifRegister()) { unregisterEurekaData(); try { // 等待全部"eureka-server"刷新数据 TimeUnit.SECONDS.sleep(3); } catch (InterruptedException ignore) { } refreshGatewayEurekaData(); // 睡眠3秒,再停止tomcat接收新服务,如果有个请求在client.shutdown()之前获取到了服务节点,但是还没调用服务,这时候如果connector.pause(),会导致调用失败 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException ignore) { } } shutdownTomcatConnector(); logger.info(" ============================== finish gracefully shutdown service!"); }
启动要做到事是:eureka-client向eureka-server注册成功之后,调用网关接口刷新缓存。从eureka-client源码里面发现,client在注册成功之后,并没有 发送事件(public event)
,也就是无法知道什么时候注册成功。那我们就换个思路,在项目启动完成之后,我们自己调用注册方法,这样的结果就是,eureka-client会向eureka-server注册两次(自己注册一次,原先的启动流程注册一次),但是重复的注册并不会有什么影响。
public class DefaultStartRegister implements ApplicationListener<ApplicationReadyEvent> { @Override public void onApplicationEvent(ApplicationReadyEvent event) { if (ifRegister()) { register(); } } public void register() { executorService.submit(() -> { try { EurekaClient client = getClient(); if (InstanceInfo.InstanceStatus.UP.equals(client.getApplicationInfoManager().getInfo().getStatus())) { Method method = ReflectionUtils.findMethod(DiscoveryClient.class, "register"); // 非public方法 method.setAccessible(true); // springcloud下的EurekaClient实际上是个代理对象 Object target = AopUtils.isAopProxy(client) ? ProxyUtils.getTargetObject(client) : client; AtomicReference<Boolean> registerResult = new AtomicReference<>(false); Thread registerEurakaThread = new Thread(() -> { boolean ret = (boolean) ReflectionUtils.invokeMethod(method, target); registerResult.set(ret); logger.info(" ============================== invoke EurekaClient.register method to registerWithEureka result: {}!", ret); }); try { registerEurakaThread.start(); // 不能无限制等待 registerEurakaThread.join(registerEurekaShutdownWaitSeconds * 1000); if (registerEurakaThread.isAlive()) { logger.error(" ============================== register local eureka doesn't compelete in allow time!"); registerEurakaThread.interrupt(); } } catch (Exception e) { logger.error(" ============================== register local eureka data error!", e); } try { // 等待全部"eureka-server"刷新数据 TimeUnit.SECONDS.sleep(3); } catch (InterruptedException ignore) { } if (registerResult.get()) { refreshGatewayEurekaData(); } } } catch (Exception e) { logger.error("register eureka after start error!", e); } }); } }
DiscoveryClient.register
不是 public
,所以使用了反射的方式:用反射的优点是简单明了;缺点是如果eureka版本升级,可能就会变得不兼容/不可用(一般第三方组件升级时会尽量“保留/兼容” public
的方法, 非public
的方法就无法保证了),但是由于现在 eureka
已经停止维护了,所以也就没有这个问题了。 eureka-server
广播完成之后再调用gateway接口。 eureka-server的三级缓存中, registry
和 readWriteCacheMap
都是实时更新,而 readOnlyCacheMap
是通过定时任务定时从 readWriteCacheMap
取数据更新。
readWriteCacheMap
,让它也实时更新,那就变得跟 readWriteCacheMap
作用差不多了(从细节上讲, readWriteCacheMap
用的是 guava-cache
,还是会做一些额外的事情,比如计数统计、清理)。 registry
、 readWriteCacheMap
和 readOnlyCacheMap
数据都是存储在JVM内存里,对于绝大部分的系统来说两级缓存性能已经足够了。 所以eureka-server端,我们只需要关闭 readOnlyCacheMap
即可: eureka.server.use-read-only-response-cache: false
。虽然性能上会有下降,但是这点儿性能差别,对于大部分系统来说是没有影响的,除非超高并发的系统。
gateway需要提供一个接口给client在停机和启动时调用,接口做两件事:
eureka-client
缓存。 ribbon
重新从 eureka-client
里获取缓存。 eureka-client
缓存 通过源码可以看到, DiscoveryClient
里有个 refreshRegistry
方法会刷新缓存,但是这个方法不是 public
,同样需要反射调用。
ribbon
缓存 由于不同的服务分别有不同的 LoadBalancer
对象,需要先获取到服务对应的 LoadBalancer
,那就需要知道当前要刷新的是哪个服务,所以接口要有个参数,参数是服务在 eureka-server
上注册的服务名,要特别注意的是,eureka服务名默认是大写,所以传过来的服务名参数值也需要是大写。
同样的通过源码可以找到 RibbonLoadBalancerClient.DynamicServerListLoadBalancer.updateListOfServers
方法会刷新 ribbon
缓存,这里之所以选择从 RibbonLoadBalancerClient
作为入口,是因为springboot有将 RibbonLoadBalancerClient
暴露成 Bean
。
public class GatewayApplication { @Autowired private EurekaClient eurekaClient; @Autowired private LoadBalancerClient loadBalancerClient; @Bean public RouterFunction<ServerResponse> routerFunction() { return RouterFunctions.route(RequestPredicates.path("/eureka/refresh"), request -> { try { log.info("receive eureka refresh, eureka current service count {}!", eurekaClient.getApplications().size()); Method method = ReflectionUtils.findMethod(DiscoveryClient.class, "refreshRegistry"); // 非public方法 method.setAccessible(true); // springcloud下的EurekaClient实际上是个代理对象 Object target = AopUtils.isAopProxy(eurekaClient) ? ProxyUtils.getTargetObject(eurekaClient) : eurekaClient; ReflectionUtils.invokeMethod(method, target); log.info("finish eureka refresh, eureka current service count {}!", eurekaClient.getApplications().size()); request.queryParam("serviceId").filter(StringUtils::isNotBlank).ifPresent(serviceId -> { RibbonLoadBalancerClient ribbonLoadBalancerClient = (RibbonLoadBalancerClient) loadBalancerClient; Method ribbonMethod = ReflectionUtils.findMethod(RibbonLoadBalancerClient.class, "getLoadBalancer", String.class); ribbonMethod.setAccessible(true); // 非public方法 Object ribbonTarget = AopUtils.isAopProxy(ribbonLoadBalancerClient) ? ProxyUtils.getTargetObject(ribbonLoadBalancerClient) : ribbonLoadBalancerClient; DynamicServerListLoadBalancer dynamicServerListLoadBalancer = (DynamicServerListLoadBalancer) ReflectionUtils.invokeMethod(ribbonMethod, ribbonTarget, serviceId); log.info("begin refresh ribbon, ribbon current service[{}] count {}!", serviceId, dynamicServerListLoadBalancer.getAllServers().size()); // 也可以调用Ribbon的BaseLoadbalancer.markServerDown方法来清理Ribbon数据 dynamicServerListLoadBalancer.updateListOfServers(); log.info("finish refresh ribbon, ribbon current service[{}] count {}!", serviceId, dynamicServerListLoadBalancer.getAllServers().size()); }); return ok().body(Mono.just("success"), String.class); } catch (Exception e) { log.info("refresh eureka error!", e); return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Mono.just(e.getClass().getName() + ":" + e.getMessage()), String.class); } }); } }
RibbonLoadBalancerClient.getLoadBalancer
返回的是接口 ILoadBalancer
,由于 springboot-ribbon
默认是 DynamicServerListLoadBalancer
,所以直接类型强转,但是如果有哪个项目配置了自定义规则,返回的就不一定是 DynamicServerListLoadBalancer
(我没试过,所以也不确定)。 BaseLoadbalancer.markServerDown(Server server)
方法来让指定的服务节点失效,这个方法是 public
方法。我在这里采用了比较简单粗暴的 updateListOfServers
直接刷新整个服务。 jenkins流程中,除了 重启脚本
之外,还需要做两件事:
服务器要分两批启动,确保第一批有启动成功(可以提供服务)的节点之后,才能启动第二批,所以要有检查启动成功的机制。
backup
文件夹,如果没有则创建。 jar
文件移动到 backup
目录中,并在文件后面加上时间戳。 #!/bin/bash service_name=$1 if [ -z "$service_name" ] then echo "usage: ./backup-service.sh xxxx" exit 1 fi base_dir="/data/app/tscm/$service_name" backup_dir="$base_dir/backup" if [ ! -d "$backup_dir" ]; then mkdir -m 775 -p $backup_dir fi cd $base_dir time=$(date "+%Y%m%d%H%M%S") for var in ${service_name}*.jar; do mv "$var" "backup/${var%.jar}_$time.jar"; done exit 0
sh backup-service.sh 项目名
,其中打包的 jar
文件名称前缀文件项目名,例如项目为 my-test
,则打包的文件必须为 my-test{这里可以有其他字符}.jar
。 /data/app/tscm/
目录改为自己服务器中实际的目录。 fail-backup
目录中,将 backup
目录中最新的文件(即刚备份的文件)移动回来,重新启动。 #!/bin/bash service_name=$1 env=$2 check_url=$3 if [ -z "$service_name" ] then echo "service_name required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor" exit 1 fi if [ -z "$env" ] then echo "env required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor" exit 1 fi if [ -z "$check_url" ] then echo "check_url required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor" exit 1 fi base_dir="/data/app/tscm/$service_name" fail_dir="$base_dir/fail-backup" result=0 count=0 while [ $result -ne 200 -a $count -lt 18 ] # 根据接口返回的HTTP 状态码是否200来判断,循环18次 do sleep 10 # 睡眠10秒,也就是给项目最大启动时间为18 * 10 = 180秒 = 3分钟 let count+=1 result=$(curl -sIL -w "%{http_code}/n" -o /dev/null ${check_url}) echo $count" times curl result: "$result done if [ $result -eq 200 ]; then exit 0 else if [ ! -d "$fail_dir" ]; then mkdir -m 775 -p $fail_dir fi cd $base_dir time=$(date "+%Y%m%d%H%M%S") for var in ${service_name}*.jar; do mv "$var" "fail-backup/${var%.jar}_$time.jar"; done cd $base_dir/backup filename=`ls -t |head -n1|awk '{print $0}'` echo "失败回滚文件:$filename" mv "$filename" `echo "../$filename" |awk -F '_' '{print $1 ".jar"}'` cd ../.. sh run-service.sh $service_name $env # 重新启动 exit 1 fi
sh check-rollback-service.sh 项目名 spring环境 项目节点检查地址
,例如: sh check-rollback-service.sh mytest dev http://localhost:6101/base/monitor
。 /data/app/tscm/
目录改为自己服务器中实际的目录。 run-service.sh
为 项目重启
脚本,做两件事:
kill jar
前面的处理方案是刷新网关的的缓存,但是项目组内有部分项目用了 feign
,导致方案对这些项目无效。
feign
跟 gateway
一样,也是经过 robbin
和 eureka-client
两层处理,但是项目重启时,不可能像通知网关那样 通过接口同步
通知所有节点刷新本地缓存。
可以用消息通知的方式通知其他项目,但是这样有两个问题:
MQ
/ Redis
等可以接收通知的中间件。 睡眠等待
,来尽量保证其他项目已完成刷新。 鉴于通知的方式比较复杂,那是不是可以让 feign
走网关请求,而不是直接点对点请求。
通过API可以看到 FeignClient
配置里有 url
和 path
两个参数,当指定 url
参数时, feign
就不会再走 robbin
获取服务节点,而是直接发起请求,所以只需要做很小的调整。
调整前:
@FeignClient(value = "tscm-service-purchase-facader")
调整后:
@FeignClient(value = "tscm-service-purchase-facader", url = "${gateway.host}", path="/tscm-service-purchase-facader")
或
@FeignClient(value = "tscm-service-purchase-facader", url = "${gateway.host}/tscm-service-purchase-facader")