This article takes a look at SkyWalking's GRPCStreamServiceStatus.
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java
public class GRPCStreamServiceStatus {
    private static final ILog logger = LogManager.getLogger(GRPCStreamServiceStatus.class);
    private volatile boolean status;

    public GRPCStreamServiceStatus(boolean status) {
        this.status = status;
    }

    public boolean isStatus() {
        return status;
    }

    public void finished() {
        this.status = true;
    }

    /**
     * @param maxTimeout max wait time, milliseconds.
     */
    public boolean wait4Finish(long maxTimeout) {
        long time = 0;
        while (!status) {
            if (time > maxTimeout) {
                break;
            }
            try2Sleep(5);
            time += 5;
        }
        return status;
    }

    /**
     * Wait until success status reported.
     */
    public void wait4Finish() {
        long recheckCycle = 5;
        long hasWaited = 0L;
        long maxCycle = 30 * 1000L;// 30 seconds max.
        while (!status) {
            try2Sleep(recheckCycle);
            hasWaited += recheckCycle;

            if (recheckCycle >= maxCycle) {
                logger.warn("Collector traceSegment service doesn't response in {} seconds.", hasWaited / 1000);
            } else {
                recheckCycle = recheckCycle * 2 > maxCycle ? maxCycle : recheckCycle * 2;
            }
        }
    }

    /**
     * Try to sleep, and ignore the {@link InterruptedException}
     *
     * @param millis the length of time to sleep in milliseconds
     */
    private void try2Sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {

        }
    }
}
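Once recheckCycle has reached maxCycle, the no-argument wait4Finish logs the warning "Collector traceSegment service doesn't response in {} seconds." after each sleep, passing hasWaited / 1000 as the argument.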
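To make the polling cadence of the no-argument wait4Finish() concrete, the sketch below replays only its interval arithmetic, without sleeping and without any logger. The names BackoffSchedule, intervalsUntilFirstWarn and totalMillis are hypothetical helpers for illustration and are not part of SkyWalking; the constants 5 and 30 * 1000L are taken from the method above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of SkyWalking: replays the interval
// arithmetic of wait4Finish() without actually sleeping.
class BackoffSchedule {
    static final long MAX_CYCLE = 30 * 1000L; // same 30-second cap as the original

    // Returns the sleep length (ms) of every polling round up to and
    // including the first round in which the warn branch would be taken,
    // assuming status never becomes true.
    static List<Long> intervalsUntilFirstWarn() {
        List<Long> intervals = new ArrayList<>();
        long recheckCycle = 5;
        while (true) {
            intervals.add(recheckCycle); // stands in for try2Sleep(recheckCycle)
            if (recheckCycle >= MAX_CYCLE) {
                return intervals; // the original logs its warning at this point
            }
            // Same doubling-with-cap as the ternary in the original method.
            recheckCycle = Math.min(recheckCycle * 2, MAX_CYCLE);
        }
    }

    // Sums the intervals, i.e. the value hasWaited would hold.
    static long totalMillis(List<Long> intervals) {
        long sum = 0;
        for (long ms : intervals) {
            sum += ms;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> intervals = intervalsUntilFirstWarn();
        System.out.println(intervals);
        System.out.println(totalMillis(intervals) / 1000 + " seconds");
    }
}
```

Under these assumptions the interval doubles 13 times (5, 10, 20, ... 20480 ms) before it is capped at 30000 ms, so the first warning appears in the 14th round, after 70955 ms of accumulated sleep, and reports 70 seconds because of the integer division.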
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {

    //......

    @Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            try {
                for (TraceSegment segment : data) {
                    UpstreamSegment upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }

            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
    }

    private void printUplinkStatus() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - lastLogTime > 30 * 1000) {
            lastLogTime = currentTimeMillis;
            if (segmentUplinkedCounter > 0) {
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);
                segmentUplinkedCounter = 0;
            }

            if (segmentAbandonedCounter > 0) {
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);
                segmentAbandonedCounter = 0;
            }
        }
    }

    //......
}
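GRPCStreamServiceStatus provides a finished method that sets status to true. It also provides two wait4Finish methods. The no-argument version blocks until status becomes true, doubling its polling interval from 5 ms up to a 30-second cap and logging a warning after each sleep once recheckCycle is greater than or equal to maxCycle. The overload taking maxTimeout polls every 5 ms and returns the current status once the accumulated wait exceeds maxTimeout. TraceSegmentServiceClient.consume creates the status as false, calls finished() from both onError and onCompleted of the response observer, and calls wait4Finish() after it has sent all segments and completed the upstream.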
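The handshake summarized above can be sketched in isolation, with a plain thread playing the role of the gRPC callback. This is a simplified stand-in, not the SkyWalking class: the names StreamStatusSketch and runOnce are hypothetical, only the bounded wait4Finish(long) is reproduced, and unlike the original try2Sleep, which swallows InterruptedException, this version restores the interrupt flag and stops waiting.

```java
// Simplified stand-in for GRPCStreamServiceStatus (hypothetical name),
// keeping only the volatile flag and the bounded wait.
class StreamStatusSketch {
    // volatile so the write from the callback thread is visible to the waiter
    private volatile boolean status;

    void finished() {
        status = true;
    }

    // Polls every 5 ms until the flag is set or more than maxTimeout ms
    // of sleep have accumulated; returns the flag's final value.
    boolean wait4Finish(long maxTimeout) {
        long time = 0;
        while (!status) {
            if (time > maxTimeout) {
                break;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Deviation from the original, which ignores the interrupt.
                Thread.currentThread().interrupt();
                break;
            }
            time += 5;
        }
        return status;
    }

    // Starts a thread that stands in for onCompleted()/onError() and calls
    // finished() after delayMillis, then waits on the flag from the caller.
    static boolean runOnce(long delayMillis, long maxTimeout) {
        StreamStatusSketch status = new StreamStatusSketch();
        Thread callback = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            status.finished();
        });
        callback.setDaemon(true); // do not keep the JVM alive for the demo thread
        callback.start();
        return status.wait4Finish(maxTimeout);
    }

    public static void main(String[] args) {
        // Callback fires well within the timeout.
        System.out.println(runOnce(20, 2000));
        // Callback fires long after the timeout has been exceeded.
        System.out.println(runOnce(2000, 50));
    }
}
```

The flag is written by one thread and read by another, which is why the original declares it volatile. The trade-off of this polling design is that completion is noticed only at the next wake-up, so with the no-argument wait4Finish() the latency grows with the current recheckCycle.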