Kubernetes source code analysis: a brief look at ReplicaSet

In the Deployment analysis we saw that Deployment operations such as create and update are ultimately carried out through ReplicaSets:

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {

	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

	annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))

	scaled := false
	var err error
	if sizeNeedsUpdate || annotationsNeedUpdate {
		rsCopy := rs.DeepCopy()
		*(rsCopy.Spec.Replicas) = newScale
		deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
		if err == nil && sizeNeedsUpdate {
			scaled = true
			dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
		}
	}
	return scaled, rs, err
}
  1. Where the ReplicaSet controller's startup is defined:

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go

controllers["replicaset"] = startReplicaSetController
  2. Definition of startReplicaSetController

    k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go

    func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
    		return nil, false, nil
    	}
    	go replicaset.NewReplicaSetController(
    		ctx.InformerFactory.Apps().V1().ReplicaSets(),
    		ctx.InformerFactory.Core().V1().Pods(),
    		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
    		replicaset.BurstReplicas,
    	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    	return nil, true, nil
    }
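
    Run waits for the informer caches to sync and then starts ConcurrentRSSyncs worker goroutines. Each worker repeatedly pulls a key off the rate-limited queue and hands it to syncHandler; an abridged sketch from the same file (details vary slightly by version):

    // worker runs a thread that dequeues keys, processes them, and marks them done
    func (rsc *ReplicaSetController) worker() {
    	for rsc.processNextWorkItem() {
    	}
    }

    func (rsc *ReplicaSetController) processNextWorkItem() bool {
    	key, quit := rsc.queue.Get()
    	if quit {
    		return false
    	}
    	defer rsc.queue.Done(key)

    	err := rsc.syncHandler(key.(string))
    	if err == nil {
    		// sync succeeded: drop the key's rate-limit history
    		rsc.queue.Forget(key)
    		return true
    	}

    	// sync failed: requeue with rate-limited backoff
    	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    	rsc.queue.AddRateLimited(key)
    	return true
    }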
    
    1. Definition of NewReplicaSetController:

      pkg/controller/replicaset/replica_set.go

      // NewReplicaSetController configures a replica set controller with the specified event recorder
      func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
      	eventBroadcaster := record.NewBroadcaster()
      	eventBroadcaster.StartStructuredLogging(0)
      	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
      	return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
      		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
      		"replicaset_controller",
      		"replicaset",
      		controller.RealPodControl{
      			KubeClient: kubeClient,
      			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
      		},
      	)
      }
      

      Initialization: configures how events are handled (structured logging plus recording to the apiserver event sink) and sets the event source component name to replicaset-controller.
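
      A minimal usage sketch of a recorder created this way (hypothetical call site; in the real controller the recorder is handed to controller.RealPodControl, which emits SuccessfulCreate/FailedCreate events on pod operations):

      recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"})
      // e.g. what gets recorded when a pod is created on behalf of an rs:
      recorder.Eventf(rs, v1.EventTypeNormal, "SuccessfulCreate", "Created pod: %v", pod.Name)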

    2. NewBaseController registers the rs handler functions

      // NewBaseController is the implementation of NewReplicaSetController with additional injected
      // parameters so that it can also serve as the implementation of NewReplicationController.
      func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
      	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
      	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
      		ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
      	}
      
      	rsc := &ReplicaSetController{
      		GroupVersionKind: gvk,
      		kubeClient:       kubeClient,
      		podControl:       podControl,
      		burstReplicas:    burstReplicas,
      		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
      		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
      	}
      
      	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      		AddFunc:    rsc.addRS,
      		UpdateFunc: rsc.updateRS,
      		DeleteFunc: rsc.deleteRS,
      	})
      	rsc.rsLister = rsInformer.Lister()
      	rsc.rsListerSynced = rsInformer.Informer().HasSynced
      
      	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      		AddFunc: rsc.addPod,
      		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
      		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
      		// local storage, so it should be ok.
      		UpdateFunc: rsc.updatePod,
      		DeleteFunc: rsc.deletePod,
      	})
      	rsc.podLister = podInformer.Lister()
      	rsc.podListerSynced = podInformer.Informer().HasSynced
      
      	rsc.syncHandler = rsc.syncReplicaSet
      
      	return rsc
      }
      

      1. Registers addPod, updatePod, and deletePod as the handlers that react to pod events

      2. Registers addRS, updateRS, and deleteRS as the handlers that react to rs events

    3. addPod source:

      // When a pod is created, enqueue the replica set that manages it and update its expectations.
      func (rsc *ReplicaSetController) addPod(obj interface{}) {
      	pod := obj.(*v1.Pod)
      
      	if pod.DeletionTimestamp != nil {
      		// on a restart of the controller manager, it's possible a new pod shows up in a state that
      		// is already pending deletion. Prevent the pod from being a creation observation.
      		rsc.deletePod(pod)
      		return
      	}
      
      	// If it has a ControllerRef, that's all that matters.
      	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
      		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
      		if rs == nil {
      			return
      		}
      		rsKey, err := controller.KeyFunc(rs)
      		if err != nil {
      			return
      		}
      		klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
      		rsc.expectations.CreationObserved(rsKey)
      		rsc.queue.Add(rsKey)
      		return
      	}
      
      	// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
      	// them to see if anyone wants to adopt it.
      	// DO NOT observe creation because no controller should be waiting for an
      	// orphan.
      	rss := rsc.getPodReplicaSets(pod)
      	if len(rss) == 0 {
      		return
      	}
      	klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
      	for _, rs := range rss {
      		rsc.enqueueRS(rs)
      	}
      }
      
      1. Check whether the pod is already marked for deletion (DeletionTimestamp set); on a controller-manager restart a new pod can show up already pending deletion, so it is handled as a deletion rather than a creation observation

      2. If the pod has a ControllerRef, resolve the owning rs (see the resolveControllerRef sketch below), record the creation in expectations, and add the rs key to the queue

      3. Otherwise the pod is an orphan: get all ReplicaSets matching it; if there are none, return, otherwise enqueue each of them so one may adopt it

        Creation is asynchronous: addPod only puts the rs key on the work queue; the pods themselves are created later by the sync loop and then placed by the scheduler
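
      For reference, resolveControllerRef (same file) looks the owner up by name and then verifies the UID, so a recreated ReplicaSet with the same name is not mistaken for the original owner:

      func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
      	// Don't even try to look up by Name if it's the wrong Kind.
      	if controllerRef.Kind != rsc.Kind {
      		return nil
      	}
      	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
      	if err != nil {
      		return nil
      	}
      	if rs.UID != controllerRef.UID {
      		// Same name, but not the rs this ControllerRef points to.
      		return nil
      	}
      	return rs
      }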

    4. addRS

      func (rsc *ReplicaSetController) addRS(obj interface{}) {
      	rs := obj.(*apps.ReplicaSet)
      	klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
      	rsc.enqueueRS(rs)
      }
      

      It simply enqueues the rs.
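
      enqueueRS just derives the namespace/name cache key and adds it to the work queue (same file):

      func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
      	key, err := controller.KeyFunc(rs)
      	if err != nil {
      		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
      		return
      	}
      	rsc.queue.Add(key)
      }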

    5. Now take a look at the sync handler

      rsc.syncHandler = rsc.syncReplicaSet
      

      Definition of syncReplicaSet:

      // syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
      // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
      // invoked concurrently with the same key.
      func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
      	startTime := time.Now()
      	defer func() {
      		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
      	}()
      
      	namespace, name, err := cache.SplitMetaNamespaceKey(key)
      	if err != nil {
      		return err
      	}
      	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
      	if errors.IsNotFound(err) {
      		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
      		rsc.expectations.DeleteExpectations(key)
      		return nil
      	}
      	if err != nil {
      		return err
      	}
      
      	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
      	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
      	if err != nil {
      		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
      		return nil
      	}
      
      	// list all pods to include the pods that don't match the rs`s selector
      	// anymore but has the stale controller ref.
      	// TODO: Do the List and Filter in a single pass, or use an index.
      	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
      	if err != nil {
      		return err
      	}
      	// Ignore inactive pods.
      	filteredPods := controller.FilterActivePods(allPods)
      
      	// NOTE: filteredPods are pointing to objects from cache - if you need to
      	// modify them, you need to copy it first.
      	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
      	if err != nil {
      		return err
      	}
      
      	var manageReplicasErr error
      	if rsNeedsSync && rs.DeletionTimestamp == nil {
      		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
      	}
      	rs = rs.DeepCopy()
      	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
      
      	// Always updates status as pods come up or die.
      	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
      	if err != nil {
      		// Multiple things could lead to this update failing. Requeuing the replica set ensures
      		// Returning an error causes a requeue without forcing a hotloop
      		return err
      	}
      	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
      	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
      		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
      		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
      		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
      	}
      	return manageReplicasErr
      }
      
      1. ReplicaSets(namespace).Get(name) fetches the rs from the lister cache; if it has been deleted, its expectations are cleaned up
      2. SatisfiedExpectations decides whether the rs needs a sync
      3. Convert rs.Spec.Selector into a selector
      4. rsc.podLister.Pods(rs.Namespace).List lists all pods in the namespace (including pods that no longer match the selector but still carry a stale controller ref)
      5. Filter out inactive pods, then claim the pods that belong to this rs
      6. If a sync is needed and the rs is not being deleted, manageReplicas creates or deletes pods to reach the desired replica count (sketched below)
      7. calculateStatus computes the new status and updateReplicaSetStatus writes it back
      8. If manageReplicas succeeded, MinReadySeconds > 0, and all replicas are ready but not all are available yet, requeue the key with rsc.queue.AddAfter after MinReadySeconds as a guard against clock skew
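
      manageReplicas (step 6) is where pods are actually created or deleted. A heavily abridged sketch of its core diff logic, with batching and error handling elided (see pkg/controller/replicaset/replica_set.go for the full version):

      diff := len(filteredPods) - int(*(rs.Spec.Replicas))
      if diff < 0 {
      	// Too few pods: record expected creations first, then create the
      	// missing pods in slow-start batches (1, 2, 4, ...) so that a
      	// persistent failure such as an exhausted quota fails fast.
      	diff = -diff
      	if diff > rsc.burstReplicas {
      		diff = rsc.burstReplicas
      	}
      	rsc.expectations.ExpectCreations(rsKey, diff)
      	// slowStartBatch(diff, ...) -> rsc.podControl.CreatePods(...)
      } else if diff > 0 {
      	// Too many pods: cap the batch at burstReplicas, rank the pods so
      	// the least useful ones (unscheduled, not ready, younger) become
      	// victims first, record expected deletions, then delete in parallel.
      	if diff > rsc.burstReplicas {
      		diff = rsc.burstReplicas
      	}
      	podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
      	rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
      	// for each pod: go rsc.podControl.DeletePod(...)
      }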
    6. calculateStatus

      func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
      	newStatus := rs.Status
      	// Count the number of pods that have labels matching the labels of the pod
      	// template of the replica set, the matching pods may have more
      	// labels than are in the template. Because the label of podTemplateSpec is
      	// a superset of the selector of the replica set, so the possible
      	// matching pods must be part of the filteredPods.
      	fullyLabeledReplicasCount := 0
      	readyReplicasCount := 0
      	availableReplicasCount := 0
      	templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
      	for _, pod := range filteredPods {
      		if templateLabel.Matches(labels.Set(pod.Labels)) {
      			fullyLabeledReplicasCount++
      		}
      		if podutil.IsPodReady(pod) {
      			readyReplicasCount++
      			if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
      				availableReplicasCount++
      			}
      		}
      	}
      
      	failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
      	if manageReplicasErr != nil && failureCond == nil {
      		var reason string
      		if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
      			reason = "FailedCreate"
      		} else if diff > 0 {
      			reason = "FailedDelete"
      		}
      		cond := NewReplicaSetCondition(apps.ReplicaSetReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
      		SetCondition(&newStatus, cond)
      	} else if manageReplicasErr == nil && failureCond != nil {
      		RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
      	}
      
      	newStatus.Replicas = int32(len(filteredPods))
      	newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
      	newStatus.ReadyReplicas = int32(readyReplicasCount)
      	newStatus.AvailableReplicas = int32(availableReplicasCount)
      	return newStatus
      }
      

      Counts the matching replicas: fullyLabeledReplicas (pods whose labels match the rs's pod template), readyReplicas, and availableReplicas.

      If manageReplicas failed and no failure condition is present yet, a ReplicaSetReplicaFailure condition is set (reason FailedCreate or FailedDelete depending on the direction of the diff); if manageReplicas succeeded and a failure condition is present, it is removed.
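
      The ready vs. available distinction comes from podutil.IsPodAvailable: a pod counts as available only once it has been Ready for at least MinReadySeconds. A paraphrased sketch (getPodReadyCondition is a hypothetical stand-in for podutil's ready-condition lookup):

      func isPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {
      	c := getPodReadyCondition(pod.Status) // stand-in for the real condition lookup
      	if c == nil || c.Status != v1.ConditionTrue {
      		return false
      	}
      	if minReadySeconds == 0 {
      		return true
      	}
      	minReady := time.Duration(minReadySeconds) * time.Second
      	return !c.LastTransitionTime.IsZero() &&
      		c.LastTransitionTime.Add(minReady).Before(now.Time)
      }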

