本文主要研究一下 SkyWalking 的 cluster-nacos-plugin。
skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosConfig.java
public class ClusterModuleNacosConfig extends ModuleConfig { @Setter @Getter private String serviceName; @Setter @Getter private String hostPort; @Setter @Getter private String namespace = "public"; }
skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosProvider.java
public class ClusterModuleNacosProvider extends ModuleProvider { private final ClusterModuleNacosConfig config; private NamingService namingService; public ClusterModuleNacosProvider() { super(); this.config = new ClusterModuleNacosConfig(); } @Override public String name() { return "nacos"; } @Override public Class<? extends ModuleDefine> module() { return ClusterModule.class; } @Override public ModuleConfig createConfigBeanIfAbsent() { return config; } @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { try { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR, config.getHostPort()); properties.put(PropertyKeyConst.NAMESPACE, config.getNamespace()); namingService = NamingFactory.createNamingService(properties); } catch (Exception e) { throw new ModuleStartException(e.getMessage(), e); } NacosCoordinator coordinator = new NacosCoordinator(namingService, config); this.registerServiceImplementation(ClusterRegister.class, coordinator); this.registerServiceImplementation(ClusterNodesQuery.class, coordinator); } @Override public void start() throws ServiceNotProvidedException { } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { } @Override public String[] requiredModules() { return new String[]{CoreModule.NAME}; } }
skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/NacosCoordinator.java
public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery { private final NamingService namingService; private final ClusterModuleNacosConfig config; private volatile Address selfAddress; public NacosCoordinator(NamingService namingService, ClusterModuleNacosConfig config) { this.namingService = namingService; this.config = config; } @Override public List<RemoteInstance> queryRemoteNodes() { List<RemoteInstance> result = new ArrayList<>(); try { List<Instance> instances = namingService.selectInstances(config.getServiceName(), true); if (CollectionUtils.isNotEmpty(instances)) { instances.forEach(instance -> { Address address = new Address(instance.getIp(), instance.getPort(), false); if (address.equals(selfAddress)) { address.setSelf(true); } result.add(new RemoteInstance(address)); }); } } catch (NacosException e) { throw new ServiceQueryException(e.getErrMsg()); } return result; } @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException { String host = remoteInstance.getAddress().getHost(); int port = remoteInstance.getAddress().getPort(); try { namingService.registerInstance(config.getServiceName(), host, port); } catch (Exception e) { throw new ServiceRegisterException(e.getMessage()); } this.selfAddress = remoteInstance.getAddress(); TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString()); } }
ClusterModuleNacosProvider 继承了 ModuleProvider，其 prepare 方法创建 NamingService 及 NacosCoordinator，然后将 NacosCoordinator 注册为 ClusterRegister 及 ClusterNodesQuery 的实现。