继上一篇ElasticAPM初体验我们知道了什么是 可观察性 ,并领略了 ElasticAPM 的强大功能,但是仅仅是上篇文章中单机模式的使用时远远不够的。还记得上一篇最后提出的两个问题:
1、本文在单机版的环境中,测试通过,但是在分布式环境中,请求会串联起很多应用,那服务跟踪能否实现?实现的原理是什么?
2、Elastic APM可以自动采集http请求,在PRC分布式环境中,Elastic APM能否正常工作?是否必须采用 public API
来实现?
重点是 分布式 和 RPC ,即在分布式情况下,ElasticAPM能否良好工作?在RPC环境下,ElasticAPM是不是也能正常工作呢?
先说答案: 在默认情况下,ElasticAPM能够支持分布式的Http方式调用,但是不支持RPC协议 。但是很多公司都采用RPC协议作为其内部系统的通信协议,比如我司就采用Spring Cloud Alibaba作为搜索服务的框架,框架内应用的通信是借助RPC框架Dubbo来实现的。所以问题就变成了如何把 ElasticAPM 集成进Spring Cloud Alibaba中。
首先,我先大概图示下Spring Cloud Alibaba和ElasticAPM的架构和工作流程。
如架构图所示,搜索系统分为了 网关应用(Gateway) , US应用 , AS应用 , BS应用 ,用户的请求会先到达网关,网关会把请求,以Http协议转发给US应用,US应用会采用Dubbo协议调用AS应用,AS应用采用Dubbo协议调用BS应用。
Request--- http ---> US --- RPC ----> AS ----- RPC ------> BS
每一个应用启动的时候都已经集成了Apm-agent(如果不知道怎么集成请参考ElasticAPM初体验),如果APM-agent默认支持 Dubbo 就完美了(但是并没有)。所以整个链路追踪,到了US之后,就没有上报之后应用的锚点数据。在查看ElasticAPM官方文档的时候,我注意到了 Public API ,文档中交代了这样一件事情:
The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.
没错,你可以自定义 Span
和 Transaction
,如果不懂什么是 Span
和 Transaction
请参考ElasticAPM初体验或直接读一遍 官方文档 。既然Agent默认不支持Dubbo,那么我们使用Public API来实现功能。
基于Spring Cloud Alibaba的架构,我们可以如下图方式实现。
Transaction
。 http
请求,如果是用SpringMVC实现的话,需要在 Controller
处,上报 子Transaction
。 Dubbo
协议的。这时可以采用Dubbo的过滤器机制,对 Concumer
和 Provider
都进行拦截,通过这种方式做到不侵入业务代码。 transaction.end()
上报根 Transaction
。 微服务网关需要做这样几件事情:
Transaction
POST
请求body中增加追踪ID, GET
请求 Parameter
中增加追踪ID transaction.end()
完成上报 public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { HttpMethod httpMethod = exchange.getRequest().getMethod(); //第一步 开启一个Transaction Transaction transaction = ElasticApm.startTransaction(); transaction.setName("mainSearch"); transaction.setType(Transaction.TYPE_REQUEST); //第二步 创建Span Span span = transaction.startSpan("gateway", "filter", "gateway action"); span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter"); LOGGER.info("APM埋点成功transactionId:{}", transaction.getId()); //第三步 判定Http请求是POST还是GET if (HttpMethod.POST.equals(httpMethod)) { ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders); MediaType mediaType = exchange.getRequest().getHeaders().getContentType(); //第四步 定义Http body的处理逻辑 Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { //判定body类型 if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) { //重要!获取到了body的数据,传给callback函数,做业务逻辑处理 Map<String, String> bodyMap = decodeBody(body); //设置最新的bodyMap进入exchange exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap); //重点!动态增加body的transaction标记,为下游应用Controller使用 span.injectTraceHeaders((name, value) -> { bodyMap.put(name, value); LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value); }); //不要忘记span.end()否则会丢失上报 span.end(); return Mono.just(encodeBody(bodyMap)); } else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { // origin body map Map<String, String> bodyMap = decodeJsonBody(body); exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap); //重点!动态增加body的transaction标记,为下游应用Controller使用 span.injectTraceHeaders((name, value) -> { bodyMap.put(name, value); LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value); }); span.end(); return Mono.just(encodeJsonBody(bodyMap)); } return Mono.empty(); }); BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); // the new content type will be computed by bodyInserter // and then set in the request decorator headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) { public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super.getHeaders()); if (contentLength > 0) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); } return httpHeaders; } public Flux<DataBuffer> getBody() { return outputMessage.getBody(); } }; //第五步 在请求返回之后,上报transaction return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end())); })); } else if (HttpMethod.GET.equals(httpMethod)) { span.injectTraceHeaders((name, value) -> { exchange.getRequest().getQueryParams().set(name, transaction.getId()); LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value); }); return chain.filter(exchange).then(Mono.fromRunnable(() -> { span.end(); transaction.end(); LOGGER.info("APM买点完成,transactionId:{}", transaction.getId()); })); } else { //not support other Http Method exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE); return exchange.getResponse().setComplete(); } } 复制代码
Controller层的实现采用了SpringAOP方式实现,这样的好处是对业务代码不侵入,可扩展性高,对想要监控的方法直接配置上 @TransactionWithRemoteParent()
即可。
如下代码是通过 @TransactionWithRemoteParent()
实现对Controller方法的上报。
@PostMapping(value = "/search", consumes = "application/json", produces = "application/json") @TransactionWithRemoteParent() public String searchForm(@RequestBody String req) { String result = asService.helloAs(req); return result; } 复制代码
AOP实现
@Aspect public class ApmAspect { private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class); @PostConstruct private void init() { LOGGER.info("ApmAspect加载完毕"); } @Pointcut(value = "@annotation(transactionWithRemoteParent)", argNames = "transactionWithRemoteParent") public void pointcut(TransactionWithRemoteParent transactionWithRemoteParent) { } @Around(value = "pointcut(transactionWithRemoteParent)", argNames = "joinPoint,transactionWithRemoteParent") public Object around(ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable { Transaction transaction = null; try { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); transaction = ElasticApm.startTransactionWithRemoteParent(key -> { String httpRequest = (String) joinPoint.getArgs()[0]; JSONObject json = JSON.parseObject(httpRequest); String traceId = json.getString(key); LOGGER.info("切面添加了子Transaction,key={},value={}", key, traceId); RpcContext.getContext().setAttachment(key, traceId); return traceId; }); transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name()) ? transactionWithRemoteParent.name() : signature.getName()); transaction.setType(Transaction.TYPE_REQUEST); return joinPoint.proceed(); } catch (Throwable throwable) { if (transaction != null) { transaction.captureException(throwable); } throw throwable; } finally { if (transaction != null) { LOGGER.info("切面执行完毕,上报Transaction:{}", transaction.getId()); transaction.end(); } } } } 复制代码
如下代码是DubboConsumer过滤器,专门用于处理APM。DubboProvider的实现类似。
@Activate(group = "consumer") public class DubboConsumerApmFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> { String traceId = invocation.getAttachment(key); LOGGER.info("key={},value={}", key, traceId); return traceId; }); try (final Scope scope = transaction.activate()) { String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName(); transaction.setName(name); transaction.setType(Transaction.TYPE_REQUEST); Result result = invoker.invoke(invocation); return result; } catch (Exception e) { transaction.captureException(e); throw e; } finally { transaction.end(); } } } @Activate(group = "provider") public class DubboProviderApmFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // use startTransactionWithRemoteParent to create transaction with parent, which id from prc context Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key)); try (final Scope scope = transaction.activate()) { String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName(); transaction.setName(name); transaction.setType(Transaction.TYPE_REQUEST); return invoker.invoke(invocation); } catch (Exception e) { transaction.captureException(e); throw e; } finally { transaction.end(); } } } 复制代码