前情提要
在上一篇文章中,我们详细介绍了 Trace 的基本概念以及 Skywalking。从这一小节开始,我们将开始介绍 Trace Context 相关的组件。
Context
AbstractTracerContext 是 Skywalking 抽象链路上下文的顶层抽象类 ,在 一个线程中 Context 与 TraceSegment 一一对应, 其中定义了链路上下文的的基本行为:
public interface AbstractTracerContext {
// 在跨进程调用之前,当前进程会通过 inject()方法将当前 Context的全部内容注入到指定的 ContextCarrier,然后才能将当前 Context的信息发送出去
void inject(ContextCarrier carrier);
// 在跨进程调用的接收方,会通过 extract()方法将收到的 Context从 ContextCarrier中提取出来
void extract(ContextCarrier carrier);
// 在跨线程调用之前,会通过 capture()将当前线程的 Context进行快照,然后将快照传递给其他线程
ContextSnapshot capture();
// 在跨线程调用的接收方,会从收到的 ContextSnapshot中解析出 Context信息
void continued(ContextSnapshot snapshot);
String getReadableGlobalTraceId(); // 获取关联的TraceId
AbstractSpan createEntrySpan(String operationName); // 创建 EntrySpan
AbstractSpan createLocalSpan(String operationName); // 创建 LocalSpan
// 创建 ExitSpan
AbstractSpan createExitSpan(String operationName, String remotePeer);
AbstractSpan activeSpan(); // 获得当前活跃的 Span
boolean stopSpan(AbstractSpan span); // 停止指定 Span
// 当前 Span已经开启了异步状态,现在等待另一个线程关闭该 Span
AbstractTracerContext awaitFinishAsync();
void asyncStop(AsyncSpan span); // 关闭处于异步状态的 Span
}
TracingContext 是 AbstractTracerContext 实现类,其核心方法如下:
// 负责采样,后面会展开介绍采样逻辑
private SamplingService samplingService;
// 对应的 TraceSegment对象,在 TracingContext的构造方法中会创建该对象
private TraceSegment segment;
// 记录当前活跃的 Span
private LinkedList<AbstractSpan> activeSpanStack;
// Span编号自增序列。创建的 Span 的编号,通过该变量自增生成。
private int spanIdGenerator;
下面开始分析 TracingContext 中的核心方法,首先是 createEntrySpan() 方法,它负责创建 EntrySpan,如果已经存在父 Span,当然没法再创建 EntrySpan咯,重新调用一下 start()方法咯,╮(╯_╰)╭,大致实现如下( 其中省略了 DictionaryManager 的相关代码,后面再详细介绍这货 ):
public AbstractSpan createEntrySpan(final String operationName) {
if (isLimitMechanismWorking()) { // 默认,每个 TraceSegment只能放300个 Span,超过了就每30s打一条 WARN日志
NoopSpan span = new NoopSpan(); // 超过300就放 NoopSpan
return push(span);
}
AbstractSpan entrySpan;
final AbstractSpan parentSpan = peek(); // 获取 activeSpanStack集合中的最后一个 Span,就是父Span
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
if (parentSpan != null && parentSpan.isEntry()) {
// 更新 operationId(或operationName)
entrySpan = parentSpan.setOperationId(operationId);
// 重新调用 start()方法,前面提到过,start()方法会重置operationId(以及或operationName)之外的其他字段
return entrySpan.start();
} else {
// 新建 EntrySpan对象
entrySpan = new EntrySpan(spanIdGenerator++, parentSpanId, operationId);
// 调用 start()方法,第一次调用时会设置startTime
entrySpan.start();
// 将新建的 Span添加到 activeSpanStack集合尾部
return push(entrySpan);
}
}
接下来看 createLocalSpan()方法,就是创建个 LocalSpan对象,然后加到 activeSpanStack 集合中,大致实现如下:
public AbstractSpan createLocalSpan(final String operationName) {
// 检测当前 TraceSegment中的 Span个数(和上面的逻辑一样,略)
// 获取父 Span以及父 Span的Id
AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
// 直接创建 LocalSpan对象并调用其 start()方法
AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName);
span.start();
return push(span); // 记入 activeSpanStack集合
}
再往下自然就到了 createExitSpan() 方法,创建或是重用 ExitSpan,实现如下:
public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
AbstractSpan exitSpan;
AbstractSpan parentSpan = peek(); // 获取父 Span
if (parentSpan != null && parentSpan.isExit()) {
// 父 Span已经是 ExitSpan,就不再新建了
exitSpan = parentSpan;
} else {
// 父 Span不是 ExitSpan,就新建一个 ExitSpan
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
exitSpan = new ExitSpan(spanIdGenerator++, parentSpanId, operationId, peerId);
push(exitSpan); // 记入 activeSpanStack集合
}
exitSpan.start();// 调用 start()方法,第一次调用的时候,会设置 startTime
return exitSpan;
}
最后来看 stopSpan() 方法,它负责关闭指定的 Span对象:
public boolean stopSpan(AbstractSpan span) {
AbstractSpan lastSpan = peek(); // 获取当前活跃的 Span对象
if (lastSpan == span) { // 只能关闭当前活跃 Span对象,否则抛异常
if (lastSpan instanceof AbstractTracingSpan) {
AbstractTracingSpan toFinishSpan = (AbstractTracingSpan)lastSpan;
if (toFinishSpan.finish(segment)) { // 尝试关闭 Span
pop(); // 当 Span完全关闭之后,会将其从 activeSpanStack集合中删除
}
} else {
pop();
}
} else {
throw new IllegalStateException("Stopping the unexpected span = " + span);
}
// TraceSegment中全部 Span都关闭(且异步状态的 Span也关闭了),则当前 TraceSegment也会关闭
if (checkFinishConditions()) {
finish();
}
return activeSpanStack.isEmpty();
}
在 TracingContext.finish() 方法中会关闭关联的 TraceSegment 对象,完成采样操作,还会通知相关的 Listener:
private void finish() {
// 关闭当前 TraceSegment对象
TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
// 当没有父 TraceSegment的时候(表示当前链路不完整),而且该 TraceSegment只有一个 Span的时候
if (!segment.hasRef() && segment.isSingleSpanSegment()) {
if (!samplingService.trySampling()) { // 未开启采样
finishedSegment.setIgnore(true);
}
}
// 通知监听器,目前 Listener就是 TraceSegmentServiceClient
TracingContext.ListenerManager.notifyFinish(finishedSegment);
}
在开始介绍 inject() 方法和 extract() 方法之前,需要先介绍一下他们的参数—— ContextCarrier,它记录了当前 TracingContext的一些基本信息,并实现了 Serializable 接口哦,ContextCarrier 中的字段不再详细介绍,望文生义即可,嘎嘎嘎。
TracingContext.inject() 方法的功能就是将 TracingContext 对象填充到 ContextCarrier 中,看着很长,其实就是一些简单的字段,实现如下:
public void inject(ContextCarrier carrier) {
// 检测当前活跃的 Span是否为 ExitSpan,否则会抛出异常(略)
WithPeerInfo spanWithPeer = (WithPeerInfo)span;
String peer = spanWithPeer.getPeer();
int peerId = spanWithPeer.getPeerId();
// 设置 TraceSegmentId
carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
carrier.setSpanId(span.getSpanId()); // 设置 SpanId
// 设置 service_instance_id
carrier.setParentServiceInstanceId(segment.getApplicationInstanceId());
// 设置 peerHost或是 peerId
if (DictionaryUtil.isNull(peerId)) {
carrier.setPeerHost(peer);
} else {
carrier.setPeerId(peerId);
}
List<TraceSegmentRef> refs = this.segment.getRefs();
int operationId;
String operationName;
int entryApplicationInstanceId;
if (refs != null && refs.size() > 0) {
TraceSegmentRef ref = refs.get(0);
operationId = ref.getEntryEndpointId();
operationName = ref.getEntryEndpointName();
entryApplicationInstanceId = ref.getEntryServiceInstanceId();
} else {
AbstractSpan firstSpan = first();
operationId = firstSpan.getOperationId();
operationName = firstSpan.getOperationName();
entryApplicationInstanceId = this.segment.getApplicationInstanceId();
}
// 设置 entryApplicationInstanceId
carrier.setEntryServiceInstanceId(entryApplicationInstanceId);
// 设置 operationName或 operationId
if (operationId == DictionaryUtil.nullValue()) {
if (!StringUtil.isEmpty(operationName)) {
carrier.setEntryEndpointName(operationName);
}
} else {
carrier.setEntryEndpointId(operationId);
}
// 设置 parentEndpointId或是 parentEndpointName
int parentOperationId = first().getOperationId();
if (parentOperationId == DictionaryUtil.nullValue()) {
carrier.setParentEndpointName(first().getOperationName());
} else {
carrier.setParentEndpointId(parentOperationId);
}
// 设置 primaryDistributedTraceId,只会从 relatedGlobalTraces集合中取第一个
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
}
在 TracingContext 的 extract() 方法就是将 ContextCarrier 中各个字段,解出来放到 TraceSegmentRef 的相应字段中:
public void extract(ContextCarrier carrier) {
// 将 ContextCarrier参数中相应字段填充到 TraceSegmentRef中的相应字段
TraceSegmentRef ref = new TraceSegmentRef(carrier);
this.segment.ref(ref); // 将 TraceSegmentRef记录到当前 TraceSegment中
this.segment.relatedGlobalTraces(carrier.getDistributedTraceId());
AbstractSpan span = this.activeSpan();
if (span instanceof EntrySpan) {
span.ref(ref); // 将上面的 TraceSegmentRef对象记录到 Span中
}
}
下面看一下 capture() 方法和 continued() 方法的参数—— ContextSnapshot,它与前面的 ContextCarrier 类似。因为只是跨线程传递,所以像 service_intance_id 这种字段就没必要传递了。 ca pture () 方法和 continued() 方法 的行为与 inject() 方法和 extract() 方法类似,这里不再展开分析了。
在前面介绍 ServiceManager SPI 加载 BootService 接口实现类的时候,我们看到有一个叫 ContextManager 的实现类,就是它来控制 Context 的。 ContextManager 中的 prepare()、boot()、onComplete() 方法都是空实现 。
ContextManager 的核心字段如下:
// TracingContext与一个 TraceSegment绑定,一个 TraceSegment与
// 一个线程关联,所有这里使用ThreadLocal
private static ThreadLocal<AbstractTracerContext> CONTEXT
= new ThreadLocal<AbstractTracerContext>();
// RuntimeContext 其实就是对 CurrentHashMap的封装
private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT
= new ThreadLocal<RuntimeContext>();
// ContextManagerExtendService也实现了BootService,
// 也是通过SPI方式加载的,它就一个功能,就是创建 Context
private static ContextManagerExtendService EXTEND_SERVICE;
在 ContextManager 提供的 createEntrySpan()、createExitSpan() 以及 createLocalSpan()等方法中,都会调用其 getOrCreate() 这个 static 方法,在该方法中负责创建 TracingContext,如下所示:
private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
AbstractTracerContext context = CONTEXT.get(); // 从 ThreadLocal里面拿 TracingContext
if (context == null) {
if (EXTEND_SERVICE == null) {
EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
}
// ContextManagerExtendService只用于创建Context
context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
CONTEXT.set(context); // 保存到 ThreadLocal中
}
return context;
}
之 后会 再调用 TracingContext 的相应方法,创建指定类型 的 Span。
ContextManager 中的其他方法都是先调用 get() 这个静态方法拿到当前线程的 TracingContext,然后调用 TracingContext 的相应方法实现的,不展开了╮(╯_╰)╭。
SamplingService
前面的分析中多次看到 SamplingService,它负责进行采样,其核心字段如下:
// 是否开启了采样
private volatile boolean on = false;
// 当前已经采样了多少 Trace
private volatile AtomicInteger samplingFactorHolder;
// 定时任务,该定时任务每三秒重置 samplingFactorHolder
private volatile ScheduledFuture<?> scheduledFuture;
在 SamplingService.boot()方法中会根据配置初始化上述字段:
public void boot() throws Throwable {
if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) { // 开启采样
on = true;
this.resetSamplingFactor(); // 重置 samplingFactorHolder
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
@Override
public void run() {
resetSamplingFactor(); // 每3秒重置 samplingFactorHolder
}
}, 0, 3, TimeUnit.SECONDS);
}
}
在 SamplingService.trySampling()方法中会尝试增加 samplingFactorHolder 的值,当其值超过配置指定的值时,会返回false,表示该 Trace 未被采样到,具体实现如下:
public boolean trySampling() {
if (on) { // 开启了 Trace采样功能
int factor = samplingFactorHolder.get();
if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
return success; // 该 Trace被采样
} else {
return false; // 该 Trace未被采样
}
}
return true;
}
总结
好了,Context 相关的组件基本就介绍完了,其中包括 AbstractTracerContext 及其实现类、ContextManager ,主要提供了如下方法:
创建 EntrySpan、LocalSpan、ExitSpan 三类Span 的方法。
关闭 Span 的 stopSpan() 方法。
用于跨进程传播的 inject()、extract() 方法,以及涉及到的 ContextCarrier 组件
用于跨线程传播的 capture()、continue()方法,以及涉及到的 ContextSnapshot组件。
行吧,下一节看几个插件的实现,你说行不?
扫描下图二维码,关注【程序员吴小胖】
从底部 ”源码分析“菜单 即可获取
《Skywalking源码分析指北》全部文章哟~
看懂看不懂,都点个赞吧:+1: