本文主要研究一下 SkyWalking 的 DatabaseSlowStatement。
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@ScopeDeclaration(id = DATABASE_SLOW_STATEMENT, name = "DatabaseSlowStatement") public class DatabaseSlowStatement extends Source { @Getter @Setter private String id; @Getter @Setter private int databaseServiceId; @Getter @Setter private String statement; @Getter @Setter private long latency; @Getter @Setter private String traceId; @Override public int scope() { return DefaultScopeDefine.DATABASE_SLOW_STATEMENT; } @Override public String getEntityId() { return Const.EMPTY_STRING; } }
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListener, GlobalTraceIdsListener { private static final Logger logger = LoggerFactory.getLogger(MultiScopesSpanListener.class); private final SourceReceiver sourceReceiver; private final ServiceInstanceInventoryCache instanceInventoryCache; private final ServiceInventoryCache serviceInventoryCache; private final EndpointInventoryCache endpointInventoryCache; private final List<SourceBuilder> entrySourceBuilders; private final List<SourceBuilder> exitSourceBuilders; private final List<DatabaseSlowStatement> slowDatabaseAccesses; private final TraceServiceModuleConfig config; private final NetworkAddressInventoryCache networkAddressInventoryCache; private SpanDecorator entrySpanDecorator; private long minuteTimeBucket; private String traceId; private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) { this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); this.entrySourceBuilders = new LinkedList<>(); this.exitSourceBuilders = new LinkedList<>(); this.slowDatabaseAccesses = new ArrayList<>(10); this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class); this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class); this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class); this.config = config; this.traceId = null; } @Override public boolean containsPoint(Point point) { return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.TraceIds.equals(point); } @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { this.minuteTimeBucket = 
segmentCoreInfo.getMinuteTimeBucket(); if (spanDecorator.getRefsCount() > 0) { for (int i = 0; i < spanDecorator.getRefsCount(); i++) { ReferenceDecorator reference = spanDecorator.getRefs(i); SourceBuilder sourceBuilder = new SourceBuilder(); if (reference.getParentEndpointId() == Const.INEXISTENCE_ENDPOINT_ID) { sourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID); } else { sourceBuilder.setSourceEndpointId(reference.getParentEndpointId()); } final int networkAddressId = reference.getNetworkAddressId(); final int serviceIdByPeerId = serviceInventoryCache.getServiceId(networkAddressId); final String address = networkAddressInventoryCache.get(networkAddressId).getName(); if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ) || config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) { int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(serviceIdByPeerId, networkAddressId); sourceBuilder.setSourceServiceInstanceId(instanceIdByPeerId); sourceBuilder.setSourceServiceId(serviceIdByPeerId); } else { sourceBuilder.setSourceServiceInstanceId(reference.getParentServiceInstanceId()); sourceBuilder.setSourceServiceId(instanceInventoryCache.get(reference.getParentServiceInstanceId()).getServiceId()); } sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId()); sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId()); sourceBuilder.setDestServiceId(segmentCoreInfo.getServiceId()); sourceBuilder.setDetectPoint(DetectPoint.SERVER); sourceBuilder.setComponentId(spanDecorator.getComponentId()); setPublicAttrs(sourceBuilder, spanDecorator); entrySourceBuilders.add(sourceBuilder); } } else { SourceBuilder sourceBuilder = new SourceBuilder(); sourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID); sourceBuilder.setSourceServiceInstanceId(Const.USER_INSTANCE_ID); sourceBuilder.setSourceServiceId(Const.USER_SERVICE_ID); sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId()); 
sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId()); sourceBuilder.setDestServiceId(segmentCoreInfo.getServiceId()); sourceBuilder.setDetectPoint(DetectPoint.SERVER); sourceBuilder.setComponentId(spanDecorator.getComponentId()); setPublicAttrs(sourceBuilder, spanDecorator); entrySourceBuilders.add(sourceBuilder); } this.entrySpanDecorator = spanDecorator; } @Override public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { if (this.minuteTimeBucket == 0) { this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket(); } SourceBuilder sourceBuilder = new SourceBuilder(); int peerId = spanDecorator.getPeerId(); if (peerId == 0) { return; } int destServiceId = serviceInventoryCache.getServiceId(peerId); int mappingServiceId = serviceInventoryCache.get(destServiceId).getMappingServiceId(); int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId); int mappingServiceInstanceId = instanceInventoryCache.get(destInstanceId).getMappingServiceInstanceId(); sourceBuilder.setSourceServiceInstanceId(segmentCoreInfo.getServiceInstanceId()); sourceBuilder.setSourceServiceId(segmentCoreInfo.getServiceId()); if (Const.NONE == mappingServiceId) { sourceBuilder.setDestServiceId(destServiceId); } else { sourceBuilder.setDestServiceId(mappingServiceId); } if (Const.NONE == mappingServiceInstanceId) { sourceBuilder.setDestServiceInstanceId(destInstanceId); } else { sourceBuilder.setDestServiceInstanceId(mappingServiceInstanceId); } sourceBuilder.setDetectPoint(DetectPoint.CLIENT); sourceBuilder.setComponentId(spanDecorator.getComponentId()); setPublicAttrs(sourceBuilder, spanDecorator); exitSourceBuilders.add(sourceBuilder); if (sourceBuilder.getType().equals(RequestType.DATABASE)) { boolean isSlowDBAccess = false; DatabaseSlowStatement statement = new DatabaseSlowStatement(); statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId()); 
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId()); statement.setLatency(sourceBuilder.getLatency()); statement.setTimeBucket(TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime())); statement.setTraceId(traceId); for (KeyStringValuePair tag : spanDecorator.getAllTags()) { if (SpanTags.DB_STATEMENT.equals(tag.getKey())) { String sqlStatement = tag.getValue(); if (StringUtil.isEmpty(sqlStatement)) { statement.setStatement("[No statement]/" + sourceBuilder.getDestEndpointName()); } else if (sqlStatement.length() > config.getMaxSlowSQLLength()) { statement.setStatement(sqlStatement.substring(0, config.getMaxSlowSQLLength())); } else { statement.setStatement(sqlStatement); } } else if (SpanTags.DB_TYPE.equals(tag.getKey())) { String dbType = tag.getValue(); DBLatencyThresholdsAndWatcher thresholds = config.getDbLatencyThresholdsAndWatcher(); int threshold = thresholds.getThreshold(dbType); if (sourceBuilder.getLatency() > threshold) { isSlowDBAccess = true; } } } if (isSlowDBAccess) { slowDatabaseAccesses.add(statement); } } } //...... @Override public void build() { entrySourceBuilders.forEach(entrySourceBuilder -> { entrySourceBuilder.setTimeBucket(minuteTimeBucket); sourceReceiver.receive(entrySourceBuilder.toAll()); sourceReceiver.receive(entrySourceBuilder.toService()); sourceReceiver.receive(entrySourceBuilder.toServiceInstance()); sourceReceiver.receive(entrySourceBuilder.toEndpoint()); sourceReceiver.receive(entrySourceBuilder.toServiceRelation()); sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation()); EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation(); /** * Parent endpoint could be none, because in SkyWalking Cross Process Propagation Headers Protocol v2, * endpoint in ref could be empty, based on that, endpoint relation maybe can't be established. * So, I am making this source as optional. 
* * Also, since 6.6.0, source endpoint could be none, if this trace begins by an internal task(local span or exit span), such as Timer, * rather than, normally begin as an entry span, like a RPC server side. */ if (endpointRelation != null) { sourceReceiver.receive(endpointRelation); } }); exitSourceBuilders.forEach(exitSourceBuilder -> { if (nonNull(entrySpanDecorator)) { exitSourceBuilder.setSourceEndpointId(entrySpanDecorator.getOperationNameId()); } else { exitSourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID); } exitSourceBuilder.setSourceEndpointName(endpointInventoryCache.get(exitSourceBuilder.getSourceEndpointId()).getName()); exitSourceBuilder.setTimeBucket(minuteTimeBucket); sourceReceiver.receive(exitSourceBuilder.toServiceRelation()); sourceReceiver.receive(exitSourceBuilder.toServiceInstanceRelation()); if (RequestType.DATABASE.equals(exitSourceBuilder.getType())) { sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess()); } }); slowDatabaseAccesses.forEach(sourceReceiver::receive); } //...... }
MultiScopesSpanListener 的 build 方法通过 slowDatabaseAccesses.forEach(sourceReceiver::receive) 把收集到的 DatabaseSlowStatement 交给 SourceReceiver 接收。
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
/**
 * {@link SourceReceiver} implementation that delegates every call to a {@link DispatcherManager}
 * it creates and owns.
 */
