转载

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向。

StreamPartitioner

StreamPartitioner 是Flink流分区器的基类,它只定义了一个抽象方法:

public abstract StreamPartitioner<T> copy();

但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于——各个分区器需要实现 channel 选择的接口方法:

int[] selectChannels(T record, int numChannels);

该方法针对当前的 record 以及所有的 channel 数目,返回一个针对当前这条记录采用的 output channel 的索引数组。(注意这里返回的是数组,说明一个记录可能会输出到多个 channel 这点我们后面会谈到)。

该接口方法来自于 StreamPartitioner 实现的接口 ChannelSelector

分区器整体类图:

Apache Flink流分区器剖析

GlobalPartitioner

全局分区器,其实现很简单——默认选择了索引为0的channel进行输出。

private int[] returnArray = new int[] { 0 };

@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,int numberOfOutputChannels) {
return returnArray;
}

ForwardPartitioner

该分区器将记录转发给在本地运行的下游的(归属于 subtask )的 operattion 。其实现跟上面的 GlobalPartitioner 一致,就不贴代码了。

ShufflePartitioner

混洗分区器,该分区器会在所有 output channel 中选择一个随机的进行输出。

private int[] returnArray = new int[1];

@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}

HashPartitioner

hash分区器,该分区器对key进行hash后计算得到 channel 索引。它通过构造器获得 KeySelector 的实例(该实例用来获取当前记录的key)。

获得key后,通过其 hashcodenumberOfOutputChannels 取模后计算得出最终输出的 channel 的索引。

public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
Object key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;

return returnArray;
}

BroadcastPartitioner

广播分区器,用于将该记录广播给下游的所有的 subtask 。这里采用了两个标记:

  • set
  • setNumber
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
if (set && setNumber == numberOfOutputChannels) {
return returnArray;
} else {
this.returnArray = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnArray[i] = i;
}
set = true;
setNumber = numberOfOutputChannels;
return returnArray;
}
}

从上面的实现可见,它返回了一个跟 numberOfOutputChannels 相等的数组(数组的大小就是即将输出到 channel 的个数)。

RebalancePartitioner

重平衡分区器,用于实现类似于 round-robin 这样的轮转模式的分区器。通过累加、取模的形式来实现对输出 channel 的切换。

public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
return this.returnArray;
}

RescalePartitioner

也是以 round-robin 的形式将元素分区到下游 subtask 的子集中。

上游操作所发送的元素被分区到下游操作的哪些子集,依赖于上游和下游操作的并行度。例如,如果上游操作的并行度为2,而下游操作的并行度为4,那么一个上游操作会分发元素给两个下游操作,同时另一个上游操作会分发给另两个下游操作。相反的,如果下游操作的并行度为2,而上游操作的并行度为4,那么两个上游操作会分发数据给一个下游操作,同时另两个上游操作会分发数据给另一个下游操作。

在上下游的并行度不是呈倍数关系的情况下,下游操作会有数量不同的来自上游操作的输入。具体的实现代码同 RebalancePartitioner

CustomPartitionerWrapper

自定义分区器包装器,该包装器封装了对于自定义的分区器的实现。自定义的分区测量依赖于 Partitioner 接口。它提供了自定义分区器的契约。核心接口方法是:

/**
* Computes the partition for the given key.
*
* @param key The key.
* @param numPartitions The number of partitions to partition into.
* @return The partition index.
*/
int partition(K key, int numPartitions);

该接口方法的描述很清晰,通过给定的 key 以及 numPartitions 返回 partition 的index.

CustomPartitionerWrapper 通过构造器注入 Partitioner 的实例,然后在 selectChannels 方法中通过 partition 接口来获得最终的 channel 索引。

public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {

K key = null;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
}

returnArray[0] = partitioner.partition(key,
numberOfOutputChannels);

return returnArray;
}

小结

以上的这些分区器,最终会体现在 DataStream 的API中用来对数据流进行 物理 分区。

微信扫码关注公众号:Apache_Flink

Apache Flink流分区器剖析

QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

Apache Flink流分区器剖析

原文  http://vinoyang.com/2016/06/22/flink-data-stream-partitioner/
正文到此结束
Loading...