本文主要研究一下skywalking的RemoteClientManager
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
public class RemoteClientManager implements Service { private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class); private final ModuleDefineHolder moduleDefineHolder; private ClusterNodesQuery clusterNodesQuery; private volatile List<RemoteClient> usingClients; private GaugeMetrics gauge; private int remoteTimeout; /** * Initial the manager for all remote communication clients. * * @param moduleDefineHolder for looking up other modules * @param remoteTimeout for cluster internal communication, in second unit. */ public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) { this.moduleDefineHolder = moduleDefineHolder; this.usingClients = ImmutableList.of(); this.remoteTimeout = remoteTimeout; } public void start() { Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS); } /** * Query OAP server list from the cluster module and create a new connection for the new node. Make the OAP server * orderly because of each of the server will send stream data to each other by hash code. */ void refresh() { if (gauge == null) { gauge = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class) .createGauge("cluster_size", "Cluster size of current oap node", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); } try { if (Objects.isNull(clusterNodesQuery)) { synchronized (RemoteClientManager.class) { if (Objects.isNull(clusterNodesQuery)) { this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class); } } } if (logger.isDebugEnabled()) { logger.debug("Refresh remote nodes collection."); } List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes(); instanceList = distinct(instanceList); Collections.sort(instanceList); gauge.setValue(instanceList.size()); if (logger.isDebugEnabled()) { instanceList.forEach(instance -> logger.debug("Cluster instance: {}", instance.toString())); } if (!compare(instanceList)) { if (logger.isDebugEnabled()) { logger.debug("ReBuilding remote clients."); } reBuildRemoteClients(instanceList); } printRemoteClientList(); } catch (Throwable t) { logger.error(t.getMessage(), t); } } //...... }
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
public class RemoteClientManager implements Service { //...... private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) { final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream() .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close))); final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream() .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create))); final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet()); unChangeAddresses.stream() .filter(remoteClientCollection::containsKey) .forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress).setAction(Action.Unchanged)); // make the latestRemoteClients including the new clients only unChangeAddresses.forEach(latestRemoteClients::remove); remoteClientCollection.putAll(latestRemoteClients); final List<RemoteClient> newRemoteClients = new LinkedList<>(); remoteClientCollection.forEach((address, clientAction) -> { switch (clientAction.getAction()) { case Unchanged: newRemoteClients.add(clientAction.getRemoteClient()); break; case Create: if (address.isSelf()) { RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address); newRemoteClients.add(client); } else { RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout); client.connect(); newRemoteClients.add(client); } break; } }); //for stable ordering for rolling selector Collections.sort(newRemoteClients); this.usingClients = ImmutableList.copyOf(newRemoteClients); remoteClientCollection.values() .stream() .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close)) .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close()); } //...... }
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
public class RemoteSenderService implements Service { private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class); private final ModuleManager moduleManager; private final HashCodeSelector hashCodeSelector; private final ForeverFirstSelector foreverFirstSelector; private final RollingSelector rollingSelector; public RemoteSenderService(ModuleManager moduleManager) { this.moduleManager = moduleManager; this.hashCodeSelector = new HashCodeSelector(); this.foreverFirstSelector = new ForeverFirstSelector(); this.rollingSelector = new RollingSelector(); } public void send(String nextWorkName, StreamData streamData, Selector selector) { RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class); RemoteClient remoteClient = null; List<RemoteClient> clientList = clientManager.getRemoteClient(); if (clientList.size() == 0) { logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized."); return; } switch (selector) { case HashCode: remoteClient = hashCodeSelector.select(clientList, streamData); break; case Rolling: remoteClient = rollingSelector.select(clientList, streamData); break; case ForeverFirst: remoteClient = foreverFirstSelector.select(clientList, streamData); break; } remoteClient.push(nextWorkName, streamData); } }
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
public interface RemoteClientSelector { RemoteClient select(List<RemoteClient> clients, StreamData streamData); }
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
public class HashCodeSelector implements RemoteClientSelector { @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) { int size = clients.size(); int selectIndex = Math.abs(streamData.remoteHashCode()) % size; return clients.get(selectIndex); } }
Math.abs(streamData.remoteHashCode()) % size
来选择selectIndex skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
public class RollingSelector implements RemoteClientSelector { private int index = 0; @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) { int size = clients.size(); index++; int selectIndex = Math.abs(index) % size; if (index == Integer.MAX_VALUE) { index = 0; } return clients.get(selectIndex); } }
Math.abs(index) % size
选择selectIndex skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
public class ForeverFirstSelector implements RemoteClientSelector { private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class); @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) { if (logger.isDebugEnabled()) { logger.debug("clients size: {}", clients.size()); } return clients.get(0); } }
RemoteClientManager提供了getRemoteClient方法获取usingClients,它还提供了start方法,该方法注册一个定时任务每隔5秒执行一次refresh;refresh方法通过clusterNodesQuery.queryRemoteNodes()获取instanceList列表,然后根据Address去重一下再排序,然后跟本地的RemoteClient列表进行对比,如果有发现变更则触发reBuildRemoteClients操作,最后在执行printRemoteClientList