在全链路跟踪框架中,Trace信息的传递功能是基于ThreadLocal的。但实际业务中可能会使用异步调用,这样就会丢失Trace信息,破坏了链路的完整性。
在同一线程中trace信息的传递流程使用代码模拟如下:
ThreadLocal<String> traceContext = new ThreadLocal<>(); String traceId = Tracer.startServer(); traceContext.set(traceId) //生成trace信息 传入threadlocal ... Tracer.startClient(traceContext.get()); //从threadlocal获取trace信息 Tracer.endClient(); ... Tracer.endServer();
那么显然如果是异步线程的话,下一个Span拿不到上一个Span的trace信息,就会造成调用链跟踪断了。那么怎么才能在异步的情况下传递ThreadLocal对象呢。
如果仅仅是父子之间传递ThreadLocal对象的话,JDK自身就有实现InheritableThreadLocal。
Thread内部为InheritableThreadLocal开辟了一个单独的ThreadLocalMap。在父线程创建一个子线程的时候,会检查这个ThreadLocalMap是否为空,不为空则会浅拷贝给子线程的ThreadLocalMap。
Thread的init相关逻辑如下:
if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
赋值拷贝代码如下:
private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } }
需要注意的是,拷贝为浅拷贝。父子线程的 ThreadLocalMap 内的 key 都指向同一个 InheritableThreadLocal 对象,Value 也指向同一个 Value。子线程的Value更改可以覆盖父线程的Value。
这样一来InheritableThreadLocal让我们可以在父线程创建子线程的时候将ThreadLocal中的值传递给子线程。但在大部分场景下,业务应用都会使用线程池。而在这种复用线程的池化场景中,线程池中的线程和主线程却都不是父子线程的关系,并不能直接使用InheritableThreadLocal。
Transmittable ThreadLocal是阿里开源的库,继承了InheritableThreadLocal,优化了在使用线程池等会池化复用线程的情况下传递ThreadLocal的使用。
简单来说,有个专门的TtlRunnable和TtlCallable包装类,用于读取原Thread的ThreadLocal对象及值并存于Runnable/Callable中,在执行run或者call方法的时候再将存于Runnable/Callable中的ThreadLocal对象和值读取出来,存入调用run或者call的线程中。
以TtlRunnable为例,构造函数如下:
private final AtomicReference<Object> capturedRef; private final Runnable runnable; private final boolean releaseTtlValueReferenceAfterRun; private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { //从父类capture复制到本类 this.capturedRef = new AtomicReference<>(capture()); this.runnable = runnable; //提交的runnable对象 this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }
capture函数的复制过程如下:
@Nonnull public static Object capture() { Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>(); for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { captured.put(threadLocal, threadLocal.copyValue()); } return captured; }
其中holder记录了当前 Thread 绑定了哪些 TransmittableThreadLocal 对象。captured保存了父线程ThreadLocal的值。
接着任务提交到线程池,线程开始运行时,取出保存在captured中的父线程ThreadLocal值并重新set。即将父线程值传递到了任务执行时。
@Override public void run() { Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } Object backup = replay(captured); try { runnable.run(); } finally { restore(backup); } }
这样TransmittableThreadLocal就解决了在线程池场景下的ThreadLocal对象传递。整个流程图如下:
有了TransmittableThreadLocal作为基础,调用链跨线程传递trace信息也不再困难,只需将trace信息均存于TransmittableThreadLocal中,使用异步线程池时使用Ttl相关类修饰即可。模拟代码如下:
public void testAsync() { ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(executorService); String traceId = Tracer.startServer(); //父线程的traceId ThreadLocal<String> traceContext = new TransmittableThreadLocal<>(); traceContext.set(traceId); //存入TransmittableThreadLocal ttlExecutorService.submit(new Runnable() { @Override public void run() { //runnable执行中获取当前线程的traceId与父线程的traceId一致 String childTraceId = traceContext.get(); Assert.assertEquals(childTraceId, traceId); Tracer.startClient(traceId); Tracer.endClient(); } }); Tracer.endServer(); }
以上所有使用需要业务代码去改动自己的线程池类,runnable或者callable类。而使用Java Agent实现线程池的传递是透明的,可以做到应用代码无侵入。
Java Agent(Instrumentation)是JDK1.5引入的技术,基于JVM TI机制,使得开发者可以构建一个独立于应用程序的代理(Agent),用来监测和协助运行在 JVM 上的程序,以及替换和修改某些类的定义。开发者可以在一个普通 Java 程序运行时,通过 – javaagent 参数指定一个特定的 jar 文件(包含 Instrumentation 代理)来启动相应的代理程序,植入自己扩展的修饰代码以实现功能。
在TransmittableThreadLocal中,相关Agent的源码分析如下:
//需要通过agent插入Executor类中的某个方法 private static void updateMethodOfExecutorClass(final CtClass clazz, final CtMethod method) throws NotFoundException, CannotCompileException { if (method.getDeclaringClass() != clazz) { return; } //插入的方法需要Public并且非静态 final int modifiers = method.getModifiers(); if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) { return; } //获取该方法的参数类型存入parameterTypes数组中 CtClass[] parameterTypes = method.getParameterTypes(); StringBuilder insertCode = new StringBuilder(); //根据参数类型顺序,进行代码格式化插入 for (int i = 0; i < parameterTypes.length; i++) { CtClass paraType = parameterTypes[i]; //区分Runnable/Callable if (RUNNABLE_CLASS_NAME.equals(paraType.getName())) { String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_RUNNABLE_CLASS_NAME, i + 1); logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code); insertCode.append(code); } else if (CALLABLE_CLASS_NAME.equals(paraType.getName())) { String code = String.format("$%d = %s.get($%d, false, true);", i + 1, TTL_CALLABLE_CLASS_NAME, i + 1); logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code); insertCode.append(code); } } //调用insertBefore()完成代码插入 if (insertCode.length() > 0) { method.insertBefore(insertCode.toString()); } }
将封装好的TransmittableThreadLocal Jar包放在类目录下的某个文件夹下,例如agent,那么只需在启动参数加入: -javaagent:agent/transmittable-thread-local-xxx.jar
即可完成修饰代码的植入。