转载

Skywalking第八篇——Trace收集(二)

Skywalking第八篇——Trace收集(二)

前情提要

Skywalking第八篇——Trace收集(二)

在上一篇文章中,我们详细介绍了 Trace 的基本概念以及 Skywalking。从这一小节开始,我们将开始介绍 Trace Context 相关的组件。

Skywalking第八篇——Trace收集(二)

Context

Skywalking第八篇——Trace收集(二)

AbstractTracerContext 是 Skywalking 抽象链路上下文的顶层抽象类 ,在 一个线程中 Context 与 TraceSegment 一一对应, 其中定义了链路上下文的的基本行为:

public interface AbstractTracerContext {

// 在跨进程调用之前,当前进程会通过 inject()方法将当前 Context的全部内容注入到指定的 ContextCarrier,然后才能将当前 Context的信息发送出去

void inject(ContextCarrier carrier);

// 在跨进程调用的接收方,会通过 extract()方法将收到的 Context从 ContextCarrier中提取出来

void extract(ContextCarrier carrier);

// 在跨线程调用之前,会通过 capture()将当前线程的 Context进行快照,然后将快照传递给其他线程

ContextSnapshot capture();

// 在跨线程调用的接收方,会从收到的 ContextSnapshot中解析出 Context信息

void continued(ContextSnapshot snapshot);

String getReadableGlobalTraceId(); // 获取关联的TraceId

AbstractSpan createEntrySpan(String operationName); // 创建 EntrySpan

AbstractSpan createLocalSpan(String operationName); // 创建 LocalSpan

// 创建 ExitSpan

AbstractSpan createExitSpan(String operationName, String remotePeer);

AbstractSpan activeSpan(); // 获得当前活跃的 Span

boolean stopSpan(AbstractSpan span); // 停止指定 Span

// 当前 Span已经开启了异步状态,现在等待另一个线程关闭该 Span

AbstractTracerContext awaitFinishAsync();

void asyncStop(AsyncSpan span); // 关闭处于异步状态的 Span

}


TracingContext 是 AbstractTracerContext 实现类,其核心方法如下:

// 负责采样,后面会展开介绍采样逻辑

private SamplingService samplingService;

// 对应的 TraceSegment对象,在 TracingContext的构造方法中会创建该对象

private TraceSegment segment;

// 记录当前活跃的 Span

private LinkedList<AbstractSpan> activeSpanStack;

// Span编号自增序列。创建的 Span 的编号,通过该变量自增生成。

private int spanIdGenerator;

下面开始分析 TracingContext 中的核心方法,首先是 createEntrySpan() 方法,它负责创建 EntrySpan,如果已经存在父 Span,当然没法再创建 EntrySpan咯,重新调用一下 start()方法咯,╮(╯_╰)╭,大致实现如下( 其中省略了 DictionaryManager 的相关代码,后面再详细介绍这货 ):

public AbstractSpan createEntrySpan(final String operationName) {

if (isLimitMechanismWorking()) { // 默认,每个 TraceSegment只能放300个 Span,超过了就每30s打一条 WARN日志

NoopSpan span = new NoopSpan(); // 超过300就放 NoopSpan

return push(span);

}

AbstractSpan entrySpan;

final AbstractSpan parentSpan = peek(); // 获取 activeSpanStack集合中的最后一个 Span,就是父Span

final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();

if (parentSpan != null && parentSpan.isEntry()) {

// 更新 operationId(或operationName)

entrySpan = parentSpan.setOperationId(operationId);

// 重新调用 start()方法,前面提到过,start()方法会重置operationId(以及或operationName)之外的其他字段

return entrySpan.start();

} else {

// 新建 EntrySpan对象

entrySpan = new EntrySpan(spanIdGenerator++, parentSpanId, operationId);

// 调用 start()方法,第一次调用时会设置startTime

entrySpan.start();

// 将新建的 Span添加到 activeSpanStack集合尾部

return push(entrySpan);

}

}

接下来看 createLocalSpan()方法,就是创建个 LocalSpan对象,然后加到  activeSpanStack 集合中,大致实现如下:

public AbstractSpan createLocalSpan(final String operationName) {

// 检测当前 TraceSegment中的 Span个数(和上面的逻辑一样,略)

// 获取父 Span以及父 Span的Id

AbstractSpan parentSpan = peek();

final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();

// 直接创建 LocalSpan对象并调用其 start()方法

AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName);

span.start();

return push(span); // 记入 activeSpanStack集合

}

再往下自然就到了 createExitSpan() 方法,创建或是重用 ExitSpan,实现如下:

public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {

AbstractSpan exitSpan;

AbstractSpan parentSpan = peek(); // 获取父 Span

if (parentSpan != null && parentSpan.isExit()) {

// 父 Span已经是 ExitSpan,就不再新建了

exitSpan = parentSpan;

} else {

// 父 Span不是 ExitSpan,就新建一个 ExitSpan

final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();

exitSpan = new ExitSpan(spanIdGenerator++, parentSpanId, operationId, peerId);

push(exitSpan); // 记入 activeSpanStack集合

}

exitSpan.start();// 调用 start()方法,第一次调用的时候,会设置 startTime

return exitSpan;

}

最后来看 stopSpan() 方法,它负责关闭指定的 Span对象:

public boolean stopSpan(AbstractSpan span) {

AbstractSpan lastSpan = peek(); // 获取当前活跃的 Span对象

if (lastSpan == span) { // 只能关闭当前活跃 Span对象,否则抛异常

if (lastSpan instanceof AbstractTracingSpan) {

AbstractTracingSpan toFinishSpan = (AbstractTracingSpan)lastSpan;

if (toFinishSpan.finish(segment)) { // 尝试关闭 Span

pop(); // 当 Span完全关闭之后,会将其从 activeSpanStack集合中删除

}

} else {

pop();

}

} else {

throw new IllegalStateException("Stopping the unexpected span = " + span);

}

// TraceSegment中全部 Span都关闭(且异步状态的 Span也关闭了),则当前 TraceSegment也会关闭

if (checkFinishConditions()) {

finish();

}

return activeSpanStack.isEmpty();

}

在 TracingContext.finish() 方法中会关闭关联的 TraceSegment 对象,完成采样操作,还会通知相关的 Listener:

private void finish() {

// 关闭当前 TraceSegment对象

TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());

// 当没有父 TraceSegment的时候(表示当前链路不完整),而且该 TraceSegment只有一个 Span的时候

if (!segment.hasRef() && segment.isSingleSpanSegment()) {

if (!samplingService.trySampling()) { // 未开启采样

finishedSegment.setIgnore(true);

}

}

// 通知监听器,目前 Listener就是 TraceSegmentServiceClient

TracingContext.ListenerManager.notifyFinish(finishedSegment);

}

在开始介绍 inject() 方法和 extract() 方法之前,需要先介绍一下他们的参数——  ContextCarrier,它记录了当前 TracingContext的一些基本信息,并实现了  Serializable 接口哦,ContextCarrier 中的字段不再详细介绍,望文生义即可,嘎嘎嘎。

TracingContext.inject() 方法的功能就是将 TracingContext 对象填充到 ContextCarrier 中,看着很长,其实就是一些简单的字段,实现如下:

public void inject(ContextCarrier carrier) {

// 检测当前活跃的 Span是否为 ExitSpan,否则会抛出异常(略)

WithPeerInfo spanWithPeer = (WithPeerInfo)span;

String peer = spanWithPeer.getPeer();

int peerId = spanWithPeer.getPeerId();

// 设置 TraceSegmentId

carrier.setTraceSegmentId(this.segment.getTraceSegmentId());

carrier.setSpanId(span.getSpanId()); // 设置 SpanId

// 设置 service_instance_id

carrier.setParentServiceInstanceId(segment.getApplicationInstanceId());

// 设置 peerHost或是 peerId

if (DictionaryUtil.isNull(peerId)) {

carrier.setPeerHost(peer);

} else {

carrier.setPeerId(peerId);

}

List<TraceSegmentRef> refs = this.segment.getRefs();

int operationId;

String operationName;

int entryApplicationInstanceId;

if (refs != null && refs.size() > 0) {

TraceSegmentRef ref = refs.get(0);

operationId = ref.getEntryEndpointId();

operationName = ref.getEntryEndpointName();

entryApplicationInstanceId = ref.getEntryServiceInstanceId();

} else {

AbstractSpan firstSpan = first();

operationId = firstSpan.getOperationId();

operationName = firstSpan.getOperationName();

entryApplicationInstanceId = this.segment.getApplicationInstanceId();

}

// 设置 entryApplicationInstanceId

carrier.setEntryServiceInstanceId(entryApplicationInstanceId);

// 设置 operationName或 operationId

if (operationId == DictionaryUtil.nullValue()) {

if (!StringUtil.isEmpty(operationName)) {

carrier.setEntryEndpointName(operationName);

}

} else {

carrier.setEntryEndpointId(operationId);

}

// 设置 parentEndpointId或是 parentEndpointName

int parentOperationId = first().getOperationId();

if (parentOperationId == DictionaryUtil.nullValue()) {

carrier.setParentEndpointName(first().getOperationName());

} else {

carrier.setParentEndpointId(parentOperationId);

}

// 设置 primaryDistributedTraceId,只会从 relatedGlobalTraces集合中取第一个

carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());

}

在 TracingContext 的 extract() 方法就是将  ContextCarrier 中各个字段,解出来放到 TraceSegmentRef 的相应字段中:

public void extract(ContextCarrier carrier) {

// 将 ContextCarrier参数中相应字段填充到 TraceSegmentRef中的相应字段

TraceSegmentRef ref = new TraceSegmentRef(carrier);

this.segment.ref(ref); // 将 TraceSegmentRef记录到当前 TraceSegment中

this.segment.relatedGlobalTraces(carrier.getDistributedTraceId());

AbstractSpan span = this.activeSpan();

if (span instanceof EntrySpan) {

span.ref(ref); // 将上面的 TraceSegmentRef对象记录到 Span中

}

}

