本文主要研究一下skywalking的metric-exporter
skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto
// gRPC contract used by the OAP metric exporter to push metric values to an external receiver.
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";

service MetricExportService {
    // Client-streaming call: the exporter sends a stream of metric values and receives
    // a single (empty) ExportResponse.
    rpc export (stream ExportMetricValue) returns (ExportResponse) {
    }

    // Unary call returning the metric names the receiver subscribes to.
    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
    }
}

// One exported metric value.
message ExportMetricValue {
    string metricName = 1;
    string entityName = 2;
    string entityId = 3;
    // Tells which of longValue / doubleValue carries the value.
    ValueType type = 4;
    int64 timeBucket = 5;
    int64 longValue = 6;
    double doubleValue = 7;
}

// Reply of the subscription call.
message SubscriptionsResp {
    repeated string metricNames = 1;
}

enum ValueType {
    LONG = 0;
    DOUBLE = 1;
}

// Empty request of the subscription call.
message SubscriptionReq {
}

// Empty reply of the export call.
message ExportResponse {
}
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
/**
 * Configuration bean of the gRPC metric exporter. Getters and setters are generated by Lombok.
 */
@Getter
@Setter
public class GRPCExporterSetting extends ModuleConfig {
    /** Host of the remote receiver the exporter connects to. */
    private String targetHost;

    /** Port of the remote receiver the exporter connects to. */
    private int targetPort;

    /** Second constructor argument of the exporter's DataCarrier buffer; defaults to 20000. */
    private int bufferChannelSize = 20000;

    /** First constructor argument of the exporter's DataCarrier buffer; defaults to 2. */
    private int bufferChannelNum = 2;
}
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
public class GRPCExporterProvider extends ModuleProvider { private GRPCExporterSetting setting; private GRPCExporter exporter; @Override public String name() { return "grpc"; } @Override public Class<? extends ModuleDefine> module() { return ExporterModule.class; } @Override public ModuleConfig createConfigBeanIfAbsent() { setting = new GRPCExporterSetting(); return setting; } @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { exporter = new GRPCExporter(setting); this.registerServiceImplementation(MetricValuesExportService.class, exporter); } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider(); exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class)); exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class)); exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class)); exporter.initSubscriptionList(); } @Override public String[] requiredModules() { return new String[] {CoreModule.NAME}; } }
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java
/**
 * Resolves the display name of the entity a metric belongs to, using the inventory
 * caches injected through the Lombok-generated setters.
 */
@Setter
public class MetricFormatter {
    private ServiceInventoryCache serviceInventoryCache;
    private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private EndpointInventoryCache endpointInventoryCache;

    /**
     * Looks up the entity name matching the scope of the given metric metadata.
     *
     * @param meta metadata carrying the scope and the entity id
     * @return the name found in the service, service instance or endpoint cache;
     *         an empty string for the {@code ALL} scope; {@code null} for any other scope
     */
    protected String getEntityName(MetricsMetaInfo meta) {
        final int scope = meta.getScope();

        if (DefaultScopeDefine.inServiceCatalog(scope)) {
            return serviceInventoryCache.get(inventoryIdOf(meta)).getName();
        }
        if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
            return serviceInstanceInventoryCache.get(inventoryIdOf(meta)).getName();
        }
        if (DefaultScopeDefine.inEndpointCatalog(scope)) {
            return endpointInventoryCache.get(inventoryIdOf(meta)).getName();
        }
        return scope == DefaultScopeDefine.ALL ? "" : null;
    }

    /**
     * Parses the textual id of the metadata into the int key used by the caches.
     * Only called for the catalog scopes, so other scopes never have their id parsed.
     */
    private static int inventoryIdOf(MetricsMetaInfo meta) {
        return Integer.parseInt(meta.getId());
    }
}
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
/**
 * Service contract for exporting metric values out of the OAP server.
 */
public interface MetricValuesExportService extends Service {
    /**
     * Exports one event in sync mode, so the performance of this method affects the
     * persistence result. Implementing it in queue mode is highly recommended.
     *
     * @param event the event to export; its value is only accurate while this method
     *              is being invoked, so do not cache it
     */
    void export(ExportEvent event);
}
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> { private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class); private GRPCExporterSetting setting; private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub; private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub; private final DataCarrier exportBuffer; private final Set<String> subscriptionSet; public GRPCExporter(GRPCExporterSetting setting) { this.setting = setting; GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort()); client.connect(); ManagedChannel channel = client.getChannel(); exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel); blockingStub = MetricExportServiceGrpc.newBlockingStub(channel); exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize()); exportBuffer.consume(this, 1, 200); subscriptionSet = new HashSet<>(); } @Override public void export(ExportEvent event) { if (ExportEvent.EventType.TOTAL == event.getType()) { Metrics metrics = event.getMetrics(); if (metrics instanceof WithMetadata) { MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta(); if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { exportBuffer.produce(new ExportData(meta, metrics)); } } } } public void initSubscriptionList() { SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build()); subscription.getMetricNamesList().forEach(subscriptionSet::add); logger.debug("Get exporter subscription list, {}", subscriptionSet); } @Override public void init() { } @Override public void consume(List<ExportData> data) { if (data.size() == 0) { return; } ExportStatus status = new ExportStatus(); StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, 
TimeUnit.SECONDS).export( new StreamObserver<ExportResponse>() { @Override public void onNext(ExportResponse response) { } @Override public void onError(Throwable throwable) { status.done(); } @Override public void onCompleted() { status.done(); } } ); AtomicInteger exportNum = new AtomicInteger(); data.forEach(row -> { ExportMetricValue.Builder builder = ExportMetricValue.newBuilder(); Metrics metrics = row.getMetrics(); if (metrics instanceof LongValueHolder) { long value = ((LongValueHolder)metrics).getValue(); builder.setLongValue(value); builder.setType(ValueType.LONG); } else if (metrics instanceof IntValueHolder) { long value = ((IntValueHolder)metrics).getValue(); builder.setLongValue(value); builder.setType(ValueType.LONG); } else if (metrics instanceof DoubleValueHolder) { double value = ((DoubleValueHolder)metrics).getValue(); builder.setDoubleValue(value); builder.setType(ValueType.DOUBLE); } else { return; } MetricsMetaInfo meta = row.getMeta(); builder.setMetricName(meta.getMetricsName()); String entityName = getEntityName(meta); if (entityName == null) { return; } builder.setEntityName(entityName); builder.setEntityId(meta.getId()); builder.setTimeBucket(metrics.getTimeBucket()); streamObserver.onNext(builder.build()); exportNum.getAndIncrement(); }); streamObserver.onCompleted(); long sleepTime = 0; long cycle = 100L; /** * For memory safe of oap, we must wait for the peer confirmation. 
*/ while (!status.isDone()) { try { sleepTime += cycle; Thread.sleep(cycle); } catch (InterruptedException e) { } if (sleepTime > 2000L) { logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime); cycle = 2000L; } } logger.debug("Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime); } @Override public void onError(List<ExportData> data, Throwable t) { logger.error(t.getMessage(), t); } @Override public void onExit() { } @Getter(AccessLevel.PRIVATE) public class ExportData { private MetricsMetaInfo meta; private Metrics metrics; public ExportData(MetricsMetaInfo meta, Metrics metrics) { this.meta = meta; this.metrics = metrics; } } private class ExportStatus { private boolean done = false; private void done() { done = true; } public boolean isDone() { return done; } } }
metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法;GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()