本文主要研究一下 Nacos 的 DistroConsistencyServiceImpl。
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java
/**
 * Contract for writing, removing, reading and observing keyed data in a Nacos cluster,
 * plus a status probe for the service itself.
 */
public interface ConsistencyService {

    /**
     * Put a data related to a key to Nacos cluster.
     *
     * @param key   key of data, this key should be globally unique
     * @param value value of data
     * @throws NacosException declared by the contract; the concrete failure conditions
     *                        are defined by the implementation
     */
    void put(String key, Record value) throws NacosException;

    /**
     * Remove a data from Nacos cluster.
     *
     * @param key key of data
     * @throws NacosException declared by the contract; the concrete failure conditions
     *                        are defined by the implementation
     */
    void remove(String key) throws NacosException;

    /**
     * Get a data from Nacos cluster.
     *
     * @param key key of data
     * @return data related to the key
     * @throws NacosException declared by the contract; the concrete failure conditions
     *                        are defined by the implementation
     */
    Datum get(String key) throws NacosException;

    /**
     * Listen for changes of a data.
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException declared by the contract; the concrete failure conditions
     *                        are defined by the implementation
     */
    void listen(String key, RecordListener listener) throws NacosException;

    /**
     * Cancel listening of a data.
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException declared by the contract; the concrete failure conditions
     *                        are defined by the implementation
     */
    void unlisten(String key, RecordListener listener) throws NacosException;

    /**
     * Tell the status of this consistency service.
     *
     * @return true if available
     */
    boolean isAvailable();
}
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java
/**
 * Sub-interface of {@link ConsistencyService} that declares no members of its own.
 *
 * <p>NOTE(review): judging only by its name and package, this presumably tags the
 * consistency services that handle ephemeral (non-persistent) data - confirm against
 * the implementing classes.
 */
public interface EphemeralConsistencyService extends ConsistencyService {
}
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("com.alibaba.nacos.naming.distro.notifier"); return t; } }); @Autowired private DistroMapper distroMapper; @Autowired private DataStore dataStore; @Autowired private TaskDispatcher taskDispatcher; @Autowired private DataSyncer dataSyncer; @Autowired private Serializer serializer; @Autowired private ServerListManager serverListManager; @Autowired private SwitchDomain switchDomain; @Autowired private GlobalConfig globalConfig; private boolean initialized = false; public volatile Notifier notifier = new Notifier(); private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>(); private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16); @PostConstruct public void init() { GlobalExecutor.submit(new Runnable() { @Override public void run() { try { load(); } catch (Exception e) { Loggers.DISTRO.error("load data failed.", e); } } }); executor.submit(notifier); } public void load() throws Exception { if (SystemUtils.STANDALONE_MODE) { initialized = true; return; } // size = 1 means only myself in the list, we need at least one another server alive: while (serverListManager.getHealthyServers().size() <= 1) { Thread.sleep(1000L); Loggers.DISTRO.info("waiting server list init..."); } for (Server server : serverListManager.getHealthyServers()) { if (NetUtils.localServer().equals(server.getKey())) { continue; } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync from " + server); } // try sync data from remote server: if (syncAllDataFromRemote(server)) { initialized = true; return; } } } //...... 
public boolean syncAllDataFromRemote(Server server) { try { byte[] data = NamingProxy.getAllData(server.getKey()); processData(data); return true; } catch (Exception e) { Loggers.DISTRO.error("sync full data from " + server + " failed!", e); return false; } } public void processData(byte[] data) throws Exception { if (data.length > 0) { Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class); for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { dataStore.put(entry.getKey(), entry.getValue()); if (!listeners.containsKey(entry.getKey())) { // pretty sure the service not exist: if (switchDomain.isDefaultInstanceEphemeral()) { // create empty service Loggers.DISTRO.info("creating service {}", entry.getKey()); Service service = new Service(); String serviceName = KeyBuilder.getServiceName(entry.getKey()); String namespaceId = KeyBuilder.getNamespace(entry.getKey()); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0) .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } } } for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { // Should not happen: Loggers.DISTRO.warn("listener of {} not found.", entry.getKey()); continue; } try { for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e); continue; } // Update data store if listener executed successfully: dataStore.put(entry.getKey(), entry.getValue()); } } } //...... 
@Override public void put(String key, Record value) throws NacosException { onPut(key, value); taskDispatcher.addTask(key); } @Override public void remove(String key) throws NacosException { onRemove(key); listeners.remove(key); } @Override public Datum get(String key) throws NacosException { return dataStore.get(key); } //...... @Override public void listen(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { listeners.put(key, new CopyOnWriteArrayList<>()); } if (listeners.get(key).contains(listener)) { return; } listeners.get(key).add(listener); } @Override public void unlisten(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { return; } for (RecordListener recordListener : listeners.get(key)) { if (recordListener.equals(listener)) { listeners.get(key).remove(listener); break; } } } @Override public boolean isAvailable() { return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus()); } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024); public void addTask(String datumKey, ApplyAction action) { if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { return; } if (action == ApplyAction.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.add(Pair.with(datumKey, action)); } public int getTaskSize() { return tasks.size(); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); while (true) { try { Pair pair = tasks.take(); if (pair == null) { continue; } String datumKey = (String) pair.getValue0(); ApplyAction action = (ApplyAction) pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { continue; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == ApplyAction.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == ApplyAction.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } }