这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向。
StreamPartitioner
是Flink流分区器的基类,它只定义了一个抽象方法:
public abstract StreamPartitioner<T> copy();
但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于——各个分区器需要实现 channel
选择的接口方法:
int[] selectChannels(T record, int numChannels);
该方法针对当前的 record
以及所有的 channel
数目,返回一个针对当前这条记录采用的 output channel
的索引数组。(注意这里返回的是数组,说明一个记录可能会输出到多个 channel
这点我们后面会谈到)。
该接口方法来自于 StreamPartitioner
实现的接口 ChannelSelector
。
分区器整体类图:
全局分区器,其实现很简单——默认选择了索引为0的channel进行输出。
private int[] returnArray = new int[] { 0 };
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,int numberOfOutputChannels) {
return returnArray;
}
该分区器将记录转发给在本地运行的下游的(归属于 subtask
)的 operattion
。其实现跟上面的 GlobalPartitioner
一致,就不贴代码了。
混洗分区器,该分区器会在所有 output channel
中选择一个随机的进行输出。
private int[] returnArray = new int[1];
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}
hash分区器,该分区器对key进行hash后计算得到 channel
索引。它通过构造器获得 KeySelector
的实例(该实例用来获取当前记录的key)。
获得key后,通过其 hashcode
跟 numberOfOutputChannels
取模后计算得出最终输出的 channel
的索引。
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
Object key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
return returnArray;
}
广播分区器,用于将该记录广播给下游的所有的 subtask
。这里采用了两个标记:
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
if (set && setNumber == numberOfOutputChannels) {
return returnArray;
} else {
this.returnArray = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnArray[i] = i;
}
set = true;
setNumber = numberOfOutputChannels;
return returnArray;
}
}
从上面的实现可见,它返回了一个跟 numberOfOutputChannels
相等的数组(数组的大小就是即将输出到 channel
的个数)。
重平衡分区器,用于实现类似于 round-robin
这样的轮转模式的分区器。通过累加、取模的形式来实现对输出 channel
的切换。
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
return this.returnArray;
}
也是以 round-robin
的形式将元素分区到下游 subtask
的子集中。
上游操作所发送的元素被分区到下游操作的哪些子集,依赖于上游和下游操作的并行度。例如,如果上游操作的并行度为2,而下游操作的并行度为4,那么一个上游操作会分发元素给两个下游操作,同时另一个上游操作会分发给另两个下游操作。相反的,如果下游操作的并行度为2,而上游操作的并行度为4,那么两个上游操作会分发数据给一个下游操作,同时另两个上游操作会分发数据给另一个下游操作。
在上下游的并行度不是呈倍数关系的情况下,下游操作会有数量不同的来自上游操作的输入。具体的实现代码同 RebalancePartitioner
。
自定义分区器包装器,该包装器封装了对于自定义的分区器的实现。自定义的分区测量依赖于 Partitioner
接口。它提供了自定义分区器的契约。核心接口方法是:
/**
* Computes the partition for the given key.
*
* @param key The key.
* @param numPartitions The number of partitions to partition into.
* @return The partition index.
*/
int partition(K key, int numPartitions);
该接口方法的描述很清晰,通过给定的 key
以及 numPartitions
返回 partition
的index.
CustomPartitionerWrapper
通过构造器注入 Partitioner
的实例,然后在 selectChannels
方法中通过 partition
接口来获得最终的 channel
索引。
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
K key = null;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
}
returnArray[0] = partitioner.partition(key,
numberOfOutputChannels);
return returnArray;
}
以上的这些分区器,最终会体现在 DataStream
的API中用来对数据流进行 物理 分区。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)