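Flink Tuning: Partitioner Strategies

Controlling Data Flow

After a transformation is applied to a stream, Flink uses partitioners to control precisely where the data flows.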

  • Flink's partitioners are shown in the figure below.

[Figure: Flink's partitioners (image not included)]
1. Random partitioner: ShufflePartitioner
Distributes elements randomly according to a uniform distribution.

@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

	private Random random = new Random();

	private final int[] returnArray = new int[1];

	@Override
	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
			int numberOfOutputChannels) {
		returnArray[0] = random.nextInt(numberOfOutputChannels);
		return returnArray;
	}

	@Override
	public StreamPartitioner<T> copy() {
		return new ShufflePartitioner<T>();
	}

	@Override
	public String toString() {
		return "SHUFFLE";
	}
}
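
The effect of the random pick can be checked without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: the class `ShuffleSketch`, the method `countPerChannel`, and the fixed seed are assumptions made for illustration. It repeats the same `random.nextInt(numberOfOutputChannels)` call used in `selectChannels` and counts how many records land on each channel.

```java
import java.util.Random;

/** Plain-Java sketch of random channel selection; no Flink dependency. */
final class ShuffleSketch {

    /** Counts how many of the given records land on each output channel. */
    static int[] countPerChannel(int records, int numberOfOutputChannels, long seed) {
        // The fixed seed is an assumption that only makes the sketch repeatable.
        Random random = new Random(seed);
        int[] counts = new int[numberOfOutputChannels];
        for (int i = 0; i < records; i++) {
            // Same call as in selectChannels: a uniform pick in [0, numberOfOutputChannels).
            counts[random.nextInt(numberOfOutputChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countPerChannel(100_000, 4, 42L);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("channel " + c + ": " + counts[c]);
        }
    }
}
```

With a large number of records the counts come out close to equal, but not exactly equal, which is the practical difference from round-robin distribution.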
2. Custom partitioner: CustomPartitionerWrapper
Uses a user-defined partitioner to select the target task for each element.
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
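
What the `partitioner` argument might look like can be sketched in plain Java. The interface `KeyPartitioner` below is a hypothetical stand-in that mirrors the shape of Flink's `Partitioner<K>` (a single `partition(key, numPartitions)` method); the hash-based routing policy and the class name `CustomPartitionerSketch` are likewise assumptions for illustration, not something the article prescribes.

```java
/** Plain-Java sketch of a user-defined partitioner; no Flink dependency. */
final class CustomPartitionerSketch {

    /** Hypothetical stand-in with the same shape as Flink's Partitioner<K>. */
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Example policy: route by key hash; floorMod keeps the result non-negative. */
    static final KeyPartitioner<String> HASH_BY_KEY =
            (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);

    public static void main(String[] args) {
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + HASH_BY_KEY.partition(key, 4));
        }
    }
}
```

`Math.floorMod` is used instead of `%` because `hashCode()` can be negative, and a negative partition index would be invalid. In a real job the same logic would presumably live in a class implementing Flink's `Partitioner` and be passed as the first argument of `partitionCustom`.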
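3. Global partitioner: GlobalPartitioner
Sends every element to the first downstream channel (index 0), as the fixed returnArray below shows.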
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

	private final int[] returnArray = new int[] { 0 };

	@Override
	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
			int numberOfOutputChannels) {
		return returnArray;
	}

	@Override
	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "GLOBAL";
	}
}
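
The consequence of always returning channel 0 can be made concrete with a small plain-Java sketch. The class `GlobalSketch` and its methods are hypothetical names for illustration; only the "always 0" rule is taken from the `GlobalPartitioner` source above.

```java
/** Plain-Java sketch of global partitioning; no Flink dependency. */
final class GlobalSketch {

    /** Mirrors selectChannels above: the result ignores the record and the channel count. */
    static int selectChannel(int numberOfOutputChannels) {
        return 0;
    }

    /** Counts how many of the given records land on each output channel. */
    static int[] countPerChannel(int records, int numberOfOutputChannels) {
        int[] counts = new int[numberOfOutputChannels];
        for (int i = 0; i < records; i++) {
            counts[selectChannel(numberOfOutputChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countPerChannel(1_000, 4);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("channel " + c + ": " + counts[c]);
        }
    }
}
```

However many downstream channels exist, only the first one receives data, so the downstream operator effectively runs with a parallelism of 1. This is worth keeping in mind when tuning, since it can turn that operator into a bottleneck.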

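4. Rebalance partitioner: RebalancePartitioner
Distributes elements round-robin, so that every partition receives an equal load. This is useful for performance optimization when the data is skewed.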

@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

	private final int[] returnArray = {Integer.MAX_VALUE - 1};

	@Override
	public int[] selectChannels(
			SerializationDelegate<StreamRecord<T>> record,
			int numChannels) {
		int newChannel = ++returnArray[0];
		if (newChannel >= numChannels) {
			returnArray[0] = resetValue(numChannels, newChannel);
		}
		return returnArray;
	}

	private static int resetValue(
			int numChannels,
			int newChannel) {
		if (newChannel == Integer.MAX_VALUE) {
			// Initializes the first partition, this branch is only entered when initializing.
			return ThreadLocalRandom.current().nextInt(numChannels);
		}
		return 0;
	}

	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "REBALANCE";
	}
}
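
The counter logic above is easier to follow in isolation. The following plain-Java sketch reproduces it without Flink types; the class `RebalanceSketch` and the field name `current` are assumptions for illustration. The counter starts at `Integer.MAX_VALUE - 1`, so the first increment reaches `Integer.MAX_VALUE` and triggers a random starting channel; every later overflow past the last channel wraps back to 0.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Plain-Java sketch of round-robin channel selection; no Flink dependency. */
final class RebalanceSketch {

    // Same sentinel as returnArray[0] in the source above.
    private int current = Integer.MAX_VALUE - 1;

    /** Returns the next channel, cycling through [0, numChannels). */
    int selectChannel(int numChannels) {
        int next = ++current;
        if (next >= numChannels) {
            // Only the very first call sees MAX_VALUE and picks a random start;
            // afterwards the counter simply wraps around to channel 0.
            current = (next == Integer.MAX_VALUE)
                    ? ThreadLocalRandom.current().nextInt(numChannels)
                    : 0;
        }
        return current;
    }

    public static void main(String[] args) {
        RebalanceSketch partitioner = new RebalanceSketch();
        int[] counts = new int[4];
        for (int i = 0; i < 400; i++) {
            counts[partitioner.selectChannel(4)]++;
        }
        for (int c = 0; c < counts.length; c++) {
            System.out.println("channel " + c + ": " + counts[c]);
        }
    }
}
```

Because the record count in `main` is a multiple of the channel count, every channel receives exactly the same number of records regardless of the random starting point. The random start presumably keeps several parallel upstream senders from all beginning at channel 0.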

Reference: official documentation


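Copyright notice: This is an original article by huonan_123, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.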