This article explains how, before tasks are deployed to a TaskManager, the slot sharing mechanism lets subtasks of different tasks reuse the same slot, so that Flink cluster resources are used more fully. The Flink version studied here is 1.9.2.
Related Concepts
SlotSharingGroup: the grouping that decides which tasks may be placed into the same slot. By default there is only one group, named default, which means Flink tries to place subtasks of different tasks into one slot.
CoLocationGroup: a constraint that places the subtasks with the same index of different tasks into the same slot, e.g. process0 together with sink0, and process1 together with sink1. Both can be set through the Transformation methods of the Flink API:
public abstract class Transformation<T> {
    public void setSlotSharingGroup(String slotSharingGroup) {
        this.slotSharingGroup = slotSharingGroup;
    }

    public void setCoLocationGroupKey(@Nullable String coLocationGroupKey) {
        this.coLocationGroupKey = coLocationGroupKey;
    }
}
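For context, here is how a user typically reaches these settings from the DataStream API. This is a minimal sketch assuming the Flink 1.9 DataStream API; the job itself (socket source, map) is made up for illustration, and since co-location has no dedicated DataStream method, the key is set on the underlying Transformation directly.

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Operators stay in the "default" slot sharing group unless changed; slotSharingGroup()
        // moves this operator (and, unless overridden again, downstream operators) to another group.
        SingleOutputStreamOperator<String> processed = env
            .socketTextStream("localhost", 9999)
            .map(String::toUpperCase)
            .slotSharingGroup("pipeline-a");

        // For illustration only: set the co-location key on the underlying Transformation
        // (the key "x1" matches the example used later in this article).
        processed.getTransformation().setCoLocationGroupKey("x1");

        processed.print();
        env.execute("slot sharing example");
    }
}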
Implementation
1. Related classes:
SlotSharingManager: the key class responsible for slot sharing. Executions of the same job that belong to the same SlotSharingGroup go through the same SlotSharingManager when they request resources. A SlotSharingManager maintains multiple TaskSlot trees; a TaskSlot here is not a real slot on a TaskManager, but an abstraction used to describe how several tasks reuse one slot (details below).
// root nodes of TaskSlot trees that do not yet occupy a real slot
private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
// root nodes of TaskSlot trees that already occupy a real slot
private final Map<TaskManagerLocation, Map<AllocationID, MultiTaskSlot>> resolvedRootSlots;
TaskSlot: the tree node class mentioned above. It has two concrete subclasses.
MultiTaskSlot: usually the root of a tree, but it can also be an inner node (when a CoLocationGroup is set). It implements the PhysicalSlot.Payload interface; a root MultiTaskSlot occupies one PhysicalSlot, and the leaf nodes of its tree share that PhysicalSlot.
// child nodes
private final Map<AbstractID, TaskSlot> children;
// parent node
@Nullable
private final MultiTaskSlot parent;
// the assigned slot; once this future completes, the root node is marked as resolved
private final CompletableFuture<? extends SlotContext> slotContextFuture;
// the request id of the occupied slot
@Nullable
private final SlotRequestId allocatedSlotRequestId;
SingleTaskSlot: a leaf node. The leaf nodes of a TaskSlot tree reuse the PhysicalSlot occupied by the root. When an Execution requests a slot from the SlotSharingManager, a new leaf is created under the appropriate tree and a LogicalSlot is returned to the Execution, which then uses it to deploy the corresponding Task.
LogicalSlot: the logical slot handed to an Execution for deploying its Task; Execution is the implementation of LogicalSlot.Payload. Given the tree structure described above, several LogicalSlots map to one PhysicalSlot, as illustrated below.
PhysicalSlot: corresponds to one real slot on a TaskManager.
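To make the LogicalSlot-to-PhysicalSlot relationship concrete, here is a minimal, self-contained sketch. The stub classes are invented for illustration and are not Flink's real types; it only mirrors, in spirit, how each SingleTaskSlot derives its logical slot from the single slot future held by the root of its tree.

import java.util.concurrent.CompletableFuture;

public class LogicalToPhysicalSketch {

    static class PhysicalSlotStub { final String id; PhysicalSlotStub(String id) { this.id = id; } }
    static class LogicalSlotStub {
        final PhysicalSlotStub backing; final String owner;
        LogicalSlotStub(PhysicalSlotStub backing, String owner) { this.backing = backing; this.owner = owner; }
    }

    public static void main(String[] args) {
        // the root of one TaskSlot tree waits for exactly one physical slot
        CompletableFuture<PhysicalSlotStub> rootSlotFuture = new CompletableFuture<>();

        // each leaf (one per Execution) derives its own logical slot from that same future
        CompletableFuture<LogicalSlotStub> sourceSlot =
            rootSlotFuture.thenApply(ps -> new LogicalSlotStub(ps, "source-0"));
        CompletableFuture<LogicalSlotStub> processSlot =
            rootSlotFuture.thenApply(ps -> new LogicalSlotStub(ps, "process-0"));

        // once the physical slot arrives, all logical slots resolve to the same backing slot
        rootSlotFuture.complete(new PhysicalSlotStub("slot-on-tm-1"));
        System.out.println(sourceSlot.join().backing.id);   // slot-on-tm-1
        System.out.println(processSlot.join().backing.id);  // slot-on-tm-1
    }
}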
2. Slot reuse flow:
If different operators of a job are given the same SlotSharingGroup, their slot requests are handled by the same SlotSharingManager. Within one SlotSharingManager, each tree requests one physical slot (at its root), and each slot request from a different Task is hung under an existing tree whenever possible so that it reuses that tree's slot (a simplified code sketch of this bookkeeping follows the step list). Example: there are three JobVertex nodes with parallelism 2, where JobVertex1 and JobVertex2 share the constraint CoLocationGroup=x1.
- A slot request arrives for subtask 0 of JobVertex3. The SlotSharingManager has no slot yet, so a new TaskSlot tree is built, a slot is requested, and a leaf node for JobVertex3 is created under the root; the leaf's id is JobVertex3's JobVertexId.
- Because JobVertex3 already occupies that tree, subtask 1 of JobVertex3 repeats step 1 and builds a second tree.
- Next comes a request for subtask 0 of JobVertex1 carrying the constraint CoLocationConstraint=x1 (GroupId=x1). The first TaskSlot tree has no node with GroupId=x1, so no new slot is requested; a MultiTaskSlot node with GroupId=x1 and a JobVertex1 leaf are attached to that tree. (Note: a MultiTaskSlot is created on the tree only when a slot request carrying a CoLocationConstraint arrives, so that in step 5 a later request with the same CoLocationConstraint can quickly locate the corresponding slot.)
- Likewise, the second tree has no node with GroupId=x1, so subtask 1 of JobVertex1 is attached to the second tree.
- Up to step 4 the idea of SlotSharingGroup is mostly covered. As for CoLocationGroup: when a CoLocationGroup constraint is set, ExecutionVertices of the same CoLocationGroup and the same subtask index get the same CoLocationConstraint object while the ExecutionGraph is built. Thanks to this, when the JobVertex1 nodes of subtask 0 and 1 are attached in steps 3 and 4, their CoLocationConstraints are marked, and JobVertex2 can then find JobVertex1's parent MultiTaskSlot directly through the CoLocationConstraint and attach itself underneath.
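To make the walkthrough above concrete, here is a minimal, simplified sketch of the bookkeeping. The class and method names (SlotSharingSketch, SharedSlotTree, requestLeaf) are invented for illustration and are not Flink's real API; the real implementation also distinguishes resolved and unresolved roots, localities, and co-location, all of which are ignored here. The rule sketched is just the one from the steps: hang the request under the first tree that does not already contain a node for its groupId, otherwise build a new tree, which means requesting a new physical slot.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotSharingSketch {

    // one tree per physical slot: the root "owns" the slot, leaves reuse it
    static class SharedSlotTree {
        final String physicalSlotId;                                  // stands in for the real PhysicalSlot
        final Map<String, SharedSlotTree> children = new HashMap<>(); // keyed by groupId / JobVertexId

        SharedSlotTree(String physicalSlotId) { this.physicalSlotId = physicalSlotId; }

        boolean contains(String groupId) { return children.containsKey(groupId); }

        SharedSlotTree addChild(String groupId) {
            SharedSlotTree child = new SharedSlotTree(physicalSlotId); // the leaf shares the root's slot
            children.put(groupId, child);
            return child;
        }
    }

    private final List<SharedSlotTree> trees = new ArrayList<>();
    private int slotCounter = 0;

    // hang the request under the first tree that has no node for this groupId,
    // otherwise create a new tree (i.e. request a new physical slot)
    SharedSlotTree requestLeaf(String groupId) {
        for (SharedSlotTree tree : trees) {
            if (!tree.contains(groupId)) {
                return tree.addChild(groupId);
            }
        }
        SharedSlotTree newTree = new SharedSlotTree("slot-" + slotCounter++);
        trees.add(newTree);
        return newTree.addChild(groupId);
    }

    public static void main(String[] args) {
        SlotSharingSketch manager = new SlotSharingSketch();
        // replay the example: JobVertex3 subtasks 0 and 1, then JobVertex1 subtasks 0 and 1
        System.out.println(manager.requestLeaf("JobVertex3").physicalSlotId); // slot-0
        System.out.println(manager.requestLeaf("JobVertex3").physicalSlotId); // slot-1 (new tree)
        System.out.println(manager.requestLeaf("JobVertex1").physicalSlotId); // slot-0 (reuses tree 1)
        System.out.println(manager.requestLeaf("JobVertex1").physicalSlotId); // slot-1 (reuses tree 2)
    }
}

Running the main method reproduces the example: the two JobVertex3 requests create two trees (two slots), and the two JobVertex1 requests reuse those trees instead of requesting new slots.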
3. Related code:
CoLocationGroup:
// When the ExecutionGraph is built, one CoLocationConstraint per subtask index is created for
// ExecutionVertices of the same CoLocationGroup, i.e. the same subtask index shares one constraint.
CoLocationGroup clg = jobVertex.getCoLocationGroup();
if (clg != null) {
this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
}
else {
this.locationConstraint = null;
}
// Mark the CoLocationConstraint so that later co-located requests find the same parent node
final SlotRequestId slotRequestId = new SlotRequestId();
final SlotSharingManager.MultiTaskSlot coLocationSlot =
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
slotRequestId,
coLocationConstraint.getGroupId());
// mark the requested Slot as co-located Slot for other co-located tasks
coLocationConstraint.setSlotRequestId(slotRequestId);
// Check whether the constraint has already been marked; if so, return the parent MultiTaskSlot
final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
if (coLocationSlotRequestId != null) {
// we have a slot assigned --> try to retrieve it
final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
if (taskSlot != null) {
Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getResourceProfile())) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
}
throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
} else {
// the slot may have been cancelled in the mean time
coLocationConstraint.setSlotRequestId(null);
}
}
SlotSharingGroup:
// Find the parent MultiTaskSlot for a new slot request
// Abridged from SchedulerImpl#allocateMultiTaskSlot (Flink 1.9); error-handling branches are elided
private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
        AbstractID groupId,
        SlotSharingManager slotSharingManager,
        SlotProfile slotProfile,
        boolean allowQueuedScheduling,
        @Nullable Time allocationTimeout) throws NoResourceAvailableException {

    // 1. Among the trees that already occupy a real slot (resolved roots), pick the best root
    //    that does not yet contain this groupId
    Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
        slotSharingManager.listResolvedRootSlotInfo(groupId);

    SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
        slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);

    final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
        new SlotSharingManager.MultiTaskSlotLocality(
            slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
            bestResolvedRootSlotWithLocality.getLocality()) :
        null;

    // a local resolved root is good enough -> reuse it directly
    if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
        return multiTaskSlotLocality;
    }

    final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
    final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

    // 2. Try to take an already available slot directly from the SlotPool
    Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);

    if (optionalPoolSlotAndLocality.isPresent()) {
        SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
        final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();

        // build a new tree whose root occupies the slot just taken from the SlotPool
        final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
            multiTaskSlotRequestId,
            CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
            allocatedSlotRequestId);

        if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
            return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
        }
        // ... release the root slot on failure (elided)
    }

    if (allowQueuedScheduling) {
        // 3. Look for a root whose slot allocation is still pending (unresolved)
        SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);

        if (multiTaskSlot == null) {
            // 4. Nothing reusable anywhere -> request a brand-new slot
            final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(
                allocatedSlotRequestId,
                slotProfile,
                allocationTimeout);

            // build a new tree whose root will occupy the slot once the future completes
            multiTaskSlot = slotSharingManager.createRootSlot(
                multiTaskSlotRequestId,
                slotAllocationFuture,
                allocatedSlotRequestId);
        }

        return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
    }

    throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
}
// Back in the caller (allocateSharedSlot): attach a leaf (SingleTaskSlot) under the parent
// MultiTaskSlot that was just returned
final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
    slotRequestId,
    slotProfile.getResourceProfile(),
    scheduledUnit.getJobVertexId(),
    multiTaskSlotLocality.getLocality());
// Return the LogicalSlot future to the Execution, which uses it to deploy its Task
return leaf.getLogicalSlotFuture();
Conclusion
To sum up, Flink's SlotSharing mechanism maintains a forest of trees: each tree's root occupies one slot, each leaf corresponds to one Execution's slot request, and the leaves reuse the root's slot. Flink tries to place every Execution's slot request under an already existing tree to improve resource utilization.
CoLocationGroup is implemented by assigning, while the ExecutionGraph is created, the same constraint object to the Executions that share a subtask index. Once one Execution's slot request has found its parent node, the constraint is marked so that later requests with the same subtask index are routed to the same parent node.
One issue I can currently see: when no SlotSharingGroup is set, the multi-source, multi-sink jobs that users commonly write end up concentrated in a handful of slots, so a few TaskManagers show disproportionately high resource usage. Since our Flink version is fairly old, the plan is to mitigate this during slot requests by sorting the TaskSlot trees by leaf count and deriving a maximum leaf count per tree from the number of Executions and the parallelism (a rough sketch of this idea follows). We also plan to follow up on newer community releases.
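As a closing illustration, here is a rough sketch of that balancing idea. It is purely hypothetical and not Flink code: the Tree interface, the cap formula, and the selection rule are assumptions that only mirror the plan of sorting trees by leaf count and capping each tree's leaves based on the number of Executions and the parallelism.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class BalancedTreeSelector {

    // stands in for one TaskSlot tree (one physical slot)
    public interface Tree {
        int leafCount();
        boolean contains(String groupId); // true if this JobVertex/groupId already has a leaf here
    }

    /**
     * Pick the eligible tree with the fewest leaves, but only if it is still under the cap.
     * The cap is derived from the total number of subtasks and the maximum parallelism: with the
     * default sharing group roughly maxParallelism trees will exist, so each tree should hold
     * about totalSubtasks / maxParallelism leaves. Returning empty means "request a new slot".
     */
    public static Optional<Tree> selectTree(
            List<Tree> trees, String groupId, int totalSubtasks, int maxParallelism) {
        int cap = (totalSubtasks + maxParallelism - 1) / maxParallelism; // ceiling division
        return trees.stream()
            .filter(t -> !t.contains(groupId))
            .filter(t -> t.leafCount() < cap)
            .min(Comparator.comparingInt(Tree::leafCount));
    }
}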