本文主要研究一下nacos ServiceManager的UpdatedServiceProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component @DependsOn("nacosApplicationContext") public class ServiceManager implements RecordListener<Service> { /** * Map<namespace, Map<group::serviceName, Service>> */ private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024); private Synchronizer synchronizer = new ServiceStatusSynchronizer(); private final Lock lock = new ReentrantLock(); @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; @Autowired private SwitchDomain switchDomain; @Autowired private DistroMapper distroMapper; @Autowired private ServerListManager serverListManager; @Autowired private PushService pushService; private final Object putServiceLock = new Object(); @PostConstruct public void init() { UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS); UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor()); try { Loggers.SRV_LOG.info("listen for service meta change"); consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this); } catch (NacosException e) { Loggers.SRV_LOG.error("listen for service meta change failed!"); } } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
private class UpdatedServiceProcessor implements Runnable { //get changed service from other server asynchronously @Override public void run() { ServiceKey serviceKey = null; try { while (true) { try { serviceKey = toBeUpdatedServicesQueue.take(); } catch (Exception e) { Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque."); } if (serviceKey == null) { continue; } GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey)); } } catch (Exception e) { Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e); } } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
private class ServiceUpdater implements Runnable { String namespaceId; String serviceName; String serverIP; public ServiceUpdater(ServiceKey serviceKey) { this.namespaceId = serviceKey.getNamespaceId(); this.serviceName = serviceKey.getServiceName(); this.serverIP = serviceKey.getServerIP(); } @Override public void run() { try { updatedHealthStatus(namespaceId, serviceName, serverIP); } catch (Exception e) { Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName, serverIP, e); } } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component @DependsOn("nacosApplicationContext") public class ServiceManager implements RecordListener<Service> { /** * Map<namespace, Map<group::serviceName, Service>> */ private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024); private Synchronizer synchronizer = new ServiceStatusSynchronizer(); private final Lock lock = new ReentrantLock(); @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; @Autowired private SwitchDomain switchDomain; @Autowired private DistroMapper distroMapper; @Autowired private ServerListManager serverListManager; @Autowired private PushService pushService; private final Object putServiceLock = new Object(); //...... public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) { Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); JSONObject serviceJson = JSON.parseObject(msg.getData()); JSONArray ipList = serviceJson.getJSONArray("ips"); Map<String, String> ipsMap = new HashMap<>(ipList.size()); for (int i = 0; i < ipList.size(); i++) { String ip = ipList.getString(i); String[] strings = ip.split("_"); ipsMap.put(strings[0], strings[1]); } Service service = getService(namespaceId, serviceName); if (service == null) { return; } boolean changed = false; List<Instance> instances = service.allIPs(); for (Instance instance : instances) { boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr())); if (valid != instance.isHealthy()) { changed = true; instance.setHealthy(valid); Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}", serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(), instance.getClusterName()); } } if (changed) { pushService.serviceChanged(service); } StringBuilder stringBuilder = new StringBuilder(); List<Instance> allIps = service.allIPs(); for (Instance instance : allIps) { stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(","); } if (changed && Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(), service.getName(), stringBuilder.toString()); } } //...... }