本文主要研究一下nacos的DistroMapper
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java
public interface ServerChangeListener { /** * If member list changed, this method is invoked. * * @param servers servers after change */ void onChangeServerList(List<Server> servers); /** * If reachable member list changed, this method is invoked. * * @param healthyServer reachable servers after change */ void onChangeHealthyServerList(List<Server> healthyServer); }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java
@Component("distroMapper") public class DistroMapper implements ServerChangeListener { private List<String> healthyList = new ArrayList<>(); public List<String> getHealthyList() { return healthyList; } @Autowired private SwitchDomain switchDomain; @Autowired private ServerListManager serverListManager; /** * init server list */ @PostConstruct public void init() { serverListManager.listen(this); } public boolean responsible(Cluster cluster, Instance instance) { return switchDomain.isHealthCheckEnabled(cluster.getServiceName()) && !cluster.getHealthCheckTask().isCancelled() && responsible(cluster.getServiceName()) && cluster.contains(instance); } public boolean responsible(String serviceName) { if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) { return true; } if (CollectionUtils.isEmpty(healthyList)) { // means distro config is not ready yet return false; } int index = healthyList.indexOf(NetUtils.localServer()); int lastIndex = healthyList.lastIndexOf(NetUtils.localServer()); if (lastIndex < 0 || index < 0) { return true; } int target = distroHash(serviceName) % healthyList.size(); return target >= index && target <= lastIndex; } public String mapSrv(String serviceName) { if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) { return NetUtils.localServer(); } try { return healthyList.get(distroHash(serviceName) % healthyList.size()); } catch (Exception e) { Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e); return NetUtils.localServer(); } } public int distroHash(String serviceName) { return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE); } @Override public void onChangeServerList(List<Server> latestMembers) { } @Override public void onChangeHealthyServerList(List<Server> latestReachableMembers) { List<String> newHealthyList = new ArrayList<>(); for (Server server : latestReachableMembers) { newHealthyList.add(server.getKey()); } healthyList = newHealthyList; } }
ServerChangeListener定义了onChangeServerList、onChangeHealthyServerList方法;DistroMapper实现了ServerChangeListener接口,其onChangeHealthyServerList方法会更新healthyList;DistroMapper还提供了responsible方法及mapSrv方法