转载

SOFATracer (三) : SOFATracer 中 Disruptor 实践

建议阅读本篇前,请先阅读前两篇

SOFATracer 中, AsyncCommonDigestAppenderManagerdisruptor 进行了封装,用于处理外部组件的 Tracer 摘要日志。该部分借助 AsyncCommonDigestAppenderManager 的源码来分析下 SOFATracer 如何使用 disruptor 的。

SOFATracer 中使用了两种不同的事件模型,一种是 SOFATracer 内部使用的 StringEvent , 一种是 外部扩展使用的 SofaTacerSpanEvent 。这里以 SofaTacerSpanEvent 这种事件模型来分析。 StringEvent 消息事件模型对应的是 AsyncCommonAppenderManager 类封装的 disruptor

SofaTracerSpanEvent ( -> LongEvent)

定义消息事件模型, SofaTacerSpanEvent 和 前面 demo 中的 LongEvent 基本结构是一样的,主要是内部持有的消息数据不同, LongEvent 中是一个 long 类型的数据, SofaTacerSpanEvent 中持有的是 SofaTracerSpan

public class SofaTracerSpanEvent {
    private volatile SofaTracerSpan sofaTracerSpan;
    public SofaTracerSpan getSofaTracerSpan() {
        return sofaTracerSpan;
    }
    public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) {
        this.sofaTracerSpan = sofaTracerSpan;
    }
}
复制代码

Consumer ( -> LongEventHandler)

ConsumerAsyncCommonDigestAppenderManager 的内部类;实现了 EventHandler 接口,这个 consumer 就是作为消费者存在的。

AsyncCommonAppenderManager 中也有一个,这个地方个人觉得可以抽出去,这样可以使得 AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager 的代码看起来更干净;

private class Consumer implements EventHandler<SofaTracerSpanEvent> {
       //日志类型集合,非该集合内的日志类型将不会被处理
        protected Set<String> logTypes = Collections.synchronizedSet(new HashSet<String>());
        @Override
        public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch)
                                throws Exception {
            // 拿到具体的消息数据 sofaTracerSpan
            SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan();
            // 如果没有数据,则不做任何处理
            if (sofaTracerSpan != null) {
                try {
                    String logType = sofaTracerSpan.getLogType();
                    // 验证当前日志类型是否可以被当前consumer消费
                    if (logTypes.contains(logType)) {
                        // 获取编码类型
                        SpanEncoder encoder = contextEncoders.get(logType);
                        //获取 appender
                        TraceAppender appender = appenders.get(logType);
                        // 对数据进行编码处理
                        String encodedStr = encoder.encode(sofaTracerSpan);
                        if (appender instanceof LoadTestAwareAppender) {
                            ((LoadTestAwareAppender) appender).append(encodedStr,
                                TracerUtils.isLoadTest(sofaTracerSpan));
                        } else {
                            appender.append(encodedStr);
                        }
                        // 刷新缓冲区,日志输出
                        appender.flush();
                    }
                } catch (Exception e) {
                   // 异常省略
                }
            }
        }

        public void addLogType(String logType) {
            logTypes.add(logType);
        }
    }
复制代码

SofaTracerSpanEventFactory (-> LongEventFactory)

用于产生消息事件的 Factory

public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> {
    @Override
    public SofaTracerSpanEvent newInstance() {
        return new SofaTracerSpanEvent();
    }
}
复制代码

ConsumerThreadFactory (-> LongEventThreadFactory )

用来产生消费线程的 Factory

public class ConsumerThreadFactory implements ThreadFactory {
    private String workName;
    public String getWorkName() {
        return workName;
    }
    public void setWorkName(String workName) {
        this.workName = workName;
    }
    @Override
    public Thread newThread(Runnable runnable) {
        Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName);
        worker.setDaemon(true);
        return worker;
    }
}
复制代码

构建disruptor

disruptor 的构建是在 AsyncCommonDigestAppenderManager 的构造函数中完成的。