public class SourceReceiverImpl implements SourceReceiver {

    /** Manager that received sources are forwarded to; exposed through a Lombok getter. */
    @Getter
    private final DispatcherManager dispatcherManager;

    /** Creates the receiver together with a fresh {@link DispatcherManager}. */
    public SourceReceiverImpl() {
        this.dispatcherManager = new DispatcherManager();
    }

    /**
     * Forwards the given source to the dispatcher manager.
     *
     * @param source the source to forward
     */
    @Override
    public void receive(Source source) {
        dispatcherManager.forward(source);
    }

    /**
     * Delegates to {@link DispatcherManager#scan()}.
     *
     * @throws IOException            propagated from the dispatcher manager scan
     * @throws InstantiationException propagated from the dispatcher manager scan
     * @throws IllegalAccessException propagated from the dispatcher manager scan
     */
    public void scan() throws IOException, InstantiationException, IllegalAccessException {
        dispatcherManager.scan();
    }
}
DatabaseSlowStatement 继承了 Source,它定义了 id、databaseServiceId、statement、latency、traceId 属性,其 scope 方法返回 DefaultScopeDefine.DATABASE_SLOW_STATEMENT;MultiScopesSpanListener 实现了 EntrySpanListener、ExitSpanListener、GlobalTraceIdsListener 接口,其 parseExit 方法在 sourceBuilder.getType() 为 RequestType.DATABASE 的时候会创建 DatabaseSlowStatement,在 tag.getKey() 为 SpanTags.DB_TYPE 时,通过 config.getDbLatencyThresholdsAndWatcher() 获取 DBLatencyThresholdsAndWatcher,然后在 latency 大于 threshold 时更新 isSlowDBAccess 为 true,最后仅当 isSlowDBAccess 为 true 时才将 DatabaseSlowStatement 添加到 slowDatabaseAccesses 中;其 build 方法通过 slowDatabaseAccesses.forEach(sourceReceiver::receive) 将这些 DatabaseSlowStatement 交给 SourceReceiver 接收,SourceReceiverImpl 的 receive 方法再通过 dispatcherManager.forward(source) 进行转发。