转载

Druid源码解析 - Segment文件格式

IncrementalIndex

IncrementalIndex类中有两个重要的成员,分别是 metricDescsdimensionDescs

private final Map<String, MetricDesc> metricDescs;
private final Map<String, DimensionDesc> dimensionDescs;

metricDescs和dimensionDescs在IncrementalIndex的构造函数中被初始化。

MetricDesc

每个MetricDesc中有几个重要的成员:

private final int index;                              // metric序号
private final String name;                            // metric名字
private final String type:                            // metric类型
private final ColumnCapabilitiesImpl capabilities     // metric能力

MetricDesc的构造函数:

public MetricDesc(int index, AggregatorFactory factory) 
{
  this.index = index;
  this.name = factory.getName();

  String typeInfo = factory.getTypeName();
  this.capabilities = new ColumnCapabilitiesImpl();
  if ("float".equalsIgnoreCase(typeInfo)) {
    capabilities.setType(ValueType.FLOAT);
    this.type = typeInfo;
  } else if ("long".equalsIgnoreCase(typeInfo)) {
    capabilities.setType(ValueType.LONG);
    this.type = typeInfo;
  } else if ("double".equalsIgnoreCase(typeInfo)) {
    capabilities.setType(ValueType.DOUBLE);
    this.type = typeInfo;
  } else {
    capabilities.setType(ValueType.COMPLEX);
    this.type = ComplexMetrics.getSerdeForType(typeInfo).getTypeName();
  }
}

每个AggregatorFactory的实例都有一个名字,通过getTypeName()方法获取。比如CountAggregatorFactory的getTypeName()方法返回"long",HyperUniquesAggregatorFactory的getTypeName()方法返回"hyperUnique"。

如果对AggregatorFactory调用getTypeName()返回的名字不是"float"、"long"、"double"之一,name这个AggregatorFactory的类型是复杂类型,比如HyperUniquesAggregatorFactory。

在IncrementalIndex中通过如下代码构造每个metric的MetricDesc和MetricDescs:

for (AggregatorFactory metric : metrics) {
  MetricDesc metricDesc = new MetricDesc(metricDesc.size(), metric);
  metricDescs.put(metricDesc.getName(), metricDesc);
}

DimensionDesc

每个DimensionDesc中有几个重要成员:

private final int index;      // dimension序号
private final String name;    // dimnesion名字
private final ColumnCapabilitiesImpl capabilities    // dimension能力
private final DimensionHandler handler;
private final DimensionIndexer indexer;

DimensionHandler

DimensionHandler对象封装了特定于某一个dimension的索引,列合并/创建,以及查询操作。这些操作由通过DimensionHandler方法创建的对象(DimensionIndexer通过makeIndexer创建,DimensionMerger通过makeMerger创建,DimensionColumnReader)handle。每个DimensionHandler对象都特定于一个单独的dimension。

DimensionIndexer

每个dimension对应一个DimensionIndexer,用于在内存中处理注入的行。

ColumnCapabilitiesImpl

在IncrementalIndex的构造函数中定义了每个dimension的capalibities:

ColumnCapabilitiesImpl capabilities = makeCapabilitiesFromValueType(type);
private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type) 
{
  ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
  capabilities.setDictionaryEncoded(type == ValueType.STRING);
  capabilities.setHasBitmapIndexes(type == ValueType.STRING);
  capabilities.setType(type);
  return capabilities
}

可见只有string类型的dimension才支持 字典编码位图索引

设置是否支持位图索引:

capabilities.setHasBitmapIndexes(dimSchema.hasBitmapIndex());

只有string类型的dimension才支持字典编码。

根据不同的capabilities生成不同的DimensionHandler:

DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(
  dimName,
  capabilities,
  dimSchema.getMultiValueHanding()
);
public static DimensionHandler getHandlerFromCapabilities(
    String dimensionName,
    ColumnCapabilities capabilities,
    MultiValueHandling multiValueHandling
)
{
  if (capabilities == null) {
    return new StringDimensionHandler(dimensionName, multiValueHandling, true);
  }

  multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHanding;

  if (capabilities.getType() == ValueType.STRING) {
    if (!capabilities.isDictionaryEncoded()) {
      throw new IAE("String column must have dictionary encoding.");
    }
  }

  if (capabilities.getType() == ValueType.LONG) {
    return new LongDimensionHandler(dimensionName);
  }

  if (capabilities.getType() == ValueType.FLOAT) {
    return new FloatDimensionHandler(dimensionName);
  }

  if (capabilities.getType() == ValueType.DOUBLE) {
    return new DoubleDimensionHandler(dimensionName);
  }

  // Return a StringDimensionHandler by default (null columns will be treated as String typed)
  return new StringDimensionHandler(dimensionName, multiValueHandling, true);
}

向IncrementalIndex中写入一行数据

解析出的一行数据(这里认为一行数据的实际类型为 MapBasedInputRow )最终会调用IncrementalIndex的toIncrementalIndexRow(InputRow row)方法向IncrementalIndex中加入一条数据。

对于一行数据中的某一列的值,会调用:

Object dimsKey = null;
dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(
    row.getRow(dimension),
    true
);

这里row.getRow(dimension)就是解析出特定的一行数据中,dimension名字对应的的值。

indexer是在DimensionHandler中类型为DimensionIndexer的成员。这里我们只考虑String类型的Dimension,因此这里indexer的实例类型是StringDimensionIndexer。

下面我们来看一下StringDimensionIndexer的processRowValsToUnsortedEncodedKeyComponent方法。

在StringDimensionIndexer中有一个重要的内部类 DimensionDictionary 。其中有两个重要的成员:

private final Object2IntMap<String> valueToId = new Object2IntOpenHashMap<>();
private final List<String> idToValue = new ArrayList<>();

valueToId 存储了值到id的对应关系。 idToValue 存储了id到值的对应关系,id就是List的下标。

processRowValsToUnsortedEncodedKeyComponent方法

在processRowValsToUnsortedEncodedKeyComponent方法中:

如果传入的dimension的值是null的话,则会调用DimensionDictionary的getId方法:

final int nullId = dimLookup.getId(null);

这里如果是第一次遇到null值,则返回-1。

然后返回dimension的值编码后的值(这里是index、序号,比如第1行和第50行的数据可能返回的都是7):

encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};

如果nullId是-1(首次遇到特定dimension值为null的情况),这时调用DimensionDictionary的add方法将null值加入 idToValue 这个List中,设置idForNull为idToValue.size()并返回这个的id(idForNull是null值在idToValue中的下标或索引);如果nullId不为-1,则说明不是首次遇到特定dimension值为null的情况,这时直接返回nullId(也是idForNull的值)。

传入的dimension的值是个List,这种情况我们先不做分析,只考虑单值的情况。

传入的dimension的值为单值,则调用DimensionDictionary的add方法:

encodedDimensionValues = new int[]{dimLoojup.add(emptyToNullIfNeeded(dimValues))};

在add方法中,首先看 valueToId 中有没有这个值,如果有的话,直接返回这个值对应的id,如果没有,则调用idToValue.size()设置这个值在idToValue中的索引,然后将这个值和对应的索引写入valueToId,并把这个值加入到idToValue中:

final int index = idToValue.size();
valueToId.put(originalValue, index);
idToValue.add(originalValue);

然后设置特定dimension当前的minValue和maxValue,最后返回index:

minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
return index;

processRowValsToUnsortedEncodedKeyComponent最终返回的是当前行的特定列的值在valueToId中的id,也就是在idToValue中的索引。

需要记住的是,每个dimension对应一个DimensionDesc,每个DimensionDesc中有一个DimensionIndexer,每个DimensionIndexer中 有一个DimensionDictionary,每个DimensionDictionary中有一个valueToId和一个IdToValue。

这里给个例子,如果当前有10行数据,它们的维度dim列的值为'a','b','c','b','d','e','a','a','b','f',那么在这10列数据都调用processRowValsToUnsortedEncodedKeyComponent之后,idToValue中的值为[a, b, c, d, e, f],valueToId中的值为{'a'->0, 'b'->1, 'c'->2, 'd'->3, 'e'->4, 'f'->5},processRowValsToUnsortedEncodedKeyComponent返回值分别为{0},{1},{2},{1},{3},{4},{0},{0},{1},{5}。

