Reposted

A Look at SkyWalking's configuration-nacos

This article examines SkyWalking's configuration-nacos module.

NacosConfigurationProvider

skywalking-6.6.0/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigurationProvider.java

public class NacosConfigurationProvider extends AbstractConfigurationProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(NacosConfigurationProvider.class);

    private NacosServerSettings settings;

    public NacosConfigurationProvider() {
        settings = new NacosServerSettings();
    }

    @Override
    public String name() {
        return "nacos";
    }

    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
        return settings;
    }

    @Override
    protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
        LOGGER.info("settings: {}", settings);
        if (Strings.isNullOrEmpty(settings.getServerAddr())) {
            throw new ModuleStartException("Nacos serverAddr cannot be null or empty.");
        }
        if (settings.getPort() <= 0) {
            throw new ModuleStartException("Nacos port must be positive integer.");
        }
        if (Strings.isNullOrEmpty(settings.getGroup())) {
            throw new ModuleStartException("Nacos group cannot be null or empty.");
        }

        try {
            return new NacosConfigWatcherRegister(settings);
        } catch (NacosException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }
}
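  • NacosConfigurationProvider extends AbstractConfigurationProvider. Its name method returns "nacos". Its initConfigReader method first checks that serverAddr is non-empty, port is positive, and group is non-empty, then returns a NacosConfigWatcherRegister, wrapping any NacosException in a ModuleStartException.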
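
The three startup checks can be tried in isolation with the minimal sketch below. It is not SkyWalking code: the class name NacosSettingsCheck and the method firstProblem are hypothetical, and returning the message as a string stands in for throwing ModuleStartException. Only the conditions and the messages are taken from the listing above.

```java
// Stand-alone sketch of the startup checks; these are not the SkyWalking classes.
final class NacosSettingsCheck {
    private NacosSettingsCheck() {
    }

    // Returns null when all checks pass, otherwise the message of the first failed check.
    // The real provider throws ModuleStartException with the same messages instead.
    static String firstProblem(String serverAddr, int port, String group) {
        if (serverAddr == null || serverAddr.isEmpty()) {
            return "Nacos serverAddr cannot be null or empty.";
        }
        if (port <= 0) {
            return "Nacos port must be positive integer.";
        }
        if (group == null || group.isEmpty()) {
            return "Nacos group cannot be null or empty.";
        }
        return null;
    }

    public static void main(String[] args) {
        // The address and group below are made-up example values.
        System.out.println(firstProblem("127.0.0.1", 8848, "skywalking"));
        System.out.println(firstProblem("127.0.0.1", 8848, null));
    }
}
```

The checks run in a fixed order, so when several settings are missing only the first problem is reported.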

NacosServerSettings

skywalking-6.6.0/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosServerSettings.java

@Getter
@Setter
@ToString
public class NacosServerSettings extends ModuleConfig {
    private String clusterName = "default";
    private String namespace = "";
    private String serverAddr;
    private int port = 8848;
    private String group;
    private int period = 60;
}
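  • NacosServerSettings defines the properties clusterName, namespace, serverAddr, port, group, and period. clusterName, namespace, port, and period default to "default", "", 8848, and 60 respectively; serverAddr and group have no default and must be configured.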
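
The NacosConfigWatcherRegister listing in the next section passes these settings to the Nacos client as a Properties object, joining serverAddr and port into a single host:port string. The sketch below reproduces only that step with the standard library. The class name NacosClientProps is hypothetical, and the literal keys "serverAddr" and "namespace" are assumed to match PropertyKeyConst.SERVER_ADDR and PropertyKeyConst.NAMESPACE.

```java
import java.util.Properties;

// Stand-alone sketch; the real code uses the PropertyKeyConst constants from the Nacos client.
final class NacosClientProps {
    // Assumed literal values of PropertyKeyConst.SERVER_ADDR and PropertyKeyConst.NAMESPACE.
    static final String SERVER_ADDR = "serverAddr";
    static final String NAMESPACE = "namespace";

    private NacosClientProps() {
    }

    // Builds the client properties the same way the register's constructor does.
    static Properties build(String serverAddr, int port, String namespace) {
        Properties properties = new Properties();
        properties.put(SERVER_ADDR, serverAddr + ":" + port);
        properties.put(NAMESPACE, namespace);
        return properties;
    }

    public static void main(String[] args) {
        // 127.0.0.1 is a made-up address; 8848 and "" are the defaults from NacosServerSettings.
        System.out.println(build("127.0.0.1", 8848, "").getProperty(SERVER_ADDR));
    }
}
```

Because Properties inherits from Hashtable, put rejects null values, so the "" default for namespace matters: a null namespace would raise a NullPointerException here.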

NacosConfigWatcherRegister

skywalking-6.6.0/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java

public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
    private static final Logger LOGGER = LoggerFactory.getLogger(NacosConfigWatcherRegister.class);

    private final NacosServerSettings settings;
    private final ConfigService configService;
    private final Map<String, Optional<String>> configItemKeyedByName;
    private final Map<String, Listener> listenersByKey;

    public NacosConfigWatcherRegister(NacosServerSettings settings) throws NacosException {
        super(settings.getPeriod());

        this.settings = settings;
        this.configItemKeyedByName = new ConcurrentHashMap<>();
        this.listenersByKey = new ConcurrentHashMap<>();

        final int port = this.settings.getPort();
        final String serverAddr = this.settings.getServerAddr();

        final Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr + ":" + port);
        properties.put(PropertyKeyConst.NAMESPACE, settings.getNamespace());
        this.configService = NacosFactory.createConfigService(properties);
    }

    @Override
    public ConfigTable readConfig(Set<String> keys) {
        removeUninterestedKeys(keys);
        registerKeyListeners(keys);

        final ConfigTable table = new ConfigTable();

        for (Map.Entry<String, Optional<String>> entry : configItemKeyedByName.entrySet()) {
            final String key = entry.getKey();
            final Optional<String> value = entry.getValue();

            if (value.isPresent()) {
                table.add(new ConfigTable.ConfigItem(key, value.get()));
            } else {
                table.add(new ConfigTable.ConfigItem(key, null));
            }
        }

        return table;
    }

    private void registerKeyListeners(final Set<String> keys) {
        final String group = settings.getGroup();

        for (final String dataId : keys) {
            if (listenersByKey.containsKey(dataId)) {
                continue;
            }
            try {
                listenersByKey.putIfAbsent(dataId, new Listener() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }

                    @Override
                    public void receiveConfigInfo(String configInfo) {
                        onDataIdValueChanged(dataId, configInfo);
                    }
                });
                configService.addListener(dataId, group, listenersByKey.get(dataId));

                // the key is newly added, read the config for the first time
                final String config = configService.getConfig(dataId, group, 1000);
                onDataIdValueChanged(dataId, config);
            } catch (NacosException e) {
                LOGGER.warn("Failed to register Nacos listener for dataId: {}", dataId);
            }
        }
    }

    private void removeUninterestedKeys(final Set<String> interestedKeys) {
        final String group = settings.getGroup();

        final Set<String> uninterestedKeys = new HashSet<>(listenersByKey.keySet());
        uninterestedKeys.removeAll(interestedKeys);

        uninterestedKeys.forEach(k -> {
            final Listener listener = listenersByKey.remove(k);
            if (listener != null) {
                configService.removeListener(k, group, listener);
            }
        });
    }

    void onDataIdValueChanged(String dataId, String configInfo) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Nacos config changed: {}: {}", dataId, configInfo);
        }

        configItemKeyedByName.put(dataId, Optional.ofNullable(configInfo));
    }
}
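  • NacosConfigWatcherRegister extends ConfigWatcherRegister. Its constructor creates a ConfigService through NacosFactory.createConfigService(properties). Its readConfig method first calls removeUninterestedKeys to remove the listeners of keys that are no longer watched, then calls registerKeyListeners to add a listener for each new key and read its initial value. Every value received goes through onDataIdValueChanged, which updates configItemKeyedByName. Finally, readConfig iterates over configItemKeyedByName.entrySet() and loads the entries into a ConfigTable.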
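
The flow of readConfig can be followed without a Nacos server using the simplified sketch below. All names in it are hypothetical: a Function stands in for configService.getConfig, a set of key names stands in for the registered Listener objects, and a plain Map stands in for ConfigTable. It mirrors the quoted code in one detail worth noticing: removing a listener does not remove the cached value.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Stand-alone sketch of the readConfig flow; no Nacos classes are involved.
final class KeyReconcileSketch {
    // Keys that currently have a (simulated) listener registered.
    private final Set<String> listened = ConcurrentHashMap.newKeySet();
    // Latest value per key; Optional.empty() means the store returned null.
    private final Map<String, Optional<String>> cache = new ConcurrentHashMap<>();
    // Hypothetical stand-in for configService.getConfig(dataId, group, timeout).
    private final Function<String, String> store;

    KeyReconcileSketch(Function<String, String> store) {
        this.store = store;
    }

    Map<String, String> readConfig(Set<String> keys) {
        // Step 1: drop the listeners of keys that are no longer of interest.
        listened.retainAll(keys);
        // Step 2: register newly seen keys and read each one for the first time.
        for (String key : keys) {
            if (listened.add(key)) {
                onChanged(key, store.apply(key));
            }
        }
        // Step 3: copy the cache into the result; absent values become null.
        Map<String, String> table = new TreeMap<>();
        cache.forEach((k, v) -> table.put(k, v.orElse(null)));
        return table;
    }

    // Simulates the listener callback that delivers a changed value.
    void onChanged(String key, String value) {
        cache.put(key, Optional.ofNullable(value));
    }

    public static void main(String[] args) {
        KeyReconcileSketch sketch = new KeyReconcileSketch(key -> "a".equals(key) ? "1" : null);
        System.out.println(sketch.readConfig(Set.of("a", "b")));
    }
}
```

Wrapping values in Optional is what allows a ConcurrentHashMap to record "this key has no value", since that map type rejects null values.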

Summary

NacosConfigurationProvider extends AbstractConfigurationProvider. Its name method returns "nacos", and its initConfigReader method returns a NacosConfigWatcherRegister.

doc

  • NacosConfigurationProvider
Original article: https://segmentfault.com/a/1190000022120545