本文主要研究一下skywalking的ServiceResetCommand
skywalking-6.6.0/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ServiceResetCommand.java
/**
 * Command registered under the name {@code "ServiceMetadataReset"}.
 *
 * <p>Serialization is delegated to the inherited {@code commandBuilder()}; deserialization
 * rebuilds the command from the {@code "SerialNumber"} argument of a {@link Command}.
 */
public class ServiceResetCommand extends BaseCommand implements Serializable, Deserializable<ServiceResetCommand> {

    /** Command name handed to the {@link BaseCommand} constructor. */
    public static final String NAME = "ServiceMetadataReset";

    /** Shared instance, created with an empty serial number, exposed as a deserializer. */
    public static final Deserializable<ServiceResetCommand> DESERIALIZER = new ServiceResetCommand("");

    /**
     * @param serialNumber serial number forwarded to the base command
     */
    public ServiceResetCommand(String serialNumber) {
        super(NAME, serialNumber);
    }

    /**
     * @return the builder produced by the inherited {@code commandBuilder()}
     */
    @Override
    public Command.Builder serialize() {
        return commandBuilder();
    }

    /**
     * Creates a new command from the arguments carried by {@code command}.
     *
     * @param command the received command whose argument list is inspected
     * @return a new instance whose serial number is the value of the first
     *         {@code "SerialNumber"} argument, or {@code null} when there is none
     */
    @Override
    public ServiceResetCommand deserialize(Command command) {
        return new ServiceResetCommand(findSerialNumber(command.getArgsList()));
    }

    /** Returns the value of the first pair keyed {@code "SerialNumber"}, or {@code null} if absent. */
    private static String findSerialNumber(List<KeyStringValuePair> args) {
        for (KeyStringValuePair arg : args) {
            if ("SerialNumber".equals(arg.getKey())) {
                return arg.getValue();
            }
        }
        return null;
    }
}
skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/register/InstancePing.proto
syntax = "proto3";

// NOTE(review): Commands is not declared in this file; presumably it comes from this import — confirm.
import "common/common.proto";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.register.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

// Ping service exposing a single unary RPC.
service ServiceInstancePing {
    // Takes one ping package and answers with a Commands message.
    rpc doPing (ServiceInstancePingPkg) returns (Commands) {
    }
}

// Payload of a single ping.
message ServiceInstancePingPkg {
    // Numeric id of the service instance sending the ping.
    int32 serviceInstanceId = 1;
    // Time attached to the ping (unit is not specified in this file).
    int64 time = 2;
    // UUID string of the service instance.
    string serviceInstanceUUID = 3;
}
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java
public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler { private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingServiceHandler.class); private final ServiceInstanceInventoryCache serviceInstanceInventoryCache; private final IServiceInventoryRegister serviceInventoryRegister; private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister; private final CommandService commandService; public ServiceInstancePingServiceHandler(ModuleManager moduleManager) { this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class); this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class); this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class); this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class); } @Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) { int serviceInstanceId = request.getServiceInstanceId(); long heartBeatTime = request.getTime(); serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime); ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId); if (Objects.nonNull(serviceInstanceInventory)) { serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); responseObserver.onNext(Commands.getDefaultInstance()); } else { logger.warn("Can't find service by service instance id from cache," + " service instance id is: {}, will send a reset command to agent side", serviceInstanceId); final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID()); final 
Command command = resetCommand.serialize().build(); final Commands nextCommands = Commands.newBuilder().addCommands(command).build(); responseObserver.onNext(nextCommands); } responseObserver.onCompleted(); } }
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ServiceResetCommandExecutor.java
public class ServiceResetCommandExecutor implements CommandExecutor { private static final ILog LOGGER = LogManager.getLogger(ServiceResetCommandExecutor.class); @Override public void execute(final BaseCommand command) throws CommandExecutionException { LOGGER.warn("Received ServiceResetCommand, a re-register task is scheduled."); ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class).coolDown(); RemoteDownstreamConfig.Agent.SERVICE_ID = DictionaryUtil.nullValue(); RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = DictionaryUtil.nullValue(); RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = DictionaryUtil.nullValue(); NetworkAddressDictionary.INSTANCE.clear(); EndpointNameDictionary.INSTANCE.clear(); } }
ServiceInstancePingServiceHandler继承了ServiceInstancePingGrpc.ServiceInstancePingImplBase,实现了GRPCHandler接口;其构造器获取serviceInstanceInventoryCache、serviceInventoryRegister、serviceInstanceInventoryRegister、commandService;其doPing方法执行serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime),若serviceInstanceInventoryCache.get(serviceInstanceId)不为null则执行serviceInventoryRegister.heartbeat并返回默认的Commands,若为null则给agent发送ServiceResetCommand。