转载

Skywalking源码分析指北第六篇——Agent初始化(下)

Skywalking源码分析指北第六篇——Agent初始化(下) 前文回顾  Skywalking源码分析指北第六篇——Agent初始化(下)

在《 Skywalking 源码分析指北第五篇—— Agent 初始化 ( ) 》中 深入分析了插件如何增强目标类的静态方法、构造方法、实例方法,如何让目标类实现指定接口,如何为目标类添加新字段等等,还通过 apm-mysql-5.x-plugin 这个插件将上述功能穿起来了。

本节将继续介绍 Skywalking Agent 的最后一步,其入口只有一行代码:

ServiceManager.INSTANCE.boot();

ServiceManager 是一个枚举(其实就是为了实现单例),然后通过 SPI 的方式加载 BootService 接口实现。

Skywalking源码分析指北第六篇——Agent初始化(下)

SPI基础

Skywalking源码分析指北第六篇——Agent初始化(下)

SPI 的全名为 Service Provider Interface,我们常说“面向接口编程”,为了在模块装配的时候不在程序里指明是哪个实现,就需要动态的查找接口实现。 JDK的SPI机制可以帮助我们查找某个接口的实现类。java.util.ServiceLoader 会去加载 META-INF/service/ 目录下的配置文件。

先来个例子吧:

public interface Log {

void exec();

}


public class Log4j implements Log {

@Override

public void exec() {

System.out.println("Log4j");

}

}


public class Logback implements Log {

@Override

public void exec() {

System.out.println("Logback");

}

}

在项目的resources/META-INF/services目录下添加一个名为 com.xxx.test.demo.Log 文件的文件:

Skywalking源码分析指北第六篇——Agent初始化(下)

最后是Main方法:

public class Main {

public static void main(String[] args) {

ServiceLoader<Log> serviceLoader = ServiceLoader.load(Log.class);

Iterator<Log> iterator = serviceLoader.iterator();

while (iterator.hasNext()) {

Log log = iterator.next();

log.exec();

}

}

}

JDK SPI 的核心是  ServiceLoader.load() 方法,我们先深入看一下他是怎么实现的。首先会获取当前ClassLoader:

public static <S> ServiceLoader<S> load(Class<S> service) {

// 获取当前ClassLoader

ClassLoader cl = Thread.currentThread().getContextClassLoader();

return ServiceLoader.load(service, cl);

}

在ServiceLoader.load()的最底层会调用  ServiceLoader. reload()方法,其中清理缓存并创建LazyIterator,LazyIterator是ServiceLoader的内部类:

// 缓存,用来缓存 ServiceLoader加载上来的实例

private LinkedHashMap<String,S> providers = new LinkedHashMap<>();


public void reload() {

providers.clear(); // 清空缓存

lookupIterator = new LazyIterator(service, loader);

}

我们在Main方法中使用的迭代器底层就是调用了ServiceLoader.LazyIterator实现的。 LazyIterator .next()方法最终调用的是nextService()方法,hasNext()方法最终调用的是hasNextService()方法,如图所示:

Skywalking源码分析指北第六篇——Agent初始化(下)

首先来看 LazyIterator. hasNextService()方法主要负责查找META-INF下的那个配置文件,并进行遍历,大致实现如下所示:

private static final String PREFIX = "META-INF/services/";


private boolean hasNextService() {

if (nextName != null) {

return true;

}

if (configs == null) {

// 拼起来就是META-INF目录下定义的那个配置文件

String fullName = PREFIX + service.getName();

// 加载配置文件

if (loader == null)

configs = ClassLoader.getSystemResources(fullName);

else

configs = loader.getResources(fullName);

}

while ((pending == null) || !pending.hasNext()) {

if (!configs.hasMoreElements()) {

return false;

}

// 解析配置文件

pending = parse(service, configs.nextElement());

}

nextName = pending.next(); // 更新nextName

return true;

}

再来看 LazyIterator.nextService()方法 ,它负责实例化指定的实现类,并将实例化的对象放到缓存中,大致具体实现如下所示:

private S nextService() {

String cn = nextName;

nextName = null;

Class<?> c = null;

try {

// 加载nextName指定的类

c = Class.forName(cn, false, loader);

} catch (ClassNotFoundException x) {

fail(service, "Provider " + cn + " not found");

}

if (!service.isAssignableFrom(c)) { // 检测类型

fail(service, "Provider " + cn + " not a subtype");

}

try {

S p = service.cast(c.newInstance()); // 创建实例

providers.put(cn, p); // 将实例添加到缓存

return p;

} catch (Throwable x) {

fail(service, "Provider " + cn + " could not be instantiated", x);

}

throw new Error(); // This cannot happen

}

最后再来看一下ServiceLoader.Iterator()方法返回的真正迭代器:

public Iterator<S> iterator() {

return new Iterator<S>() {

// knownProviders用来迭代providers缓存

Iterator<Map.Entry<String,S>> knownProviders

= providers.entrySet().iterator();


public boolean hasNext() {

// 先走缓存,缓存没有走LazyIterator

if (knownProviders.hasNext())

return true;

return lookupIterator.hasNext();

}


public S next() {

// 先走缓存,缓存没有走LazyIterator

if (knownProviders.hasNext())

return knownProviders.next().getValue();

return lookupIterator.next();

}


public void remove() {

throw new UnsupportedOperationException();

}

};

}

Skywalking源码分析指北第六篇——Agent初始化(下)

ServiceManager

Skywalking源码分析指北第六篇——Agent初始化(下)

回到 Skywalking的 ServiceManager ,它是个枚举,其实就是为了单例。在它的 boot()方法中首先

void load(List<BootService> allServices) {

// 很明显,SPI 加载 META-INF/services下的全部 BootService接口实现

Iterator<BootService> iterator = ServiceLoader.load(BootService.class, AgentClassLoader.getDefault()).iterator();

while (iterator.hasNext()) {

allServices.add(iterator.next());

}

}

在 apm-agent-core 模块的 resource/META-INF.services/org.apache.skywalking.apm.agent.core.boot.BootService 文件中,记录了 SPI要加载的 BootService接口实现类。

org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient

org.apache.skywalking.apm.agent.core.context.ContextManager

org.apache.skywalking.apm.agent.core.sampling.SamplingService

org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager

org.apache.skywalking.apm.agent.core.jvm.JVMService

org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient

org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService

加载完上述 BootService 接口实现之后,开始针对 BootService上的  @DefaultImplementor和 @OverrideImplementor 注解进行处理:

  • 如果类未被  @DefaultImplemento r 和   @OverrideImplementor 标注,则放入  bootServices 集合;如果 bootServices 集合中已有该类型的 BootService实现了,则抛出异常。

  • 如果类被 @DefaultImplementor 标注,则记录到  bootServices 集合中;如果 bootServices 集合中已有该类型的 BootService实现了,则忽略当前类。

  • 如果类被  @OverrideImplementor 标注,则尝试记录到  bootServices 集合中;如果 bootServices 集合中已有该类型的 BootService实现了,且已有类型是  @OverrideImplementor 注解指定的类,再来个并且,已有类型被  @DefaultImplementor 标注了,则当前类替换 bootServices 集合中已有类。

看着代码有点长(这里就不粘贴出来了,看源码嘛,自己不搭源码环境,还是去看综艺吧),其实就是 根据  @DefaultImplementor@OverrideImplementor 注解决定某个类型的 BootService接口实现到底用哪个而已,easy,easy,I'll carry you ! 

确定完 BootService 使用哪个实现之后,就要开始进行一些初始化操作了。ServiceManager会逐个全部 BootService 实现的 prepare()、boot()、onComplete()方法:

prepare(); // 调用全部 BootService的prepare()方法

startup(); // 调用全部 BootService的boot()方法

onComplete(); // 调用全部 BootService的onComplete()方法

Skywalking源码分析指北第六篇——Agent初始化(下)
<strong>GRPCChannelManager</strong><strong style="letter-spacing:0.544px;font-size:18px;font-family:-apple-system-font, system-ui, "Helvetica Neue", "PingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;">实现</strong>
Skywalking源码分析指北第六篇——Agent初始化(下)

Skywalking Agent 毕竟不是单机的,与 Server 端交互是必须的,在一堆 BootService 实现中一眼就看到了 GRPCChannelManager ,来吧,看看它是怎么玩的。