下面看一下 capture() 方法和 continued() 方法的参数—— ContextSnapshot,它与前面的 ContextCarrier 类似。因为只是跨线程传递,所以像 service_intance_id 这种字段就没必要传递了。 ca pture () 方法和 continued() 方法 的行为与 inject() 方法和 extract() 方法类似,这里不再展开分析了。

Skywalking第八篇——Trace收集(二)

ContextManager

Skywalking第八篇——Trace收集(二)

在前面介绍 ServiceManager SPI 加载 BootService 接口实现类的时候,我们看到有一个叫 ContextManager 的实现类,就是它来控制 Context 的。 ContextManager 中的 prepare()、boot()、onComplete() 方法都是空实现

ContextManager 的核心字段如下:

// TracingContext与一个 TraceSegment绑定,一个 TraceSegment与

// 一个线程关联,所有这里使用ThreadLocal

private static ThreadLocal<AbstractTracerContext> CONTEXT

= new ThreadLocal<AbstractTracerContext>();

// RuntimeContext 其实就是对 CurrentHashMap的封装

private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT

= new ThreadLocal<RuntimeContext>();

// ContextManagerExtendService也实现了BootService,

// 也是通过SPI方式加载的,它就一个功能,就是创建 Context

private static ContextManagerExtendService EXTEND_SERVICE;

在 ContextManager 提供的 createEntrySpan()、createExitSpan() 以及 createLocalSpan()等方法中,都会调用其 getOrCreate() 这个 static 方法,在该方法中负责创建 TracingContext,如下所示:

private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {

AbstractTracerContext context = CONTEXT.get(); // 从 ThreadLocal里面拿 TracingContext

if (context == null) {

if (EXTEND_SERVICE == null) {

EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);

}

// ContextManagerExtendService只用于创建Context

context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);

CONTEXT.set(context); // 保存到 ThreadLocal中

}

return context;

}

后会 再调用  TracingContext  的相应方法,创建指定类型 的 Span。

ContextManager 中的其他方法都是先调用 get() 这个静态方法拿到当前线程的 TracingContext,然后调用 TracingContext 的相应方法实现的,不展开了╮(╯_╰)╭。

Skywalking第八篇——Trace收集(二)

SamplingService

Skywalking第八篇——Trace收集(二)

前面的分析中多次看到 SamplingService,它负责进行采样,其核心字段如下:

// 是否开启了采样

private volatile boolean on = false;

// 当前已经采样了多少 Trace

private volatile AtomicInteger samplingFactorHolder;

// 定时任务,该定时任务每三秒重置 samplingFactorHolder

private volatile ScheduledFuture<?> scheduledFuture;

在 SamplingService.boot()方法中会根据配置初始化上述字段:

public void boot() throws Throwable {

if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) { // 开启采样

on = true;

this.resetSamplingFactor(); // 重置 samplingFactorHolder

ScheduledExecutorService service = Executors

.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));

scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {

@Override

public void run() {

resetSamplingFactor(); // 每3秒重置 samplingFactorHolder

}

}, 0, 3, TimeUnit.SECONDS);

}

}

在 SamplingService.trySampling()方法中会尝试增加 samplingFactorHolder 的值,当其值超过配置指定的值时,会返回false,表示该 Trace 未被采样到,具体实现如下:

public boolean trySampling() {

if (on) { // 开启了 Trace采样功能

int factor = samplingFactorHolder.get();

if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {

boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);

return success; // 该 Trace被采样

} else {

return false; // 该 Trace未被采样

}

}

return true;

}

Skywalking第八篇——Trace收集(二)
总结
Skywalking第八篇——Trace收集(二)

Skywalking第八篇——Trace收集(二)

好了,Context 相关的组件基本就介绍完了,其中包括 AbstractTracerContext 及其实现类、ContextManager ,主要提供了如下方法:

  • 创建 EntrySpan、LocalSpan、ExitSpan 三类Span 的方法。

  • 关闭 Span 的 stopSpan() 方法。

  • 用于跨进程传播的 inject()、extract() 方法,以及涉及到的 ContextCarrier 组件

  • 用于跨线程传播的 capture()、continue()方法,以及涉及到的 ContextSnapshot组件。

行吧,下一节看几个插件的实现,你说行不?

扫描下图二维码,关注【程序员吴小胖】

从底部 ”源码分析“菜单 即可获取

《Skywalking源码分析指北》全部文章哟~

Skywalking第八篇——Trace收集(二)

看懂看不懂,都点个赞吧:+1:

原文  http://mp.weixin.qq.com/s?__biz=MzU5Mjc5OTY5Ng==&mid=2247484599&idx=2&sn=36e77e481da765c187557b503a814a5e
正文到此结束
Loading...