本文主要研究一下 Nacos 的 ServerListManager。
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
@Component("serverListManager") public class ServerListManager { private static final int STABLE_PERIOD = 60 * 1000; @Autowired private SwitchDomain switchDomain; private List<ServerChangeListener> listeners = new ArrayList<>(); private List<Server> servers = new ArrayList<>(); private List<Server> healthyServers = new ArrayList<>(); private Map<String, List<Server>> distroConfig = new ConcurrentHashMap<>(); private Map<String, Long> distroBeats = new ConcurrentHashMap<>(16); private Set<String> liveSites = new HashSet<>(); private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE; private long lastHealthServerMillis = 0L; private boolean autoDisabledHealthCheck = false; private Synchronizer synchronizer = new ServerStatusSynchronizer(); public void listen(ServerChangeListener listener) { listeners.add(listener); } @PostConstruct public void init() { GlobalExecutor.registerServerListUpdater(new ServerListUpdater()); GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 5000); } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java
public class GlobalExecutor { public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L); public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L); public static final long RANDOM_MS = TimeUnit.SECONDS.toMillis(5L); public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L); private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5); private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5); private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5); //...... public static void registerServerListUpdater(Runnable runnable) { executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS); } public static void registerServerStatusReporter(Runnable runnable, long delay) { SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS); } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
public class ServerListUpdater implements Runnable { @Override public void run() { try { List<Server> refreshedServers = refreshServerList(); List<Server> oldServers = servers; if (CollectionUtils.isEmpty(refreshedServers)) { Loggers.RAFT.warn("refresh server list failed, ignore it."); return; } boolean changed = false; List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers); if (CollectionUtils.isNotEmpty(newServers)) { servers.addAll(newServers); changed = true; Loggers.RAFT.info("server list is updated, new: {} servers: {}", newServers.size(), newServers); } List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers); if (CollectionUtils.isNotEmpty(deadServers)) { servers.removeAll(deadServers); changed = true; Loggers.RAFT.info("server list is updated, dead: {}, servers: {}", deadServers.size(), deadServers); } if (changed) { notifyListeners(); } } catch (Exception e) { Loggers.RAFT.info("error while updating server list.", e); } } } private List<Server> refreshServerList() { List<Server> result = new ArrayList<>(); if (STANDALONE_MODE) { Server server = new Server(); server.setIp(NetUtils.getLocalAddress()); server.setServePort(RunningConfig.getServerPort()); result.add(server); return result; } List<String> serverList = new ArrayList<>(); try { serverList = readClusterConf(); } catch (Exception e) { Loggers.SRV_LOG.warn("failed to get config: " + CLUSTER_CONF_FILE_PATH, e); } if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("SERVER-LIST from cluster.conf: {}", result); } //use system env if (CollectionUtils.isEmpty(serverList)) { serverList = SystemUtils.getIPsBySystemEnv(UtilsAndCommons.SELF_SERVICE_CLUSTER_ENV); if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("SERVER-LIST from system variable: {}", result); } } if (CollectionUtils.isNotEmpty(serverList)) { for (int i = 0; i < serverList.size(); i++) { String ip; int port; String server = 
serverList.get(i); if (server.contains(UtilsAndCommons.IP_PORT_SPLITER)) { ip = server.split(UtilsAndCommons.IP_PORT_SPLITER)[0]; port = Integer.parseInt(server.split(UtilsAndCommons.IP_PORT_SPLITER)[1]); } else { ip = server; port = RunningConfig.getServerPort(); } Server member = new Server(); member.setIp(ip); member.setServePort(port); result.add(member); } } return result; } private void notifyListeners() { GlobalExecutor.notifyServerListChange(new Runnable() { @Override public void run() { for (ServerChangeListener listener : listeners) { listener.onChangeServerList(servers); listener.onChangeHealthyServerList(healthyServers); } } }); }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
private class ServerStatusReporter implements Runnable { @Override public void run() { try { if (RunningConfig.getServerPort() <= 0) { return; } checkDistroHeartbeat(); int weight = Runtime.getRuntime().availableProcessors() / 2; if (weight <= 0) { weight = 1; } long curTime = System.currentTimeMillis(); String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "/r/n"; //send status to itself onReceiveServerStatus(status); List<Server> allServers = getServers(); if (!contains(NetUtils.localServer())) { Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers); return; } if (allServers.size() > 0 && !NetUtils.localServer().contains(UtilsAndCommons.LOCAL_HOST_IP)) { for (com.alibaba.nacos.naming.cluster.servers.Server server : allServers) { if (server.getKey().equals(NetUtils.localServer())) { continue; } Message msg = new Message(); msg.setData(status); synchronizer.send(server.getKey(), msg); } } } catch (Exception e) { Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e); } finally { GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis()); } } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
private void checkDistroHeartbeat() { Loggers.SRV_LOG.debug("check distro heartbeat."); List<Server> servers = distroConfig.get(LOCALHOST_SITE); if (CollectionUtils.isEmpty(servers)) { return; } List<Server> newHealthyList = new ArrayList<>(servers.size()); long now = System.currentTimeMillis(); for (Server s: servers) { Long lastBeat = distroBeats.get(s.getKey()); if (null == lastBeat) { continue; } s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis()); } //local site servers List<String> allLocalSiteSrvs = new ArrayList<>(); for (Server server : servers) { if (server.getKey().endsWith(":0")) { continue; } server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey())); for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) { if (!allLocalSiteSrvs.contains(server.getKey())) { allLocalSiteSrvs.add(server.getKey()); } if (server.isAlive() && !newHealthyList.contains(server)) { newHealthyList.add(server); } } } Collections.sort(newHealthyList); float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size(); if (autoDisabledHealthCheck && curRatio > switchDomain.getDistroThreshold() && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) { Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " + "stable now, enable health check. 
current ratio: {}", curRatio); switchDomain.setHealthCheckEnabled(true); // we must set this variable, otherwise it will conflict with user's action autoDisabledHealthCheck = false; } if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) { // for every change disable healthy check for some while if (switchDomain.isHealthCheckEnabled()) { Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, " + "disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD, healthyServers, newHealthyList); switchDomain.setHealthCheckEnabled(false); autoDisabledHealthCheck = true; lastHealthServerMillis = System.currentTimeMillis(); } healthyServers = newHealthyList; notifyListeners(); } }