IncrementalIndex类中有两个重要的成员,分别是 metricDescs
和 dimensionDescs
:
private final Map<String, MetricDesc> metricDescs; private final Map<String, DimensionDesc> dimensionDescs;
metricDescs和dimensionDescs在IncrementalIndex的构造函数中被初始化。
每个MetricDesc中有几个重要的成员:
private final int index; // metric序号 private final String name; // metric名字 private final String type: // metric类型 private final ColumnCapabilitiesImpl capabilities // metric能力
MetricDesc的构造函数:
public MetricDesc(int index, AggregatorFactory factory) { this.index = index; this.name = factory.getName(); String typeInfo = factory.getTypeName(); this.capabilities = new ColumnCapabilitiesImpl(); if ("float".equalsIgnoreCase(typeInfo)) { capabilities.setType(ValueType.FLOAT); this.type = typeInfo; } else if ("long".equalsIgnoreCase(typeInfo)) { capabilities.setType(ValueType.LONG); this.type = typeInfo; } else if ("double".equalsIgnoreCase(typeInfo)) { capabilities.setType(ValueType.DOUBLE); this.type = typeInfo; } else { capabilities.setType(ValueType.COMPLEX); this.type = ComplexMetrics.getSerdeForType(typeInfo).getTypeName(); } }
每个AggregatorFactory的实例都有一个名字,通过getTypeName()方法获取。比如CountAggregatorFactory的getTypeName()方法返回"long",HyperUniquesAggregatorFactory的getTypeName()方法返回"hyperUnique"。
如果对AggregatorFactory调用getTypeName()返回的名字不是"float"、"long"、"double"之一,name这个AggregatorFactory的类型是复杂类型,比如HyperUniquesAggregatorFactory。
在IncrementalIndex中通过如下代码构造每个metric的MetricDesc和MetricDescs:
for (AggregatorFactory metric : metrics) { MetricDesc metricDesc = new MetricDesc(metricDesc.size(), metric); metricDescs.put(metricDesc.getName(), metricDesc); }
每个DimensionDesc中有几个重要成员:
private final int index; // dimension序号 private final String name; // dimnesion名字 private final ColumnCapabilitiesImpl capabilities // dimension能力 private final DimensionHandler handler; private final DimensionIndexer indexer;
DimensionHandler对象封装了特定于某一个dimension的索引,列合并/创建,以及查询操作。这些操作由通过DimensionHandler方法创建的对象(DimensionIndexer通过makeIndexer创建,DimensionMerger通过makeMerger创建,DimensionColumnReader)handle。每个DimensionHandler对象都特定于一个单独的dimension。
每个dimension对应一个DimensionIndexer,用于在内存中处理注入的行。
在IncrementalIndex的构造函数中定义了每个dimension的capalibities:
ColumnCapabilitiesImpl capabilities = makeCapabilitiesFromValueType(type);
private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type) { ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); capabilities.setDictionaryEncoded(type == ValueType.STRING); capabilities.setHasBitmapIndexes(type == ValueType.STRING); capabilities.setType(type); return capabilities }
可见只有string类型的dimension才支持 字典编码
和 位图索引
。
设置是否支持位图索引:
capabilities.setHasBitmapIndexes(dimSchema.hasBitmapIndex());
只有string类型的dimension才支持字典编码。
根据不同的capabilities生成不同的DimensionHandler:
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities( dimName, capabilities, dimSchema.getMultiValueHanding() );
public static DimensionHandler getHandlerFromCapabilities( String dimensionName, ColumnCapabilities capabilities, MultiValueHandling multiValueHandling ) { if (capabilities == null) { return new StringDimensionHandler(dimensionName, multiValueHandling, true); } multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHanding; if (capabilities.getType() == ValueType.STRING) { if (!capabilities.isDictionaryEncoded()) { throw new IAE("String column must have dictionary encoding."); } } if (capabilities.getType() == ValueType.LONG) { return new LongDimensionHandler(dimensionName); } if (capabilities.getType() == ValueType.FLOAT) { return new FloatDimensionHandler(dimensionName); } if (capabilities.getType() == ValueType.DOUBLE) { return new DoubleDimensionHandler(dimensionName); } // Return a StringDimensionHandler by default (null columns will be treated as String typed) return new StringDimensionHandler(dimensionName, multiValueHandling, true); }
解析出的一行数据(这里认为一行数据的实际类型为 MapBasedInputRow
)最终会调用IncrementalIndex的toIncrementalIndexRow(InputRow row)方法向IncrementalIndex中加入一条数据。
对于一行数据中的某一列的值,会调用:
Object dimsKey = null; dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( row.getRow(dimension), true );
这里row.getRow(dimension)就是解析出特定的一行数据中,dimension名字对应的的值。
indexer是在DimensionHandler中类型为DimensionIndexer的成员。这里我们只考虑String类型的Dimension,因此这里indexer的实例类型是StringDimensionIndexer。
下面我们来看一下StringDimensionIndexer的processRowValsToUnsortedEncodedKeyComponent方法。
在StringDimensionIndexer中有一个重要的内部类 DimensionDictionary
。其中有两个重要的成员:
private final Object2IntMap<String> valueToId = new Object2IntOpenHashMap<>(); private final List<String> idToValue = new ArrayList<>();
valueToId
存储了值到id的对应关系。 idToValue
存储了id到值的对应关系,id就是List的下标。
在processRowValsToUnsortedEncodedKeyComponent方法中:
如果传入的dimension的值是null的话,则会调用DimensionDictionary的getId方法:
final int nullId = dimLookup.getId(null);
这里如果是第一次遇到null值,则返回-1。
然后返回dimension的值编码后的值(这里是index、序号,比如第1行和第50行的数据可能返回的都是7):
encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};
如果nullId是-1(首次遇到特定dimension值为null的情况),这时调用DimensionDictionary的add方法将null值加入 idToValue
这个List中,设置idForNull为idToValue.size()并返回这个的id(idForNull是null值在idToValue中的下标或索引);如果nullId不为-1,则说明不是首次遇到特定dimension值为null的情况,这时直接返回nullId(也是idForNull的值)。
传入的dimension的值是个List,这种情况我们先不做分析,只考虑单值的情况。
传入的dimension的值为单值,则调用DimensionDictionary的add方法:
encodedDimensionValues = new int[]{dimLoojup.add(emptyToNullIfNeeded(dimValues))};
在add方法中,首先看 valueToId
中有没有这个值,如果有的话,直接返回这个值对应的id,如果没有,则调用idToValue.size()设置这个值在idToValue中的索引,然后将这个值和对应的索引写入valueToId,并把这个值加入到idToValue中:
final int index = idToValue.size(); valueToId.put(originalValue, index); idToValue.add(originalValue);
然后设置特定dimension当前的minValue和maxValue,最后返回index:
minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue; maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue; return index;
这里给个例子,如果当前有10行数据,它们的维度dim列的值为'a','b','c','b','d','e','a','a','b','f',那么在这10列数据都调用processRowValsToUnsortedEncodedKeyComponent之后,idToValue中的值为[a, b, c, d, e, f],valueToId中的值为{'a'->0, 'b'->1, 'c'->2, 'd'->3, 'e'->4, 'f'->5},processRowValsToUnsortedEncodedKeyComponent返回值分别为{0},{1},{2},{1},{3},{4},{0},{0},{1},{5}。
回到toIncrementalIndexRow方法,对这一行数据的每个dimension都调用processRowValsToUnsortedEncodedKeyComponent返回一个index数组(单值的话数组中只有一个元素),单后设置dims的值:
Object[] dims;
dims[desc.getIndex()] = dimsKey;
然后构造一个incrementalIndexRow实例:
IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( Math.max(truncated, minTimestamp), dims, dimensionDescList, dimsKeySize );
其中truncated是根据注入spec中的granularitySpec中指定的queryGranularity的值截断的时间戳。例如一行数据中的time字段的值为2019-08-14T17:55:34,如果queryGranularity是NONE,则不截断,如果为minute,则截断为2019-08-14T17:55:00,如果为day,则截断为2019-08-14T00:00:00。
minTimestamp是当前segment起始的timestamp。
最后返回一个IncrementIndexRowResult实例:
return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages);
每个IncrementalIndex都有一个FactsHolder类型的成员,这里我们假设在注入的spec中的granularitySpec中指定了rollup为true(默认就为true),则这里的FactsHolder实际类型为RollupFactsHolder。
上面生成了IncrementalIndexRowResult实例之后,调用addToFacts:
final AddToFactsResult addToFactsResult = addToFacts( row, incrementalIndexResult.getIncrementalIndexRow(), in, rowSupplier, skipMaxRowsInMemoryCheck );
在OnheapIncrementalIndex中,有
protected AddToFacts( InputRow row, IncrementalIndexRow key, ThreadLocal<InputRow> rowContainer, Supplier<InputRow> rowSupplier, boolean skipMaxRowsInMemoryCheck )
在addToFacts中,首先对metrics(类型是AggregatorFactory数组)产出Aggregator数组:
Aggregator[] aggs; aggs = new Aggregator[metrics.length]; factorizeAggs(metrics, aggs, rowContainer, row);
对每个AggregatorFactory调用factorize产出Aggregator。比如对 CountAggregatorFactory
产出 CountAggregator
,对 HyperUniquesAggregatorFactory
产出 HyperUniquesAggregator
。
对产出的每个Aggregator调用aggregate方法计算当前的metric值。对于CountAggregator,它的aggregate方法定义如下:
public void aggregate() { ++count; }
很简单也就对它的count成员加1;
对于HyperUniquesAggregator,它的aggregate方法如下:
public void aggregate() { Object object = selector.getObject(); if (object == null) { return; } if (collector == null) { collector = HyperLogLogCollector.makeLatestCollector()l } collector.fold((HyperLogLogCollector) object); }
这里首先调用selector的getObject()方法,selector的类型实际上是IncrementalIndex中的makeColumnSelectorFactory方法返回的IncrementalIndexInputRowColumnSelectorFactory实例中makeColumnValueSelector方法返回的ColumnValueSelector实例。它的getObject()方法调用的是ComplexMetricExtractor的extractValue方法:
public Object getObject() { return extract.extractValue(in.get(), column, agg); }
在HyperUniquesSerde的getExtractor返回的ComplexMetricExtractor实例的extractValue方法中,实际返回的事一个HyperLogLogCollector实例:
public HyperLogLogCollector extractValue(InputRow inputRow, String metricName) { Object rawValue = inputRow.getRow(metricName); if (rawValue instanceOf HyperLogLogCollector) { return (HyperLogLogCollector) rawValue; } else { HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); List<String> dimValues = inputRow.getDimension(metricName); if (dimValues == null) { return collector; } for (String dimensionValue : dimValues) { collector.add(hyperLogLogHash.hash(dimensionValue)); } return collector; } }