回到toIncrementalIndexRow方法,对这一行数据的每个dimension都调用processRowValsToUnsortedEncodedKeyComponent返回一个index数组(单值的话数组中只有一个元素),单后设置dims的值:

Object[] dims;
dims[desc.getIndex()] = dimsKey;

然后构造一个incrementalIndexRow实例:

IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize(
  Math.max(truncated, minTimestamp),
  dims,
  dimensionDescList,
  dimsKeySize
);

其中truncated是根据注入spec中的granularitySpec中指定的queryGranularity的值截断的时间戳。例如一行数据中的time字段的值为2019-08-14T17:55:34,如果queryGranularity是NONE,则不截断,如果为minute,则截断为2019-08-14T17:55:00,如果为day,则截断为2019-08-14T00:00:00。

minTimestamp是当前segment起始的timestamp。

最后返回一个IncrementIndexRowResult实例:

return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages);

FactsHolder

每个IncrementalIndex都有一个FactsHolder类型的成员,这里我们假设在注入的spec中的granularitySpec中指定了rollup为true(默认就为true),则这里的FactsHolder实际类型为RollupFactsHolder。

上面生成了IncrementalIndexRowResult实例之后,调用addToFacts:

final AddToFactsResult addToFactsResult = addToFacts(
  row,
  incrementalIndexResult.getIncrementalIndexRow(),
  in,
  rowSupplier,
  skipMaxRowsInMemoryCheck
);

在OnheapIncrementalIndex中,有

protected AddToFacts(
  InputRow row,
  IncrementalIndexRow key,
  ThreadLocal<InputRow> rowContainer,
  Supplier<InputRow> rowSupplier,
  boolean skipMaxRowsInMemoryCheck
)

从AggregatorFactory产出Aggregator

在addToFacts中,首先对metrics(类型是AggregatorFactory数组)产出Aggregator数组:

Aggregator[] aggs;

aggs = new Aggregator[metrics.length];

factorizeAggs(metrics, aggs, rowContainer, row);

对每个AggregatorFactory调用factorize产出Aggregator。比如对 CountAggregatorFactory 产出 CountAggregator ,对 HyperUniquesAggregatorFactory 产出 HyperUniquesAggregator

对产出的每个Aggregator调用aggregate方法计算当前的metric值。对于CountAggregator,它的aggregate方法定义如下:

public void aggregate() 
{
  ++count;
}

很简单也就对它的count成员加1;

对于HyperUniquesAggregator,它的aggregate方法如下:

public void aggregate() 
{
  Object object = selector.getObject();
  if (object == null) {
    return;
  }
  if (collector == null) {
    collector = HyperLogLogCollector.makeLatestCollector()l
  }
  collector.fold((HyperLogLogCollector) object);
}

这里首先调用selector的getObject()方法,selector的类型实际上是IncrementalIndex中的makeColumnSelectorFactory方法返回的IncrementalIndexInputRowColumnSelectorFactory实例中makeColumnValueSelector方法返回的ColumnValueSelector实例。它的getObject()方法调用的是ComplexMetricExtractor的extractValue方法:

public Object getObject() 
{
  return extract.extractValue(in.get(), column, agg);
}

在HyperUniquesSerde的getExtractor返回的ComplexMetricExtractor实例的extractValue方法中,实际返回的事一个HyperLogLogCollector实例:

public HyperLogLogCollector extractValue(InputRow inputRow, String metricName) 
{
  Object rawValue = inputRow.getRow(metricName);
  
  if (rawValue instanceOf HyperLogLogCollector) {
    return (HyperLogLogCollector) rawValue;
  } else {
    HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
    
    List<String> dimValues = inputRow.getDimension(metricName);
    if (dimValues == null) {
      return collector;
    }
    
    for (String dimensionValue : dimValues) {
      collector.add(hyperLogLogHash.hash(dimensionValue));
    }
    return collector;
  }
}
原文  https://segmentfault.com/a/1190000020073526
正文到此结束
Loading...