本文主要研究一下skywalking的jvm-receiver-plugin
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/JVMModuleProvider.java
public class JVMModuleProvider extends ModuleProvider { @Override public String name() { return "default"; } @Override public Class<? extends ModuleDefine> module() { return JVMModule.class; } @Override public ModuleConfig createConfigBeanIfAbsent() { return null; } @Override public void prepare() { } @Override public void start() { GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class); grpcHandlerRegister.addHandler(new JVMMetricsServiceHandler(getManager())); grpcHandlerRegister.addHandler(new JVMMetricReportServiceHandler(getManager())); } @Override public void notifyAfterCompleted() { } @Override public String[] requiredModules() { return new String[] {CoreModule.NAME, SharingServerModule.NAME}; } }
skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent/JVMMetricsService.proto
syntax = "proto3"; option java_multiple_files = true; option java_package = "org.apache.skywalking.apm.network.language.agent"; option csharp_namespace = "SkyWalking.NetworkProtocol"; import "language-agent/Downstream.proto"; import "common/JVM.proto"; service JVMMetricsService { rpc collect (JVMMetrics) returns (Downstream) { } } message JVMMetrics { repeated JVMMetric metrics = 1; int32 applicationInstanceId = 2; }
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricsServiceHandler.java
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler { private static final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class); private final JVMSourceDispatcher jvmSourceDispatcher; public JVMMetricsServiceHandler(ModuleManager moduleManager) { this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager); } @Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) { int serviceInstanceId = request.getApplicationInstanceId(); if (logger.isDebugEnabled()) { logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId); } request.getMetricsList().forEach(metrics -> { long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime()); jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics); }); responseObserver.onNext(Downstream.newBuilder().build()); responseObserver.onCompleted(); } }
skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent-v2/JVMMetric.proto
syntax = "proto3"; option java_multiple_files = true; option java_package = "org.apache.skywalking.apm.network.language.agent.v2"; option csharp_namespace = "SkyWalking.NetworkProtocol"; import "common/common.proto"; import "common/JVM.proto"; service JVMMetricReportService { rpc collect (JVMMetricCollection) returns (Commands) { } } message JVMMetricCollection { repeated JVMMetric metrics = 1; int32 serviceInstanceId = 2; }
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricReportServiceHandler.java
public class JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase implements GRPCHandler { private static final Logger logger = LoggerFactory.getLogger(JVMMetricReportServiceHandler.class); private final JVMSourceDispatcher jvmSourceDispatcher; public JVMMetricReportServiceHandler(ModuleManager moduleManager) { this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager); } @Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) { int serviceInstanceId = request.getServiceInstanceId(); if (logger.isDebugEnabled()) { logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId); } request.getMetricsList().forEach(metrics -> { long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime()); jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics); }); responseObserver.onNext(Commands.newBuilder().build()); responseObserver.onCompleted(); } }
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMSourceDispatcher.java
public class JVMSourceDispatcher { private static final Logger logger = LoggerFactory.getLogger(JVMSourceDispatcher.class); private final SourceReceiver sourceReceiver; private final ServiceInstanceInventoryCache instanceInventoryCache; public JVMSourceDispatcher(ModuleManager moduleManager) { this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class); } void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) { ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId); int serviceId; if (Objects.nonNull(serviceInstanceInventory)) { serviceId = serviceInstanceInventory.getServiceId(); } else { logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId); return; } this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu()); this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList()); this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList()); this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList()); } private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) { ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU(); serviceInstanceJVMCPU.setId(serviceInstanceId); serviceInstanceJVMCPU.setName(Const.EMPTY_STRING); serviceInstanceJVMCPU.setServiceId(serviceId); serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING); serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent()); serviceInstanceJVMCPU.setTimeBucket(timeBucket); sourceReceiver.receive(serviceInstanceJVMCPU); } private void sendToGCMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<GC> gcs) { gcs.forEach(gc -> { ServiceInstanceJVMGC serviceInstanceJVMGC = new ServiceInstanceJVMGC(); serviceInstanceJVMGC.setId(serviceInstanceId); serviceInstanceJVMGC.setName(Const.EMPTY_STRING); serviceInstanceJVMGC.setServiceId(serviceId); serviceInstanceJVMGC.setServiceName(Const.EMPTY_STRING); switch (gc.getPhrase()) { case NEW: serviceInstanceJVMGC.setPhrase(GCPhrase.NEW); break; case OLD: serviceInstanceJVMGC.setPhrase(GCPhrase.OLD); break; } serviceInstanceJVMGC.setTime(gc.getTime()); serviceInstanceJVMGC.setCount(gc.getCount()); serviceInstanceJVMGC.setTimeBucket(timeBucket); sourceReceiver.receive(serviceInstanceJVMGC); }); } private void sendToMemoryMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<Memory> memories) { memories.forEach(memory -> { ServiceInstanceJVMMemory serviceInstanceJVMMemory = new ServiceInstanceJVMMemory(); serviceInstanceJVMMemory.setId(serviceInstanceId); serviceInstanceJVMMemory.setName(Const.EMPTY_STRING); serviceInstanceJVMMemory.setServiceId(serviceId); serviceInstanceJVMMemory.setServiceName(Const.EMPTY_STRING); serviceInstanceJVMMemory.setHeapStatus(memory.getIsHeap()); serviceInstanceJVMMemory.setInit(memory.getInit()); serviceInstanceJVMMemory.setMax(memory.getMax()); serviceInstanceJVMMemory.setUsed(memory.getUsed()); serviceInstanceJVMMemory.setCommitted(memory.getCommitted()); serviceInstanceJVMMemory.setTimeBucket(timeBucket); sourceReceiver.receive(serviceInstanceJVMMemory); }); } private void sendToMemoryPoolMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<MemoryPool> memoryPools) { memoryPools.forEach(memoryPool -> { ServiceInstanceJVMMemoryPool serviceInstanceJVMMemoryPool = new ServiceInstanceJVMMemoryPool(); serviceInstanceJVMMemoryPool.setId(serviceInstanceId); serviceInstanceJVMMemoryPool.setName(Const.EMPTY_STRING); serviceInstanceJVMMemoryPool.setServiceId(serviceId); serviceInstanceJVMMemoryPool.setServiceName(Const.EMPTY_STRING); switch (memoryPool.getType()) { case NEWGEN_USAGE: serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.NEWGEN_USAGE); break; case OLDGEN_USAGE: serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.OLDGEN_USAGE); break; case PERMGEN_USAGE: serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.PERMGEN_USAGE); break; case SURVIVOR_USAGE: serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.SURVIVOR_USAGE); break; case METASPACE_USAGE: serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.METASPACE_USAGE); break; case CODE_CACHE_USAGE: serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.CODE_CACHE_USAGE); break; } serviceInstanceJVMMemoryPool.setInit(memoryPool.getInit()); serviceInstanceJVMMemoryPool.setMax(memoryPool.getMax()); serviceInstanceJVMMemoryPool.setUsed(memoryPool.getUsed()); serviceInstanceJVMMemoryPool.setCommitted(memoryPool.getCommited()); serviceInstanceJVMMemoryPool.setTimeBucket(timeBucket); sourceReceiver.receive(serviceInstanceJVMMemoryPool); }); } }
JVMModuleProvider继承了ModuleProvider,其start方法获取grpcHandlerRegister然后添加JVMMetricsServiceHandler、JVMMetricReportServiceHandler;前者使用的是JVMMetricsService.proto,后者使用的是agent-v2的JVMMetric.proto