本文主要研究一下nacos的HealthCheckCommon
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java
@Component public class HealthCheckCommon { @Autowired private DistroMapper distroMapper; @Autowired private SwitchDomain switchDomain; @Autowired private ServerListManager serverListManager; @Autowired private PushService pushService; private static LinkedBlockingDeque<HealthCheckResult> healthCheckResults = new LinkedBlockingDeque<>(1024 * 128); private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.taobao.health-check.notifier"); return thread; } }); public void init() { executorService.schedule(new Runnable() { @Override public void run() { List list = Arrays.asList(healthCheckResults.toArray()); healthCheckResults.clear(); List<Server> sameSiteServers = serverListManager.getServers(); if (sameSiteServers == null || sameSiteServers.size() <= 0) { return; } for (Server server : sameSiteServers) { if (server.getKey().equals(NetUtils.localServer())) { continue; } Map<String, String> params = new HashMap<>(10); params.put("result", JSON.toJSONString(list)); if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("[HEALTH-SYNC] server: {}, healthCheckResults: {}", server, JSON.toJSONString(list)); } HttpClient.HttpResult httpResult = HttpClient.httpPost("http://" + server.getKey() + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/api/healthCheckResult", null, params); if (httpResult.code != HttpURLConnection.HTTP_OK) { Loggers.EVT_LOG.warn("[HEALTH-CHECK-SYNC] failed to send result to {}, result: {}", server, JSON.toJSONString(list)); } } } }, 500, TimeUnit.MILLISECONDS); } //...... public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) { //...... } public void checkOK(Instance ip, HealthCheckTask task, String msg) { //...... 
} public void checkFail(Instance ip, HealthCheckTask task, String msg) { //...... } public void checkFailNow(Instance ip, HealthCheckTask task, String msg) { //...... } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java
/**
 * Feeds a measured check round-trip time into the task's RT statistics.
 *
 * <p>The method does the following:
 * <ul>
 *   <li>Stores the value as the last RT.</li>
 *   <li>Widens the worst/best bounds when the value falls outside them.</li>
 *   <li>Updates the normalized RT as {@code factor * previousNormalized + (1 - factor) * checkRT}.</li>
 *   <li>Clamps the result to {@code [params.getMin(), params.getMax()]}; the minimum wins if the two
 *       bounds cross.</li>
 * </ul>
 *
 * @param checkRT the newly measured round-trip time
 * @param task    the task whose RT statistics are updated
 * @param params  supplies the smoothing factor and the clamp bounds
 */
public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) {
    task.setCheckRTLast(checkRT);

    boolean newWorst = task.getCheckRTWorst() < checkRT;
    if (newWorst) {
        task.setCheckRTWorst(checkRT);
    }
    boolean newBest = task.getCheckRTBest() > checkRT;
    if (newBest) {
        task.setCheckRTBest(checkRT);
    }

    // Weighted blend of the previous normalized RT and the new sample (kept verbatim so the
    // intermediate arithmetic type and the final truncation are unchanged).
    long smoothed = (long) ((params.getFactor() * task.getCheckRTNormalized()) + (1 - params.getFactor()) * checkRT);

    // Cap at max first, then raise to min, so min takes precedence if min > max.
    long bounded = Math.max(params.getMin(), Math.min(params.getMax(), smoothed));
    task.setCheckRTNormalized(bounded);
}
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java
/**
 * Records a successful health probe for {@code ip}.
 *
 * <p>Nothing changes unless the instance is currently not both healthy and mock-valid. In that case the
 * OK counter is incremented. Once the counter reaches {@code switchDomain.getCheckTimes()}:
 * <ul>
 *   <li>If {@code distroMapper} says this server is responsible, the instance is marked healthy and
 *       mock-valid. The service's modification time is bumped, a change is pushed, and a result is queued
 *       for sync.</li>
 *   <li>Otherwise only the local {@code mockValid} flag is set, the first time.</li>
 * </ul>
 * Any throwable raised along the way is logged. Afterwards the fail counter is always reset and the
 * instance is marked as no longer being checked.
 *
 * @param ip   the probed instance
 * @param task the health-check task whose cluster owns the instance
 * @param msg  free-form detail included in the event log
 */
public void checkOK(Instance ip, HealthCheckTask task, String msg) {
    Cluster owner = task.getCluster();

    try {
        boolean alreadyUp = ip.isHealthy() && ip.isMockValid();
        if (!alreadyUp) {
            if (ip.getOKCount().incrementAndGet() < switchDomain.getCheckTimes()) {
                // Threshold not reached yet: only report progress.
                Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-ENABLED} pre-valid: {}:{}@{} in {}, msg: {}",
                        owner.getService().getName(), ip.getIp(), ip.getPort(), owner.getName(), ip.getOKCount(), msg);
            } else if (distroMapper.responsible(owner, ip)) {
                ip.setHealthy(true);
                ip.setMockValid(true);

                Service changed = owner.getService();
                changed.setLastModifiedMillis(System.currentTimeMillis());
                pushService.serviceChanged(changed);
                addResult(new HealthCheckResult(changed.getName(), ip));

                Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
                        owner.getService().getName(), ip.getIp(), ip.getPort(), owner.getName(),
                        UtilsAndCommons.LOCALHOST_SITE, msg);
            } else if (!ip.isMockValid()) {
                // Not responsible for this instance: only flip the local probe flag.
                ip.setMockValid(true);
                Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
                        owner.getService().getName(), ip.getIp(), ip.getPort(), owner.getName(),
                        UtilsAndCommons.LOCALHOST_SITE, msg);
            }
        }
    } catch (Throwable t) {
        Loggers.SRV_LOG.error("[CHECK-OK] error when close check task.", t);
    }

    ip.getFailCount().set(0);
    ip.setBeingChecked(false);
}
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java
/**
 * Records a failed health probe for {@code ip}.
 *
 * <p>Nothing changes unless the instance is currently healthy or mock-valid. In that case the fail
 * counter is incremented. Once the counter reaches {@code switchDomain.getCheckTimes()}:
 * <ul>
 *   <li>If {@code distroMapper} says this server is responsible, the instance is marked unhealthy and not
 *       mock-valid. The service's modification time is bumped, a result is queued for sync, and a change
 *       is pushed.</li>
 *   <li>Otherwise only the local {@code mockValid} flag is cleared, the first time.</li>
 * </ul>
 * Any throwable raised along the way is logged. Afterwards the OK counter is always reset and the
 * instance is marked as no longer being checked.
 *
 * @param ip   the probed instance
 * @param task the health-check task whose cluster owns the instance
 * @param msg  free-form detail included in the event log
 */
public void checkFail(Instance ip, HealthCheckTask task, String msg) {
    Cluster cluster = task.getCluster();

    try {
        if (ip.isHealthy() || ip.isMockValid()) {
            if (ip.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
                if (distroMapper.responsible(cluster, ip)) {
                    ip.setHealthy(false);
                    ip.setMockValid(false);

                    Service service = cluster.getService();
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    addResult(new HealthCheckResult(service.getName(), ip));

                    pushService.serviceChanged(service);

                    Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
                            cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
                            UtilsAndCommons.LOCALHOST_SITE, msg);
                } else if (ip.isMockValid()) {
                    // Not responsible: clear the local probe flag, matching checkOK (which sets it)
                    // and checkFailNow (which clears it). Previously it was never cleared here, so the
                    // flag stayed stale and this line was re-logged on every further failure.
                    ip.setMockValid(false);
                    Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
                            cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
                            UtilsAndCommons.LOCALHOST_SITE, msg);
                }
            } else {
                // Threshold not reached yet: only report progress.
                Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-DISABLED} pre-invalid: {}:{}@{} in {}, msg: {}",
                        cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
                        ip.getFailCount(), msg);
            }
        }
    } catch (Throwable t) {
        Loggers.SRV_LOG.error("[CHECK-FAIL] error when close check task.", t);
    }

    ip.getOKCount().set(0);
    ip.setBeingChecked(false);
}
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java
/**
 * Marks {@code ip} as failed immediately, without waiting for a fail-count threshold.
 *
 * <p>Nothing changes if the instance is already both unhealthy and not mock-valid. Otherwise:
 * <ul>
 *   <li>If {@code distroMapper} says this server is responsible, the instance is marked unhealthy and not
 *       mock-valid. The service's modification time is bumped, a change is pushed, and a result is queued
 *       for sync.</li>
 *   <li>If not, only the local {@code mockValid} flag is cleared, the first time.</li>
 * </ul>
 * Any throwable raised along the way is logged. Afterwards the OK counter is always reset and the
 * instance is marked as no longer being checked.
 *
 * @param ip   the instance to fail
 * @param task the health-check task whose cluster owns the instance
 * @param msg  free-form detail included in the event log
 */
public void checkFailNow(Instance ip, HealthCheckTask task, String msg) {
    Cluster owner = task.getCluster();

    try {
        boolean alreadyDown = !ip.isHealthy() && !ip.isMockValid();
        if (!alreadyDown) {
            if (!distroMapper.responsible(owner, ip)) {
                // Another server owns this instance: only flip the local probe flag, once.
                if (ip.isMockValid()) {
                    ip.setMockValid(false);
                    Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
                            owner.getService().getName(), ip.getIp(), ip.getPort(), owner.getName(),
                            UtilsAndCommons.LOCALHOST_SITE, msg);
                }
            } else {
                ip.setHealthy(false);
                ip.setMockValid(false);

                Service changed = owner.getService();
                changed.setLastModifiedMillis(System.currentTimeMillis());
                pushService.serviceChanged(changed);
                addResult(new HealthCheckResult(changed.getName(), ip));

                Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
                        owner.getService().getName(), ip.getIp(), ip.getPort(), owner.getName(),
                        UtilsAndCommons.LOCALHOST_SITE, msg);
            }
        }
    } catch (Throwable t) {
        Loggers.SRV_LOG.error("[CHECK-FAIL-NOW] error when close check task.", t);
    }

    ip.getOKCount().set(0);
    ip.setBeingChecked(false);
}
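HealthCheckCommon的init方法通过executorService注册了一个延时500毫秒执行的任务，把healthCheckResults中积累的健康检查结果同步给除本机以外的其他server；此外它主要提供了reEvaluateCheckRT、checkOK、checkFail、checkFailNow方法。其中，reEvaluateCheckRT记录本次检查耗时，并对task的checkRTNormalized做加权平滑，结果限制在[min, max]区间内；checkOK、checkFail分别在成功、失败计数达到switchDomain.getCheckTimes()后更新实例状态（仅当本机通过distroMapper负责该实例时才修改healthy并推送变更，否则只修改mockValid）；checkFailNow则不等计数，直接把实例标记为不健康。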