这里首先说一下 gRPC 里面的两个组件:

  • 一个是  ManagedChannelBuilder 。它负责 来创建客户端 Channel ,ManagedChannelBuilder 使用了 provider 机制,具体是创建了哪种 Channel 由 provider 决定,常用的 ManagedChannelBuilder 有两种:NettyChannelBuilder与OkHttpChannelBuilder。在 Skywalking Agent 中用的是 NettyChannelBuilder,底层是基于 Netty 实现的呗,这里不深入到 gRPC 的 Java 实现中去分析它是如何使用 Netty 的,Skywalking 源码指北的分析边界就到这里哈。

  • 另一个是 ManagedChannel 。它是客户端最核心的类,它表示逻辑上的一个Channel,底层持有一个 TCP 链接,并负责维护此连接的活性,也就是说,在 RPC 调用的任何时机,如果检测到底层连接处于关闭状态(terminated),将会尝试重建连接。通常情况下,我们不需要在RPC调用结束后就关闭 Channel ,该 Channel 可以被一直重用,直到整个 Client 程序关闭。当然,为了提高 Client 的整体并发能力,我们自己创建多个 ManagedChannel ,然后再配合上连接池,在每次 RPC 请求时选择使用轮训或是随机等算法选择一个可用的 Channel 即可。

Skywalking 在 ManagedChannel 的基础上封装了自己的 GRPCChannel实现,可以添加一些修饰器,目前就只有一个验证权限的修饰器,没啥可说的。

回到 GRPCChannelManager,先说说核心字段吧:

// 封装了上面介绍的gRPC Channel

private volatile GRPCChannel managedChannel = null;

// 定时重连gRPC Server的定时任务

private volatile ScheduledFuture<?> connectCheckFuture;

// 是否重连。当Channel断开时会标记 reconnect为 true。后台线程会根据该标识进行连接(重连)

private volatile boolean reconnect = true;

// 加在 Channel上的监听器,主要是监听 Channel的状态变化

private List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<GRPCChannelListener>());

// gRPC Server集合

private volatile List<String> grpcServers;

GRPCChannelManager 的 prepare()和 onComplete()方法都是空实现,在 boot()方法中会初始化  connectCheckFuture 这个定时任务,初次会立即执行,之后每隔30s执行一次,具体执行的就是 GRPCChannelManager.run()方法,其核心就是创建连接:

managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))

.addManagedChannelBuilder(new StandardChannelBuilder())

.addManagedChannelBuilder(new TLSChannelBuilder())

.addChannelDecorator(new AuthenticationDecorator())

.build();

// notify()方法会循环调用 GRPCChannelListener.statusChanged()方法,

// 通知所有 GRPCChannelListener连接创建成功的事件

notify(GRPCChannelStatus.CONNECTED);

// 设置 reconnect字段为false,暂时不会再重建连接了

reconnect = false;

行了, GRPCChannelManager说完了,╮(╯_╰)╭

Skywalking源码分析指北第六篇——Agent初始化(下)
<strong style="letter-spacing:0.544px;font-size:18px;font-family:-apple-system-font, system-ui, "Helvetica Neue", "PingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;">注册+心跳</strong>
Skywalking源码分析指北第六篇——Agent初始化(下)

Skywalking源码分析指北第六篇——Agent初始化(下)

介绍完基本的连接方式,还需要了解 Agent和Server交互的流程和协议,本节在分析 gRPC 协议的同时,会穿插着相关的代码实现,一锅端了。

首先来看 gRPC 中 proto 的定义,Register.proto 文件的位置如下:

Skywalking源码分析指北第六篇——Agent初始化(下)

Register.proto 中定义了Register service,如下所示:

service Register {

# 参数 Services中要带上当前 Agent的 serviceName,还可以带一些 KV格式的附加信息

# 返回的 ServiceRegisterMapping其实是多个KV,其中就有服务端生成的 service_id

rpc doServiceRegister (Services) returns (ServiceRegisterMapping) {}


# 参数 ServiceInstances中要带上 service_id、时间戳等信息

# 返回的 ServiceInstanceRegisterMapping本质也是一堆 KV,其中包含服务端生成的 service_name_id

rpc doServiceInstanceRegister (ServiceInstances) returns (ServiceInstanceRegisterMapping) {}

# 还有三个方法,先省略一下,后面会介绍

}

