This article takes a look at the ServerChangeListener in nacos.
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java
    public interface ServerChangeListener {

        /**
         * If member list changed, this method is invoked.
         *
         * @param servers servers after change
         */
        void onChangeServerList(List<Server> servers);

        /**
         * If reachable member list changed, this method is invoked.
         *
         * @param healthyServer reachable servers after change
         */
        void onChangeHealthyServerList(List<Server> healthyServer);
    }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java
    @Component("distroMapper")
    public class DistroMapper implements ServerChangeListener {

        private List<String> healthyList = new ArrayList<>();

        //......

        @Override
        public void onChangeServerList(List<Server> latestMembers) {

        }

        @Override
        public void onChangeHealthyServerList(List<Server> latestReachableMembers) {

            List<String> newHealthyList = new ArrayList<>();
            for (Server server : latestReachableMembers) {
                newHealthyList.add(server.getKey());
            }
            healthyList = newHealthyList;
        }

        //......
    }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java
    @Component
    @DependsOn("serverListManager")
    public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {

        @Autowired
        private ServerListManager serverListManager;

        private ApplicationContext applicationContext;

        private AtomicLong localTerm = new AtomicLong(0L);

        private RaftPeer leader = null;

        private Map<String, RaftPeer> peers = new HashMap<>();

        private Set<String> sites = new HashSet<>();

        private boolean ready = false;

        //......

        @Override
        public void onChangeServerList(List<Server> latestMembers) {

            Map<String, RaftPeer> tmpPeers = new HashMap<>(8);
            for (Server member : latestMembers) {

                if (peers.containsKey(member.getKey())) {
                    tmpPeers.put(member.getKey(), peers.get(member.getKey()));
                    continue;
                }

                RaftPeer raftPeer = new RaftPeer();
                raftPeer.ip = member.getKey();

                // first time meet the local server:
                if (NetUtils.localServer().equals(member.getKey())) {
                    raftPeer.term.set(localTerm.get());
                }

                tmpPeers.put(member.getKey(), raftPeer);
            }

            // replace raft peer set:
            peers = tmpPeers;

            if (RunningConfig.getServerPort() > 0) {
                ready = true;
            }

            Loggers.RAFT.info("raft peers changed: " + latestMembers);
        }

        @Override
        public void onChangeHealthyServerList(List<Server> latestReachableMembers) {

        }

        //......
    }
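The rebuild step in RaftPeerSet.onChangeServerList can be looked at in isolation: members that are already known keep their existing peer object, new members get a fresh one, and members that have left simply drop out of the new map. The following is only a rough sketch of that idea, not Nacos code. `PeerRebuildDemo`, `Peer`, and `rebuild` are hypothetical names, and the addresses are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PeerRebuildDemo {

    // Hypothetical stand-in for RaftPeer; only ip and term are modelled.
    static class Peer {
        final String ip;
        long term;

        Peer(String ip) {
            this.ip = ip;
        }
    }

    // Builds a new map for the latest member keys, reusing existing Peer objects.
    static Map<String, Peer> rebuild(Map<String, Peer> current, List<String> latestKeys,
                                     String localKey, long localTerm) {
        Map<String, Peer> next = new HashMap<>();
        for (String key : latestKeys) {
            Peer existing = current.get(key);
            if (existing != null) {
                // Known member: keep the same object so its state survives.
                next.put(key, existing);
                continue;
            }
            Peer created = new Peer(key);
            if (key.equals(localKey)) {
                // First time the local server is seen: seed it with the local term.
                created.term = localTerm;
            }
            next.put(key, created);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Peer> peers = new HashMap<>();
        peers = rebuild(peers, Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"), "10.0.0.1:8848", 3L);
        Peer kept = peers.get("10.0.0.2:8848");

        // 10.0.0.1 leaves, 10.0.0.3 joins, 10.0.0.2 stays.
        peers = rebuild(peers, Arrays.asList("10.0.0.2:8848", "10.0.0.3:8848"), "10.0.0.1:8848", 3L);

        System.out.println(peers.size());                        // prints 2
        System.out.println(peers.get("10.0.0.2:8848") == kept);  // prints true
    }
}
```

Building a temporary map and then assigning it, as the original method does with tmpPeers, means the live map is never mutated in place while it is being recomputed.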
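ServerChangeListener defines two methods, onChangeServerList and onChangeHealthyServerList. DistroMapper implements the ServerChangeListener interface; its onChangeHealthyServerList rebuilds its own healthyList from the keys of the reachable servers, while its onChangeServerList is empty. RaftPeerSet also implements the interface; its onChangeServerList replaces peers (reusing the existing RaftPeer for members it already knows) and sets ready to true once RunningConfig.getServerPort() is greater than 0, while its onChangeHealthyServerList is empty.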
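To make the callback flow concrete, here is a minimal, self-contained sketch of how listeners of this shape might be registered and notified. It is an illustration under assumptions, not the Nacos implementation: `DemoServer`, `DemoServerChangeListener`, `DemoServerListNotifier`, and `HealthyKeyTracker` are hypothetical names, and the `volatile` modifier on the list is this sketch's own choice rather than something taken from DistroMapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerDemo {
    public static void main(String[] args) {
        DemoServerListNotifier notifier = new DemoServerListNotifier();
        HealthyKeyTracker tracker = new HealthyKeyTracker();
        notifier.listen(tracker);

        notifier.publishHealthy(Arrays.asList(
                new DemoServer("10.0.0.1:8848"), new DemoServer("10.0.0.2:8848")));
        System.out.println(tracker.getHealthyList()); // prints [10.0.0.1:8848, 10.0.0.2:8848]
    }
}

// Hypothetical stand-in for the Server class; only the key is modelled.
class DemoServer {
    private final String key;

    DemoServer(String key) {
        this.key = key;
    }

    String getKey() {
        return key;
    }
}

// Same shape as the ServerChangeListener interface quoted in the article.
interface DemoServerChangeListener {
    void onChangeServerList(List<DemoServer> servers);

    void onChangeHealthyServerList(List<DemoServer> healthyServers);
}

// Hypothetical publisher that fans each change out to every registered listener.
class DemoServerListNotifier {
    private final List<DemoServerChangeListener> listeners = new CopyOnWriteArrayList<>();

    void listen(DemoServerChangeListener listener) {
        listeners.add(listener);
    }

    void publishMembers(List<DemoServer> servers) {
        for (DemoServerChangeListener listener : listeners) {
            listener.onChangeServerList(servers);
        }
    }

    void publishHealthy(List<DemoServer> healthyServers) {
        for (DemoServerChangeListener listener : listeners) {
            listener.onChangeHealthyServerList(healthyServers);
        }
    }
}

// Follows the DistroMapper pattern: ignore membership changes, track healthy keys.
class HealthyKeyTracker implements DemoServerChangeListener {
    // volatile so that the swapped reference is visible to other threads (sketch's choice).
    private volatile List<String> healthyList = new ArrayList<>();

    @Override
    public void onChangeServerList(List<DemoServer> servers) {
        // Not interested in the full member list.
    }

    @Override
    public void onChangeHealthyServerList(List<DemoServer> healthyServers) {
        List<String> newHealthyList = new ArrayList<>();
        for (DemoServer server : healthyServers) {
            newHealthyList.add(server.getKey());
        }
        // Swap the reference in one step instead of mutating the old list.
        healthyList = newHealthyList;
    }

    List<String> getHealthyList() {
        return healthyList;
    }
}
```

Each implementation is free to react to only one of the two callbacks, which is what DistroMapper and RaftPeerSet do with their empty method bodies.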