建议阅读本篇前,请先阅读前两篇
SOFATracer
中, AsyncCommonDigestAppenderManager
对 disruptor
进行了封装,用于处理外部组件的 Tracer
摘要日志。该部分借助 AsyncCommonDigestAppenderManager
的源码来分析下 SOFATracer
如何使用 disruptor
的。
SOFATracer
中使用了两种不同的事件模型,一种是 SOFATracer
内部使用的 StringEvent
, 一种是 外部扩展使用的 SofaTacerSpanEvent
。这里以 SofaTacerSpanEvent
这种事件模型来分析。 StringEvent
消息事件模型对应的是 AsyncCommonAppenderManager
类封装的 disruptor
。
定义消息事件模型, SofaTacerSpanEvent
和 前面 demo
中的 LongEvent
基本结构是一样的,主要是内部持有的消息数据不同, LongEvent
中是一个 long
类型的数据, SofaTacerSpanEvent
中持有的是 SofaTracerSpan
。
public class SofaTracerSpanEvent { private volatile SofaTracerSpan sofaTracerSpan; public SofaTracerSpan getSofaTracerSpan() { return sofaTracerSpan; } public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) { this.sofaTracerSpan = sofaTracerSpan; } } 复制代码
Consumer
是 AsyncCommonDigestAppenderManager
的内部类;实现了 EventHandler
接口,这个 consumer
就是作为消费者存在的。
在 AsyncCommonAppenderManager
中也有一个,这个地方个人觉得可以抽出去,这样可以使得 AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager
的代码看起来更干净;
private class Consumer implements EventHandler<SofaTracerSpanEvent> { //日志类型集合,非该集合内的日志类型将不会被处理 protected Set<String> logTypes = Collections.synchronizedSet(new HashSet<String>()); @Override public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch) throws Exception { // 拿到具体的消息数据 sofaTracerSpan SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan(); // 如果没有数据,则不做任何处理 if (sofaTracerSpan != null) { try { String logType = sofaTracerSpan.getLogType(); // 验证当前日志类型是否可以被当前consumer消费 if (logTypes.contains(logType)) { // 获取编码类型 SpanEncoder encoder = contextEncoders.get(logType); //获取 appender TraceAppender appender = appenders.get(logType); // 对数据进行编码处理 String encodedStr = encoder.encode(sofaTracerSpan); if (appender instanceof LoadTestAwareAppender) { ((LoadTestAwareAppender) appender).append(encodedStr, TracerUtils.isLoadTest(sofaTracerSpan)); } else { appender.append(encodedStr); } // 刷新缓冲区,日志输出 appender.flush(); } } catch (Exception e) { // 异常省略 } } } public void addLogType(String logType) { logTypes.add(logType); } } 复制代码
用于产生消息事件的 Factory
public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> { @Override public SofaTracerSpanEvent newInstance() { return new SofaTracerSpanEvent(); } } 复制代码
用来产生消费线程的 Factory
。
public class ConsumerThreadFactory implements ThreadFactory { private String workName; public String getWorkName() { return workName; } public void setWorkName(String workName) { this.workName = workName; } @Override public Thread newThread(Runnable runnable) { Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName); worker.setDaemon(true); return worker; } } 复制代码
disruptor
的构建是在 AsyncCommonDigestAppenderManager
的构造函数中完成的。
public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) { // 使用这个计算来保证realQueueSize是2的次幂(返回当前 大于等于queueSize的最小的2的次幂数 ) int realQueueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); //构建disruptor,使用的是 ProducerType.MULTI //等待策略是 BlockingWaitStrategy disruptor = new Disruptor<SofaTracerSpanEvent>(new SofaTracerSpanEventFactory(), realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy()); //消费者列表 this.consumers = new ArrayList<Consumer>(consumerNumber); for (int i = 0; i < consumerNumber; i++) { Consumer consumer = new Consumer(); consumers.add(consumer); //设置异常处理程序 disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler()); //绑定消费者 disruptor.handleEventsWith(consumer); } //是否允许丢弃,从配置文件获取 this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty( SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD)); if (allowDiscard) { //是否记录丢失日志的数量 this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty( SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER, DEFAULT_IS_OUT_DISCARD_NUMBER)); //是否记录丢失日志的TraceId和RpcId this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty( SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID, DEFAULT_IS_OUT_DISCARD_ID)); //丢失日志的数量达到该阈值进行一次日志输出 this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty( SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD, DEFAULT_DISCARD_OUT_THRESHOLD)); if (isOutDiscardNumber) { this.discardCount = new PaddedAtomicLong(0L); } } } 复制代码
disruptor
的启动委托给了 AsyncCommonDigestAppenderManager
的 start
方法来执行。
public void start(final String workerName) { this.threadFactory.setWorkName(workerName); this.ringBuffer = this.disruptor.start(); } 复制代码
来看下, SOFATracer
中 具体是在哪里调用这个 start
的:
CommonTracerManager
: 这个里面持有了 AsyncCommonDigestAppenderManager
类的一个单例对象,并且是 static
静态代码块中调用了 start
方法;这个用来输出普通日志。 SofaTracerDigestReporterAsyncManager
:这里类里面也是持有了 AsyncCommonDigestAppenderManager
类的一个单例对像,并且提供了 getSofaTracerDigestReporterAsyncManager
方法来获取该单例,在这个方法中调用了 start
方法;该对象用来输出摘要日志。 前面的 demo
中是通过一个 for
循环来发布事件的,在 SOFATracer
中 的事件发布无非就是当有 Tracer
日志需要输出时会触发发布,那么对应的就是日志的 append
操作,将日志 append
到环形缓冲区。
public boolean append(SofaTracerSpan sofaTracerSpan) { long sequence = 0L; //是否允许丢弃 if (allowDiscard) { try { //允许丢弃就使用tryNext尝试申请序列,申请不到抛出异常 sequence = ringBuffer.tryNext(); } catch (InsufficientCapacityException e) { //是否输出丢失日志的TraceId和RpcId if (isOutDiscardId) { SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan .getSofaTracerSpanContext(); if (sofaTracerSpanContext != null) { SynchronizingSelfLog.warn("discarded tracer: traceId[" + sofaTracerSpanContext.getTraceId() + "];spanId[" + sofaTracerSpanContext.getSpanId() + "]"); } } //是否输出丢失日志的数量 if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) { discardCount.set(0); if (isOutDiscardNumber) { SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs"); } } return false; } } else { // 不允许丢弃则使用next方法 sequence = ringBuffer.next(); } try { SofaTracerSpanEvent event = ringBuffer.get(sequence); event.setSofaTracerSpan(sofaTracerSpan); } catch (Exception e) { SynchronizingSelfLog.error("fail to add event"); return false; } //发布 ringBuffer.publish(sequence); return true; } 复制代码
SOFATracer 事件发布的调用逻辑:
追溯调用的流程,可以知道当前 span
调用 finish
时或者 SOFATracer
中调用 reportSpan
时 就相当于发布了一个消息事件。
本文对 SOFATracer
中使用 Disruptor
来进行日志输出的代码进行了简单的分析,更多内部细节原理可以自行看下 SOFATracer
的代码。 SOFATracer
作为一种比较底层的中间件组件,在实际的业务开发中基本是无法感知的。但是作为技术来学习,还是有很多点可以挖一挖。
SOFATracer GitHub 传送门 。
如果有小伙伴对中间件感兴趣,欢迎加入我们团队,欢迎来撩;对 SOFA 技术体系有兴趣的可以关注我们 ALIPAY SOFA 社区 ;附团队镇楼图。