这两个方法其实就是核心注册流程,相关实现是在   ServiceAndEndpointRegisterClient 这个 BootService 实现的。

首先在  ServiceAndEndpointRegisterClient.prepare()方法中,会将当前  ServiceAndEndpointRegisterClient注册到 Channel,同时还会生成 INSTANCE_UUID ,具体实现如下:

// 根据 BootService实现的类型,前面介绍了 ServiceManager.bootedServices

// 然后将 ServiceAndEndpointRegisterClient注册成 Listener

ServiceManager.INSTANCE.findService(GRPCChannelManager.class)

.addChannelListener(this);

// 生成 INSTANCE_UUID

INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()

.replaceAll("-", "") : Config.Agent.INSTANCE_UUID;

在其 boot()方法中,也会启动一个定时任务,默认每3s执行一次注册上述注册操作,先走 doServiceRegister()方法,用 service_name 换 service_id,实现片段如下:

if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {

// 调用doServiceRegister,参数是Services

ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister(

Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());

// 解析服务端的响应

for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {

// 响应结果中,SERVICE_NAME对应的value是一个service_id

if (Config.Agent.SERVICE_NAME.equals(registered.getKey())) {

// 记录service_id

RemoteDownstreamConfig.Agent.SERVICE_ID = registered.getValue();

shouldTry = true;

}

}

}

之后走一下  doServiceInstanceRegister ()方法,用 service_id 换 service_instance_id,实现片段如下:

if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {

// 调用 doServiceInstanceRegister()方法,用SERVICE_ID换SERVICE_INSTANCE_ID

ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.doServiceInstanceRegister(ServiceInstances.newBuilder()

.addInstances(ServiceInstance.newBuilder()

.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)

// 除了service_id,还会传递uuid、时间戳以及系统信息之类的

.setInstanceUUID(INSTANCE_UUID)

.setTime(System.currentTimeMillis())

.addAllProperties(OSUtil.buildOSInfo())

).build());

for (KeyIntValuePair serviceInstance : instanceMapping.getServiceInstancesList()) {

if (INSTANCE_UUID.equals(serviceInstance.getKey())) {

// 记录service_intance_id

RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = serviceInstance.getValue();

}

}

}

在完成上述两步注册操作之后,就开始发心跳了,╮(╯_╰)╭,官方的文档把周期发心跳这件事情认为是单独的一部分,这里因为都是在  ServiceAndEndpointRegisterClient 中的这个定时任务中实现的,就一并说了,也不复杂。

心跳涉及到一个新的 RPC Service 定义如下:

service ServiceInstancePing {

rpc doPing (ServiceInstancePingPkg) returns (Commands) {}

}

ServiceAndEndpointRegisterClient 中相关的代码实现片段如下,发出来的心跳带了service_instance_id、时间戳、instance_uuid 三个信息:

serviceInstancePingStub.doPing(ServiceInstancePingPkg.newBuilder()

.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)

.setTime(System.currentTimeMillis())

.setServiceInstanceUUID(INSTANCE_UUID)

.build());

注册之后就可以开始上报 Trace 以及JVM监控了。

Skywalking源码分析指北第六篇——Agent初始化(下)
<strong style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;">总结</strong>
Skywalking源码分析指北第六篇——Agent初始化(下)

介绍了 SPI 基础、ServiceManager 、gRPC 中的两个基础组件、GRPCChannelManager 实现、注册+心跳相关的 BootService实现。 行吧,这一节挺长了,剩下的,后面再说吧。

扫描下图二维码,关注【程序员吴小胖】

从底部 ”源码分析“菜单 即可获取

《Skywalking源码分析指北》全部文章哟~

Skywalking源码分析指北第六篇——Agent初始化(下)

看懂看不懂,都点个赞吧:+1:

原文  http://mp.weixin.qq.com/s?__biz=MzU5Mjc5OTY5Ng==&mid=2247484551&idx=2&sn=a589c1585cfaa74ab3809bf37afee7e3
正文到此结束
Loading...