Flink Source Code Study Notes (5): SlotSharing Implementation


This article introduces how the SlotSharing mechanism lets Subtasks of different Tasks reuse the same Slot before they are deployed to the TaskManager, so that Flink cluster resources are used more fully. The Flink version studied is 1.9.2.

Related Concepts

  1. SlotSharingGroup: the grouping that allows Subtasks of different Tasks to be placed into the same Slot. By default there is only a single group named `default`, which means Flink will try to pack Subtasks of different Tasks into one Slot.

  2. CoLocationGroup: a constraint that places the same Subtask index of different Tasks into the same Slot, e.g. process0 goes with sink0 and process1 goes with sink1.

    Both can be set through the corresponding methods on Transformation in the Flink API:

    public abstract class Transformation<T> {
        public void setSlotSharingGroup(String slotSharingGroup) {
            this.slotSharingGroup = slotSharingGroup;
        }

        public void setCoLocationGroupKey(@Nullable String coLocationGroupKey) {
            this.coLocationGroupKey = coLocationGroupKey;
        }
    }
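
For context, here is a minimal usage sketch with the Flink 1.9 DataStream API (the job itself is made up for illustration). `slotSharingGroup(String)` is the public fluent setter; the co-location key has no fluent setter and is set on the underlying Transformation:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SlotSharingSettingsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            SingleOutputStreamOperator<String> mapped = env
                .fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .slotSharingGroup("groupA");      // ends up in Transformation#setSlotSharingGroup

            // Co-location is set on the underlying Transformation directly.
            mapped.getTransformation().setCoLocationGroupKey("x1");

            mapped.print();
            env.execute("slot sharing settings example");
        }
    }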
    

Implementation

1. Related Classes:

  • SlotSharingManager: the key class responsible for SlotSharing. Within a job, Slot requests from Executions in the same SlotSharingGroup share one SlotSharingManager. A SlotSharingManager maintains multiple TaskSlot trees; a TaskSlot here is not a real Slot on a TaskManager, but an abstraction used to describe how multiple Tasks reuse one Slot. More details below.
    // root nodes of TaskSlot trees that have not yet obtained a real Slot
    private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;

    // root nodes of TaskSlot trees that have already obtained a real Slot
    private final Map<TaskManagerLocation, Map<AllocationID, MultiTaskSlot>> resolvedRootSlots;
  • TaskSlot: the node class of the trees mentioned above; it has two implementations:


    • MultiTaskSlot: usually the root of a tree, but possibly an inner node (when a CoLocationGroup is set). It implements the PhysicalSlot.Payload interface; a root node occupies one PhysicalSlot, and the leaf nodes of its tree share that PhysicalSlot.

          // child nodes
          private final Map<AbstractID, TaskSlot> children;

          // parent node
          @Nullable
          private final MultiTaskSlot parent;

          // the assigned slot; once this future completes, the root node is marked as resolved
          private final CompletableFuture<? extends SlotContext> slotContextFuture;

          // the request id of the occupied Slot
          @Nullable
          private final SlotRequestId allocatedSlotRequestId;
      
    • SingleTaskSlot: a leaf node. The leaf nodes of a TaskSlot tree reuse the PhysicalSlot occupied by the root node. When an Execution requests a Slot from the SlotSharingManager, a new leaf node is created under the appropriate tree and a LogicalSlot is returned to the Execution, which then uses that LogicalSlot to deploy its Task.

  • LogicalSlot: the implementation of LogicalSlot.Payload is Execution; a LogicalSlot is the logical slot handed to an Execution for deploying its Task. From the tree structure described above it follows that multiple LogicalSlots correspond to one PhysicalSlot (see the sketch after this list).

  • PhysicalSlot: corresponds to a real Slot on a TaskManager.
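
The point that multiple LogicalSlots are backed by one PhysicalSlot can be shown with a minimal, self-contained sketch (the stub classes below are mine, not the real Flink classes): every leaf derives its LogicalSlot future from the root's single PhysicalSlot future.

    import java.util.concurrent.CompletableFuture;

    class SlotTreeSketch {
        static class PhysicalSlotStub {}                                  // stands in for PhysicalSlot
        static class LogicalSlotStub {
            final PhysicalSlotStub backing;
            LogicalSlotStub(PhysicalSlotStub backing) { this.backing = backing; }
        }

        static class RootSlot {                                           // stands in for a root MultiTaskSlot
            final CompletableFuture<PhysicalSlotStub> physicalSlotFuture;
            RootSlot(CompletableFuture<PhysicalSlotStub> physicalSlotFuture) {
                this.physicalSlotFuture = physicalSlotFuture;
            }
            // stands in for allocateSingleTaskSlot(...).getLogicalSlotFuture()
            CompletableFuture<LogicalSlotStub> allocateLeaf() {
                return physicalSlotFuture.thenApply(LogicalSlotStub::new);
            }
        }

        public static void main(String[] args) {
            CompletableFuture<PhysicalSlotStub> physical = new CompletableFuture<>();
            RootSlot root = new RootSlot(physical);
            CompletableFuture<LogicalSlotStub> leaf1 = root.allocateLeaf();
            CompletableFuture<LogicalSlotStub> leaf2 = root.allocateLeaf();
            physical.complete(new PhysicalSlotStub());                    // the root becomes "resolved"
            System.out.println(leaf1.join().backing == leaf2.join().backing); // true: same PhysicalSlot
        }
    }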

2. Slot Reuse Flow:

If different operators of a job are given the same SlotSharingGroup, their requests are handled by the same SlotSharingManager. Within one SlotSharingManager, each tree requests one PhysicalSlot (at its root node), and every Slot request from a different Task is attached to an existing tree whenever possible so that it reuses that tree's Slot. Example (a small simulation of steps 1 to 4 follows the step list): there are 3 JobVertex nodes with parallelism 2, where JobVertex1 and JobVertex2 share the constraint CoLocationGroup=x1.

  1. A Slot request arrives for JobVertex3, Subtask=0. The SlotSharingManager has no Slot yet, so a new TaskSlot tree is created, a Slot is requested, and a leaf node for JobVertex3 is created under the root node; the leaf's Id is JobVertex3's JobVertexId.


  2. Because JobVertex3 already occupies a node in that tree, the request for JobVertex3, Subtask=1 repeats step 1 and builds a second tree.


  3. Next comes a Slot request for JobVertex1, Subtask=0 carrying the constraint CoLocationConstraint=x1 (GroupId=x1). Since the first TaskSlot tree has no node with GroupId=x1, no new Slot needs to be requested: a MultiTaskSlot node with GroupId=x1 and a JobVertex1 leaf node are attached directly to that tree. (Note: this extra MultiTaskSlot is only created when a request with a CoLocationConstraint arrives, so that in step 5 requests with the same CoLocationConstraint can quickly locate the corresponding Slot.)


  4. Likewise, the second tree has no node with GroupId=x1, so JobVertex1, Subtask=1 is attached to the second tree.


  5. Steps 1 to 4 cover the core idea of SlotSharingGroup. As for CoLocationGroup: if the constraint is set, then while building the ExecutionGraph, ExecutionVertices of the same CoLocationGroup with the same Subtask index are assigned the same CoLocationConstraint object. When the JobVertex1 nodes for Subtask=0 and 1 are attached in steps 3 and 4, their CoLocationConstraints are marked, so JobVertex2 can locate JobVertex1's parent MultiTaskSlot directly through the CoLocationConstraint and attach beneath it.

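The attachment rule of steps 1 to 4 can be simulated with the following self-contained sketch (illustrative only, not the Flink classes; step 5, the co-location shortcut, is handled separately through CoLocationConstraint as the code below shows): a request is attached to the first tree that does not yet contain its groupId, otherwise a new tree, and therefore a new Slot, is created.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class SlotSharingWalkthrough {
        public static void main(String[] args) {
            List<Set<String>> trees = new ArrayList<>();       // each set = groupIds already in one tree

            String[][] requests = {
                {"JobVertex3", "0"}, {"JobVertex3", "1"},      // steps 1 and 2
                {"JobVertex1", "0"}, {"JobVertex1", "1"},      // steps 3 and 4
            };
            for (String[] request : requests) {
                String groupId = request[0];
                int tree = findTreeWithout(trees, groupId);
                if (tree < 0) {                                 // no reusable tree -> new root, new Slot
                    trees.add(new HashSet<>());
                    tree = trees.size() - 1;
                }
                trees.get(tree).add(groupId);
                System.out.println(groupId + " subtask " + request[1] + " -> tree " + tree);
            }
        }

        private static int findTreeWithout(List<Set<String>> trees, String groupId) {
            for (int i = 0; i < trees.size(); i++) {
                if (!trees.get(i).contains(groupId)) {
                    return i;
                }
            }
            return -1;
        }
    }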

3. Related Code:

CoLocationGroup:

// When building the ExecutionGraph, a CoLocationConstraint is created per Subtask index for the
// ExecutionVertices of the same CoLocationGroup, i.e. ExecutionVertices with the same Subtask
// index share one CoLocationConstraint.
CoLocationGroup clg = jobVertex.getCoLocationGroup();
if (clg != null) {
    this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
} else {
    this.locationConstraint = null;
}
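
The snippet above relies on the group handing out exactly one constraint object per Subtask index. A minimal illustrative sketch of that idea (not the real CoLocationGroup class):

    import java.util.ArrayList;
    import java.util.List;

    class CoLocationGroupSketch {
        static class ConstraintStub {}                        // stands in for CoLocationConstraint

        private final List<ConstraintStub> constraints = new ArrayList<>();

        ConstraintStub getLocationConstraint(int subTaskIndex) {
            while (constraints.size() <= subTaskIndex) {
                constraints.add(new ConstraintStub());
            }
            return constraints.get(subTaskIndex);             // same instance for the same subtask index
        }

        public static void main(String[] args) {
            CoLocationGroupSketch group = new CoLocationGroupSketch();
            // e.g. JobVertex1 subtask 0 and JobVertex2 subtask 0 get the same constraint object
            System.out.println(group.getLocationConstraint(0) == group.getLocationConstraint(0)); // true
            System.out.println(group.getLocationConstraint(0) == group.getLocationConstraint(1)); // false
        }
    }
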
// Mark the CoLocationConstraint
final SlotRequestId slotRequestId = new SlotRequestId();
final SlotSharingManager.MultiTaskSlot coLocationSlot =
    multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
        slotRequestId,
        coLocationConstraint.getGroupId());

// mark the requested Slot as co-located Slot for other co-located tasks
coLocationConstraint.setSlotRequestId(slotRequestId);
// Check whether the constraint has already been marked; if so, return its parent MultiTaskSlot
final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
if (coLocationSlotRequestId != null) {
    // we have a slot assigned --> try to retrieve it
    final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);

    if (taskSlot != null) {
        Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);

        SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;

        if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getResourceProfile())) {
            return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
        }

        throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
    } else {
        // the slot may have been cancelled in the mean time
        coLocationConstraint.setSlotRequestId(null);
    }
}

SlotSharingGroup:

// Find the parent node (MultiTaskSlot) for a new Slot request
private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
        AbstractID groupId,
        SlotSharingManager slotSharingManager,
        SlotProfile slotProfile,
        boolean allowQueuedScheduling,
        @Nullable Time allocationTimeout) throws NoResourceAvailableException {

    // among the trees that already hold a real slot (resolved), list the root slots of
    // trees that do not yet contain this groupId
    Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
            slotSharingManager.listResolvedRootSlotInfo(groupId);

    SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
        slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);

    // (in the full source, multiTaskSlotLocality is derived from bestResolvedRootSlotWithLocality here)
    if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
        return multiTaskSlotLocality;
    }

    final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
    final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

    // look for a directly usable slot in the SlotPool
    Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);

    if (optionalPoolSlotAndLocality.isPresent()) {
        SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
        final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();

        // create a new tree (root node) backed by the slot just taken from the SlotPool
        final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
            multiTaskSlotRequestId,
            CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
            allocatedSlotRequestId);
        // ... (assigning the multiTaskSlot as the slot's payload and returning are omitted here)
    }

    if (allowQueuedScheduling) {
        // look for a root node whose slot allocation has not completed yet (unresolved)
        SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);

        if (multiTaskSlot == null) {
            // nothing reusable was found --> request a brand new Slot
            final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(
                allocatedSlotRequestId,
                slotProfile,
                allocationTimeout);

            // create a new tree (root node) backed by the slot that was just requested
            multiTaskSlot = slotSharingManager.createRootSlot(
                multiTaskSlotRequestId,
                slotAllocationFuture,
                allocatedSlotRequestId);
        }

        return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
    }

    throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
}
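
The excerpt above leaves out a few branches of the 1.9 method. Based on the full source, the selection priority can be summarized roughly as in the following simplified sketch (the enum values and names are mine, not Flink's):

    final class SharedSlotPrioritySketch {
        enum Locality { LOCAL, NON_LOCAL, NONE }

        enum Decision { REUSE_LOCAL_ROOT, NEW_ROOT_ON_POOL_SLOT, REUSE_NON_LOCAL_ROOT, QUEUE_OR_REQUEST_NEW, FAIL }

        static Decision decide(Locality bestResolvedRoot, Locality availablePoolSlot, boolean allowQueuedScheduling) {
            if (bestResolvedRoot == Locality.LOCAL) {
                return Decision.REUSE_LOCAL_ROOT;                 // 1. perfect match on an existing tree
            }
            if (availablePoolSlot != Locality.NONE
                    && (availablePoolSlot == Locality.LOCAL || bestResolvedRoot == Locality.NONE)) {
                return Decision.NEW_ROOT_ON_POOL_SLOT;            // 2. free SlotPool slot -> new tree
            }
            if (bestResolvedRoot != Locality.NONE) {
                return Decision.REUSE_NON_LOCAL_ROOT;             // 3. existing tree with weaker locality
            }
            if (allowQueuedScheduling) {
                return Decision.QUEUE_OR_REQUEST_NEW;             // 4. unresolved tree, or a new slot from the RM
            }
            return Decision.FAIL;                                 // 5. NoResourceAvailableException
        }
    }
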
// Add a leaf node under the parent MultiTaskSlot that was just returned
final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
    slotRequestId,
    slotProfile.getResourceProfile(),
    scheduledUnit.getJobVertexId(),
    multiTaskSlotLocality.getLocality());

// Return the LogicalSlot future to the Execution, which uses it to deploy the Task
return leaf.getLogicalSlotFuture();

Conclusion

The core idea of Flink's SlotSharing mechanism is to maintain a forest of trees: the root node of each tree occupies one Slot, and the leaf nodes, which correspond to the Slot requests of individual Executions, reuse the root node's Slot. Flink tries to attach each Execution's Slot request to an already existing tree in order to improve resource utilization.

CoLocationGroup is implemented by assigning the same constraint object to Executions with the same Subtask index when the ExecutionGraph is created. Once one Execution's Slot request has found its parent node, the constraint is marked so that Slot requests of other Executions with the same Subtask index find the same parent node.

One issue that comes to mind: when no SlotSharingGroup is set, a typical user job with multiple Sources and Sinks gets concentrated into a few Slots, so a few TaskManagers show disproportionately high resource usage. Since our Flink version is relatively old, the plan is to try to improve this by sorting the TaskSlot trees by leaf count during Slot requests and computing a maximum number of leaves per tree from the number of Executions and the parallelism (see the sketch below). I will also keep an eye on newer community releases.
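
A rough sketch of that idea (purely illustrative; the class and parameter names are made up): pick the candidate tree with the fewest leaves, subject to a per-tree cap derived from the total number of leaf requests and the expected number of trees.

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    class BalancedTreeChooser {
        static class Tree {
            int leafCount;
            Tree(int leafCount) { this.leafCount = leafCount; }
        }

        static Optional<Tree> chooseTree(List<Tree> candidateTrees, int totalLeafRequests, int expectedTreeCount) {
            int maxLeavesPerTree = (int) Math.ceil((double) totalLeafRequests / expectedTreeCount);
            return candidateTrees.stream()
                .filter(t -> t.leafCount < maxLeavesPerTree)     // don't overload a single tree
                .min(Comparator.comparingInt(t -> t.leafCount)); // prefer the emptiest tree
        }
    }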


Copyright notice: this is an original article by m0_37053048, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.