public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) {
    // 使用这个计算来保证realQueueSize是2的次幂(返回当前 大于等于queueSize的最小的2的次幂数 )
    int realQueueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
    //构建disruptor,使用的是 ProducerType.MULTI
    //等待策略是 BlockingWaitStrategy
    disruptor = new Disruptor<SofaTracerSpanEvent>(new SofaTracerSpanEventFactory(),
        realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
    //消费者列表
    this.consumers = new ArrayList<Consumer>(consumerNumber);
    
    for (int i = 0; i < consumerNumber; i++) {
        Consumer consumer = new Consumer();
        consumers.add(consumer);
        //设置异常处理程序
        disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler());
        //绑定消费者
        disruptor.handleEventsWith(consumer);
    }

    //是否允许丢弃,从配置文件获取
    this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
        SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD));
    
    if (allowDiscard) {
        //是否记录丢失日志的数量
        this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER,
            DEFAULT_IS_OUT_DISCARD_NUMBER));
        //是否记录丢失日志的TraceId和RpcId
        this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID,
            DEFAULT_IS_OUT_DISCARD_ID));
        //丢失日志的数量达到该阈值进行一次日志输出
        this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD,
            DEFAULT_DISCARD_OUT_THRESHOLD));
        if (isOutDiscardNumber) {
            this.discardCount = new PaddedAtomicLong(0L);
        }
    }
}
复制代码

启动 disruptor

disruptor 的启动委托给了 AsyncCommonDigestAppenderManagerstart 方法来执行。

public void start(final String workerName) {
    this.threadFactory.setWorkName(workerName);
    this.ringBuffer = this.disruptor.start();
}
复制代码

来看下, SOFATracer 中 具体是在哪里调用这个 start 的:

SOFATracer (三) : SOFATracer 中 Disruptor 实践
  • CommonTracerManager : 这个里面持有了 AsyncCommonDigestAppenderManager 类的一个单例对象,并且是 static 静态代码块中调用了 start 方法;这个用来输出普通日志。
  • SofaTracerDigestReporterAsyncManager :这里类里面也是持有了 AsyncCommonDigestAppenderManager 类的一个单例对像,并且提供了 getSofaTracerDigestReporterAsyncManager 方法来获取该单例,在这个方法中调用了 start 方法;该对象用来输出摘要日志。

发布事件

前面的 demo 中是通过一个 for 循环来发布事件的,在 SOFATracer 中 的事件发布无非就是当有 Tracer 日志需要输出时会触发发布,那么对应的就是日志的 append 操作,将日志 append 到环形缓冲区。

public boolean append(SofaTracerSpan sofaTracerSpan) {
    long sequence = 0L;
    //是否允许丢弃
    if (allowDiscard) {
        try {
            //允许丢弃就使用tryNext尝试申请序列,申请不到抛出异常
            sequence = ringBuffer.tryNext();
        } catch (InsufficientCapacityException e) {
            //是否输出丢失日志的TraceId和RpcId
            if (isOutDiscardId) {
                SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan
                    .getSofaTracerSpanContext();
                if (sofaTracerSpanContext != null) {
                    SynchronizingSelfLog.warn("discarded tracer: traceId["
                                              + sofaTracerSpanContext.getTraceId()
                                              + "];spanId[" + sofaTracerSpanContext.getSpanId()
                                              + "]");
                }
            }
             //是否输出丢失日志的数量
            if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) {
                discardCount.set(0);
                if (isOutDiscardNumber) {
                    SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs");
                }
            }

            return false;
        }
    } else {
        // 不允许丢弃则使用next方法
        sequence = ringBuffer.next();
    }

    try {
        SofaTracerSpanEvent event = ringBuffer.get(sequence);
        event.setSofaTracerSpan(sofaTracerSpan);
    } catch (Exception e) {
        SynchronizingSelfLog.error("fail to add event");
        return false;
    }
    //发布
    ringBuffer.publish(sequence);
    return true;
}
复制代码

SOFATracer 事件发布的调用逻辑:

SOFATracer (三) : SOFATracer 中 Disruptor 实践

追溯调用的流程,可以知道当前 span 调用 finish 时或者 SOFATracer 中调用 reportSpan 时 就相当于发布了一个消息事件。

小结

本文对 SOFATracer 中使用 Disruptor 来进行日志输出的代码进行了简单的分析,更多内部细节原理可以自行看下 SOFATracer 的代码。 SOFATracer 作为一种比较底层的中间件组件,在实际的业务开发中基本是无法感知的。但是作为技术来学习,还是有很多点可以挖一挖。

SOFATracer GitHub 传送门 。

如果有小伙伴对中间件感兴趣,欢迎加入我们团队,欢迎来撩;对 SOFA 技术体系有兴趣的可以关注我们 ALIPAY SOFA 社区 ;附团队镇楼图。

SOFATracer (三) : SOFATracer 中 Disruptor 实践
原文  https://juejin.im/post/5b65663af265da0f51407e4b
正文到此结束
Loading...