前文回顾
在《 Skywalking 源码分析指北第五篇—— Agent 初始化 ( 中 ) 》中 深入分析了插件如何增强目标类的静态方法、构造方法、实例方法,如何让目标类实现指定接口,如何为目标类添加新字段等等,还通过 apm-mysql-5.x-plugin 这个插件将上述功能穿起来了。
本节将继续介绍 Skywalking Agent 的最后一步,其入口只有一行代码:
ServiceManager.INSTANCE.boot();
ServiceManager 是一个枚举(其实就是为了实现单例),然后通过 SPI 的方式加载 BootService 接口实现。
SPI基础
SPI 的全名为 Service Provider Interface,我们常说“面向接口编程”,为了在模块装配的时候不在程序里指明是哪个实现,就需要动态的查找接口实现。 JDK的SPI机制可以帮助我们查找某个接口的实现类。java.util.ServiceLoader 会去加载 META-INF/service/ 目录下的配置文件。
先来个例子吧:
public interface Log {
void exec();
}
public class Log4j implements Log {
@Override
public void exec() {
System.out.println("Log4j");
}
}
public class Logback implements Log {
@Override
public void exec() {
System.out.println("Logback");
}
}
在项目的resources/META-INF/services目录下添加一个名为 com.xxx.test.demo.Log 文件的文件:
最后是Main方法:
public class Main {
public static void main(String[] args) {
ServiceLoader<Log> serviceLoader = ServiceLoader.load(Log.class);
Iterator<Log> iterator = serviceLoader.iterator();
while (iterator.hasNext()) {
Log log = iterator.next();
log.exec();
}
}
}
JDK SPI 的核心是 ServiceLoader.load() 方法,我们先深入看一下他是怎么实现的。首先会获取当前ClassLoader:
public static <S> ServiceLoader<S> load(Class<S> service) {
// 获取当前ClassLoader
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}
在ServiceLoader.load()的最底层会调用 ServiceLoader. reload()方法,其中清理缓存并创建LazyIterator,LazyIterator是ServiceLoader的内部类:
// 缓存,用来缓存 ServiceLoader加载上来的实例
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
public void reload() {
providers.clear(); // 清空缓存
lookupIterator = new LazyIterator(service, loader);
}
我们在Main方法中使用的迭代器底层就是调用了ServiceLoader.LazyIterator实现的。 LazyIterator .next()方法最终调用的是nextService()方法,hasNext()方法最终调用的是hasNextService()方法,如图所示:
首先来看 LazyIterator. hasNextService()方法主要负责查找META-INF下的那个配置文件,并进行遍历,大致实现如下所示:
private static final String PREFIX = "META-INF/services/";
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
// 拼起来就是META-INF目录下定义的那个配置文件
String fullName = PREFIX + service.getName();
// 加载配置文件
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
// 解析配置文件
pending = parse(service, configs.nextElement());
}
nextName = pending.next(); // 更新nextName
return true;
}
再来看 LazyIterator.nextService()方法 ,它负责实例化指定的实现类,并将实例化的对象放到缓存中,大致具体实现如下所示:
private S nextService() {
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
// 加载nextName指定的类
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service, "Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) { // 检测类型
fail(service, "Provider " + cn + " not a subtype");
}
try {
S p = service.cast(c.newInstance()); // 创建实例
providers.put(cn, p); // 将实例添加到缓存
return p;
} catch (Throwable x) {
fail(service, "Provider " + cn + " could not be instantiated", x);
}
throw new Error(); // This cannot happen
}
最后再来看一下ServiceLoader.Iterator()方法返回的真正迭代器:
public Iterator<S> iterator() {
return new Iterator<S>() {
// knownProviders用来迭代providers缓存
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();
public boolean hasNext() {
// 先走缓存,缓存没有走LazyIterator
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}
public S next() {
// 先走缓存,缓存没有走LazyIterator
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
回到 Skywalking的 ServiceManager ,它是个枚举,其实就是为了单例。在它的 boot()方法中首先
void load(List<BootService> allServices) {
// 很明显,SPI 加载 META-INF/services下的全部 BootService接口实现
Iterator<BootService> iterator = ServiceLoader.load(BootService.class, AgentClassLoader.getDefault()).iterator();
while (iterator.hasNext()) {
allServices.add(iterator.next());
}
}
在 apm-agent-core 模块的 resource/META-INF.services/org.apache.skywalking.apm.agent.core.boot.BootService 文件中,记录了 SPI要加载的 BootService接口实现类。
org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
加载完上述 BootService 接口实现之后,开始针对 BootService上的 @DefaultImplementor和 @OverrideImplementor 注解进行处理:
如果类未被 @DefaultImplemento r 和 @OverrideImplementor 标注,则放入 bootServices 集合;如果 bootServices 集合中已有该类型的 BootService实现了,则抛出异常。
如果类被 @DefaultImplementor 标注,则记录到 bootServices 集合中;如果 bootServices 集合中已有该类型的 BootService实现了,则忽略当前类。
如果类被 @OverrideImplementor 标注,则尝试记录到 bootServices 集合中;如果 bootServices 集合中已有该类型的 BootService实现了,且已有类型是 @OverrideImplementor 注解指定的类,再来个并且,已有类型被 @DefaultImplementor 标注了,则当前类替换 bootServices 集合中已有类。
看着代码有点长(这里就不粘贴出来了,看源码嘛,自己不搭源码环境,还是去看综艺吧),其实就是 根据 @DefaultImplementor 和 @OverrideImplementor 注解决定某个类型的 BootService接口实现到底用哪个而已,easy,easy,I'll carry you !
确定完 BootService 使用哪个实现之后,就要开始进行一些初始化操作了。ServiceManager会逐个全部 BootService 实现的 prepare()、boot()、onComplete()方法:
prepare(); // 调用全部 BootService的prepare()方法
startup(); // 调用全部 BootService的boot()方法
onComplete(); // 调用全部 BootService的onComplete()方法
<strong>GRPCChannelManager</strong><strong style="letter-spacing:0.544px;font-size:18px;font-family:-apple-system-font, system-ui, "Helvetica Neue", "PingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;">实现</strong>
Skywalking Agent 毕竟不是单机的,与 Server 端交互是必须的,在一堆 BootService 实现中一眼就看到了 GRPCChannelManager ,来吧,看看它是怎么玩的。
这里首先说一下 gRPC 里面的两个组件:
一个是 ManagedChannelBuilder 。它负责 来创建客户端 Channel ,ManagedChannelBuilder 使用了 provider 机制,具体是创建了哪种 Channel 由 provider 决定,常用的 ManagedChannelBuilder 有两种:NettyChannelBuilder与OkHttpChannelBuilder。在 Skywalking Agent 中用的是 NettyChannelBuilder,底层是基于 Netty 实现的呗,这里不深入到 gRPC 的 Java 实现中去分析它是如何使用 Netty 的,Skywalking 源码指北的分析边界就到这里哈。
另一个是 ManagedChannel 。它是客户端最核心的类,它表示逻辑上的一个Channel,底层持有一个 TCP 链接,并负责维护此连接的活性,也就是说,在 RPC 调用的任何时机,如果检测到底层连接处于关闭状态(terminated),将会尝试重建连接。通常情况下,我们不需要在RPC调用结束后就关闭 Channel ,该 Channel 可以被一直重用,直到整个 Client 程序关闭。当然,为了提高 Client 的整体并发能力,我们自己创建多个 ManagedChannel ,然后再配合上连接池,在每次 RPC 请求时选择使用轮训或是随机等算法选择一个可用的 Channel 即可。
Skywalking 在 ManagedChannel 的基础上封装了自己的 GRPCChannel实现,可以添加一些修饰器,目前就只有一个验证权限的修饰器,没啥可说的。
回到 GRPCChannelManager,先说说核心字段吧:
// 封装了上面介绍的gRPC Channel
private volatile GRPCChannel managedChannel = null;
// 定时重连gRPC Server的定时任务
private volatile ScheduledFuture<?> connectCheckFuture;
// 是否重连。当Channel断开时会标记 reconnect为 true。后台线程会根据该标识进行连接(重连)
private volatile boolean reconnect = true;
// 加在 Channel上的监听器,主要是监听 Channel的状态变化
private List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<GRPCChannelListener>());
// gRPC Server集合
private volatile List<String> grpcServers;
GRPCChannelManager 的 prepare()和 onComplete()方法都是空实现,在 boot()方法中会初始化 connectCheckFuture 这个定时任务,初次会立即执行,之后每隔30s执行一次,具体执行的就是 GRPCChannelManager.run()方法,其核心就是创建连接:
managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.addManagedChannelBuilder(new StandardChannelBuilder())
.addManagedChannelBuilder(new TLSChannelBuilder())
.addChannelDecorator(new AuthenticationDecorator())
.build();
// notify()方法会循环调用 GRPCChannelListener.statusChanged()方法,
// 通知所有 GRPCChannelListener连接创建成功的事件
notify(GRPCChannelStatus.CONNECTED);
// 设置 reconnect字段为false,暂时不会再重建连接了
reconnect = false;
行了, GRPCChannelManager说完了,╮(╯_╰)╭
<strong style="letter-spacing:0.544px;font-size:18px;font-family:-apple-system-font, system-ui, "Helvetica Neue", "PingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;">注册+心跳</strong>
介绍完基本的连接方式,还需要了解 Agent和Server交互的流程和协议,本节在分析 gRPC 协议的同时,会穿插着相关的代码实现,一锅端了。
首先来看 gRPC 中 proto 的定义,Register.proto 文件的位置如下:
Register.proto 中定义了Register service,如下所示:
service Register {
# 参数 Services中要带上当前 Agent的 serviceName,还可以带一些 KV格式的附加信息
# 返回的 ServiceRegisterMapping其实是多个KV,其中就有服务端生成的 service_id
rpc doServiceRegister (Services) returns (ServiceRegisterMapping) {}
# 参数 ServiceInstances中要带上 service_id、时间戳等信息
# 返回的 ServiceInstanceRegisterMapping本质也是一堆 KV,其中包含服务端生成的 service_name_id
rpc doServiceInstanceRegister (ServiceInstances) returns (ServiceInstanceRegisterMapping) {}
# 还有三个方法,先省略一下,后面会介绍
}
这两个方法其实就是核心注册流程,相关实现是在 ServiceAndEndpointRegisterClient 这个 BootService 实现的。
首先在 ServiceAndEndpointRegisterClient.prepare()方法中,会将当前 ServiceAndEndpointRegisterClient注册到 Channel,同时还会生成 INSTANCE_UUID ,具体实现如下:
// 根据 BootService实现的类型,前面介绍了 ServiceManager.bootedServices
// 然后将 ServiceAndEndpointRegisterClient注册成 Listener
ServiceManager.INSTANCE.findService(GRPCChannelManager.class)
.addChannelListener(this);
// 生成 INSTANCE_UUID
INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()
.replaceAll("-", "") : Config.Agent.INSTANCE_UUID;
在其 boot()方法中,也会启动一个定时任务,默认每3s执行一次注册上述注册操作,先走 doServiceRegister()方法,用 service_name 换 service_id,实现片段如下:
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
// 调用doServiceRegister,参数是Services
ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister(
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
// 解析服务端的响应
for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
// 响应结果中,SERVICE_NAME对应的value是一个service_id
if (Config.Agent.SERVICE_NAME.equals(registered.getKey())) {
// 记录service_id
RemoteDownstreamConfig.Agent.SERVICE_ID = registered.getValue();
shouldTry = true;
}
}
}
之后走一下 doServiceInstanceRegister ()方法,用 service_id 换 service_instance_id,实现片段如下:
if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
// 调用 doServiceInstanceRegister()方法,用SERVICE_ID换SERVICE_INSTANCE_ID
ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances(ServiceInstance.newBuilder()
.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
// 除了service_id,还会传递uuid、时间戳以及系统信息之类的
.setInstanceUUID(INSTANCE_UUID)
.setTime(System.currentTimeMillis())
.addAllProperties(OSUtil.buildOSInfo())
).build());
for (KeyIntValuePair serviceInstance : instanceMapping.getServiceInstancesList()) {
if (INSTANCE_UUID.equals(serviceInstance.getKey())) {
// 记录service_intance_id
RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = serviceInstance.getValue();
}
}
}
在完成上述两步注册操作之后,就开始发心跳了,╮(╯_╰)╭,官方的文档把周期发心跳这件事情认为是单独的一部分,这里因为都是在 ServiceAndEndpointRegisterClient 中的这个定时任务中实现的,就一并说了,也不复杂。
心跳涉及到一个新的 RPC Service 定义如下:
service ServiceInstancePing {
rpc doPing (ServiceInstancePingPkg) returns (Commands) {}
}
ServiceAndEndpointRegisterClient 中相关的代码实现片段如下,发出来的心跳带了service_instance_id、时间戳、instance_uuid 三个信息:
serviceInstancePingStub.doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
.build());
注册之后就可以开始上报 Trace 以及JVM监控了。
<strong style="max-width: 100%;box-sizing: border-box !important;overflow-wrap: break-word !important;">总结</strong>
介绍了 SPI 基础、ServiceManager 、gRPC 中的两个基础组件、GRPCChannelManager 实现、注册+心跳相关的 BootService实现。 行吧,这一节挺长了,剩下的,后面再说吧。
扫描下图二维码,关注【程序员吴小胖】
从底部 ”源码分析“菜单 即可获取
《Skywalking源码分析指北》全部文章哟~
看懂看不懂,都点个赞吧:+1: