控制数据流向
在流进行转换操作后,Flink通过分区器来精确得控制数据流向。
- Flink分区 器如下图
1.随机分区器 ShufflePartitioner
根据均匀分布对元素进行随机划分
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private Random random = new Random();
private final int[] returnArray = new int[1];
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}
@Override
public StreamPartitioner<T> copy() {
return new ShufflePartitioner<T>();
}
@Override
public String toString() {
return "SHUFFLE";
}
}
- 自定义分区器CustomPartitionerWrappery
使用用户定义的分区程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
- 全局分区器GlobalPartitioner
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private final int[] returnArray = new int[] { 0 };
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
return returnArray;
}
@Override
public StreamPartitioner<T> copy() {
return this;
}
@Override
public String toString() {
return "GLOBAL";
}
}
4.重行分区 RebalancePartitioner
分区元素轮循,从而为每个分区创建相等的负载。在存在数据偏斜的情况下对性能优化有用。
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private final int[] returnArray = {Integer.MAX_VALUE - 1};
@Override
public int[] selectChannels(
SerializationDelegate<StreamRecord<T>> record,
int numChannels) {
int newChannel = ++returnArray[0];
if (newChannel >= numChannels) {
returnArray[0] = resetValue(numChannels, newChannel);
}
return returnArray;
}
private static int resetValue(
int numChannels,
int newChannel) {
if (newChannel == Integer.MAX_VALUE) {
// Initializes the first partition, this branch is only entered when initializing.
return ThreadLocalRandom.current().nextInt(numChannels);
}
return 0;
}
public StreamPartitioner<T> copy() {
return this;
}
@Override
public String toString() {
return "REBALANCE";
}
}
版权声明:本文为huonan_123原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。