Istio pilot-discovery服务发现源码解析(1.13版本)

介绍

Istio Pilot的代码分为Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用于在数据面负责Envoy的生命周期管理,Pilot-Discovery才是控制面进行流量管理的组件,本文将重点分析控制面部分,即Pilot-Discovery的代码。

工作机制

Istio源码分支: release-1.13

初始化

// newDiscoveryCommand builds the cobra "discovery" subcommand. Its RunE
// callback creates the server with bootstrap.NewServer, starts it, calls
// cmd.WaitSignal(stop), and finally waits for the server to complete.
// Errors from creating or starting the server are returned with context.
// NOTE(review): serverArgs and stop are defined in code elided from this
// excerpt ("// ...").
func newDiscoveryCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "discovery",
		Short: "Start Istio proxy discovery service.",
		// ...
		RunE: func(c *cobra.Command, args []string) error {
			// ...
			// Create the xDS server.
			discoveryServer, err := bootstrap.NewServer(serverArgs)
			if err != nil {
				return fmt.Errorf("failed to create discovery service: %v", err)
			}

			// Start the xDS server.
			if err := discoveryServer.Start(stop); err != nil {
				return fmt.Errorf("failed to start discovery service: %v", err)
			}

			cmd.WaitSignal(stop)
			// Wait until we shut down. In theory this could block forever; in practice we will get
			// forcibly shut down after 30s in Kubernetes.
			discoveryServer.WaitUntilCompletion()
			return nil
		},
	}
}

其中,bootstrap.NewServer负责创建服务发现服务器(Server)的数据结构:

// NewServer creates a new Server instance based on the provided arguments.
// In the visible part of this excerpt it installs an aggregate controller as
// e.ServiceDiscovery, builds the Server value, creates the xDS
// DiscoveryServer, initializes the controllers, and then calls
// initRegistryEventHandlers, initDiscoveryService and initSDSServer in that
// order. It returns a nil Server and the error if initControllers fails.
// NOTE(review): e is defined in elided code ("// ..."), and initFuncs is not
// used in the lines shown here.
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
	// ...
	ac := aggregate.NewController(aggregate.Options{
		MeshHolder: e,
	})
	e.ServiceDiscovery = ac

	s := &Server{
		// ...
		fileWatcher:             filewatcher.NewWatcher(),
		// ...
		server:                  server.New(),
		// ...
	}
	// ...
	
	// Create the DiscoveryServer.
	// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
	s.XDSServer = xds.NewDiscoveryServer(e, args.Plugins, args.PodName, args.Namespace, args.RegistryOptions.KubeOptions.ClusterAliases)

	// ...

	// Initialize the controllers.
	if err := s.initControllers(args); err != nil {
		return nil, err
	}

	// ...

	// This should be called only after controllers are initialized.
	// Initialize the registry event handlers.
	// NOTE(review): per the original annotation, this registers handlers that trigger a full
	// update on ServiceEntry and config changes, i.e. the reaction to external resource
	// configuration changes — not visible in this excerpt, confirm against the callee.
	s.initRegistryEventHandlers()

	// Initialize the discovery service.
	// NOTE(review): per the original annotation, this adds the xDS server start function — confirm.
	s.initDiscoveryService(args)

	s.initSDSServer()

	// ...

	return s, nil
}

xDS指的是Envoy里定义的一系列服务发现协议,如CDS、EDS、LDS、RDS等。

配置控制器与服务控制器对各类资源的监控,实际上是基于K8s的List/Watch机制实现的。

// initControllers initializes the controllers.
// It calls initMulticluster first, then initializes the config controller and
// the service controllers in that order, returning a wrapped error as soon as
// either of the latter two fails.
func (s *Server) initControllers(args *PilotArgs) error {
	s.initMulticluster(args)
	// ...
	// Config controller.
	if err := s.initConfigController(args); err != nil {
		return fmt.Errorf("error initializing config controller: %v", err)
	}
	if err := s.initServiceControllers(args); err != nil {
		return fmt.Errorf("error initializing service controllers: %v", err)
	}
	return nil
}

初始化Config控制器

// initConfigController creates the config controller in the pilotConfig.
// NOTE(review): per the original annotation, the resulting controller watches creation and
// update of Istio resources such as VirtualService/DestinationRule/ServiceEntry/Gateway —
// not visible in this excerpt.
//
// The config source is picked in this order: mesh ConfigSources (if any), else a
// local file directory (if args.RegistryOptions.FileDir is set), else the
// Kubernetes config store. The stores in s.ConfigStores are then wrapped by
// configaggregate.MakeCache, the result is stored as s.configController and used
// to build s.environment.IstioConfigStore, and a start function is added that
// runs the controller in a goroutine.
// NOTE(review): handling of err, err2 and the MakeCache error is elided ("// ...").
func (s *Server) initConfigController(args *PilotArgs) error {
	s.initStatusController(args, features.EnableStatus)
	meshConfig := s.environment.Mesh()
	if len(meshConfig.ConfigSources) > 0 {
		// Using MCP for config.
		if err := s.initConfigSources(args); err != nil {
			return err
		}
	} else if args.RegistryOptions.FileDir != "" {
		// Local files - should be added even if other options are specified
		store := memory.Make(collections.Pilot)
		configController := memory.NewController(store)

		err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
		// ...
	} else {
		err2 := s.initK8SConfigStore(args)
		// ...
	}

	// If running in ingress mode (requires k8s), wrap the config controller.
	if hasKubeRegistry(args.RegistryOptions.Registries) && meshConfig.IngressControllerMode != meshconfig.MeshConfig_OFF {
		// ...
	}

	// Wrap the config controller with a cache.
	aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
	// ...
	s.configController = aggregateConfigController

	// Create the config store.
	s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

	// Defer starting the controller until after the service is created.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go s.configController.Run(stop)
		return nil
	})

	return nil
}

初始化Service控制器

Pilot中与服务发现相关的逻辑在ServiceController中:它处理来自注册中心(例如Kubernetes)的服务变更信息,监控K8s中各类服务资源(如Pod、Service等)的创建与更新。

// initServiceControllers sets up every service registry requested in
// args.RegistryOptions.Registries and schedules the aggregate service
// controller to run once the server starts.
//
// The ServiceEntry store is always created and added as a registry first.
// Each configured registry is then initialized at most once; a repeated entry
// only produces a warning. An unsupported registry name, or a failure while
// initializing the Kubernetes registry, aborts with an error.
func (s *Server) initServiceControllers(args *PilotArgs) error {
	aggregated := s.ServiceController()

	// ServiceEntry-backed registry, added regardless of the configured registries.
	s.serviceEntryStore = serviceentry.NewServiceDiscovery(
		s.configController,
		s.environment.IstioConfigStore,
		s.XDSServer,
		serviceentry.WithClusterID(s.clusterID),
	)
	aggregated.AddRegistry(s.serviceEntryStore)

	// seen tracks registries that were already handled so duplicates are skipped.
	seen := map[provider.ID]bool{}
	for _, r := range args.RegistryOptions.Registries {
		id := provider.ID(r)
		if seen[id] {
			log.Warnf("%s registry specified multiple times.", r)
			continue
		}
		seen[id] = true

		log.Infof("Adding %s registry adapter", id)
		switch id {
		case provider.Mock:
			s.initMockRegistry()
		case provider.Kubernetes:
			// Initialize the Kubernetes service controller.
			err := s.initKubeRegistry(args)
			if err != nil {
				return err
			}
		default:
			return fmt.Errorf("service registry %s is not supported", r)
		}
	}

	// Running the service controllers is deferred until the server's start phase.
	runControllers := func(stop <-chan struct{}) error {
		go aggregated.Run(stop)
		return nil
	}
	s.addStartFunc(runControllers)

	return nil
}

controller初始化

NewController方法里面首先是初始化Controller,然后获取informer和lister后分别注册Namespaces Handler、Services Handler、Nodes Handler、Pods Handler。

核心功能就是监听k8s相关资源(Namespaces、Service、Pod、Node)的更新事件,执行相应的事件处理回调函数。

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
//
// In the visible part of this excerpt it builds the Controller with its task
// queue, obtains the Namespaces informer and lister from kubeClient, and
// registers event handlers for Namespaces (only when opts.SystemNamespace is
// non-empty), Services, Nodes and Pods. The endpoints controller is chosen
// from options.EndpointMode, and the service export/import caches are created
// last.
// NOTE(review): nsInformer, c.serviceInformer, c.nodeInformer and c.pods are
// set up in elided code ("// ...").
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                       options,
		client:                     kubeClient,
		// Controller task queue.
		queue:                      queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		// ...
		multinetwork: initMultinetwork(),
	}
	// ...
	c.nsInformer = kubeClient.KubeInformer().Core().V1().Namespaces().Informer()
	c.nsLister = kubeClient.KubeInformer().Core().V1().Namespaces().Lister()
	if c.opts.SystemNamespace != "" {
		// ...
		c.registerHandlers(nsInformer, "Namespaces", c.onSystemNamespaceEvent, nil)
	}
	
	// ...
	c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

	// Pick the endpoints implementation; other modes leave c.endpoints untouched here.
	switch options.EndpointMode {
	case EndpointsOnly:
		c.endpoints = newEndpointsController(c)
	case EndpointSliceOnly:
		c.endpoints = newEndpointSliceController(c)
	}

	// ...
	c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)

	// ...
	c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)

	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)

	return c
}

上边的Controller实现了接口:

// Controller is the interface a service-registry controller exposes: callers
// can append handlers for service and workload changes, run the controller
// until a stop signal arrives, and query whether its initial cache
// synchronization has completed.
type Controller interface {
	// AppendServiceHandler notifies about changes to the service catalog.
	AppendServiceHandler(f func(*Service, Event))

	// AppendWorkloadHandler notifies about changes to workloads. This differs from InstanceHandler,
	// which deals with service instances (the result of a merge of Service and Workload)
	AppendWorkloadHandler(f func(*WorkloadInstance, Event))

	// Run until a signal is received
	Run(stop <-chan struct{})

	// HasSynced returns true after initial cache synchronization is complete
	HasSynced() bool
}

ServiceController为四种资源分别创建了一个监听器,用于监听K8s的资源更新,并注册EventHandler。

Namespace

// onSystemNamespaceEvent is the handler registered for "Namespaces" events
// (presumably for the system namespace — confirm against the elided code).
// In the visible part it calls c.onDefaultNetworkChange when c.network differs
// from oldDefaultNetwork, and it always returns nil.
// NOTE(review): oldDefaultNetwork is computed in elided code ("// ...").
func (c *Controller) onSystemNamespaceEvent(obj interface{}, ev model.Event) error {
	// ...
	// network changed, rarely happen
	if oldDefaultNetwork != c.network {
		// refresh pods/endpoints/services
		c.onDefaultNetworkChange()
	}
	return nil
}

Service

Service事件处理器会根据事件的类型更新缓存,然后调用serviceHandlers中注册的事件处理器进行回调。

// onServiceEvent reacts to a Kubernetes Service event.
//
// curr is converted with convertToService; if that fails the error is logged
// and the event is dropped (nil is returned). Otherwise the Service is
// converted via kube.ConvertService and either deleted (model.EventDelete) or
// added/updated (any other event). The returned error is always nil.
func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
	svc, err := convertToService(curr)
	if err != nil {
		log.Errorf(err)
		return nil
	}

	log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)

	// Build the standard (cluster.local) service for this Kubernetes Service.
	converted := kube.ConvertService(*svc, c.opts.DomainSuffix, c.Cluster())

	if event == model.EventDelete {
		c.deleteService(converted)
		return nil
	}

	c.addOrUpdateService(svc, converted, event, false)
	return nil
}

Node

// onNodeEvent is the handler registered for "Nodes" events. In the visible
// part, when updatedNeeded is true and c.updateServiceNodePortAddresses()
// returns true, it requests a full push through c.opts.XDSUpdater.ConfigUpdate
// with reason model.ServiceUpdate. It always returns nil.
// NOTE(review): updatedNeeded is computed in elided code ("// ...").
func (c *Controller) onNodeEvent(obj interface{}, event model.Event) error {
	// ...
	// update all related services
	if updatedNeeded && c.updateServiceNodePortAddresses() {
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.ServiceUpdate},
		})
	}
	return nil
}

Pod

// onEvent keeps the IP-based pod index (pc.podsByIP) in step with pod events
// and then notifies the workload handlers.
//
// Events that do not change the index return nil without notifying: an add or
// update for a pod that should not be in endpoints, an add for a pod that is
// terminating or not ready, and a delete for a pod that was not cached. An
// update for a terminating or not-ready pod removes it from the index and is
// reported to the handlers as model.EventDelete.
func (pc *PodCache) onEvent(curr interface{}, ev model.Event) error {
	// ...
	switch ev {
	case model.EventAdd:
		// A terminating or not-ready pod can show up as an add when istiod just starts.
		if pod.DeletionTimestamp != nil || !IsPodReady(pod) || !shouldPodBeInEndpoints(pod) {
			return nil
		}
		pc.update(ip, key)

	case model.EventUpdate:
		switch {
		case pod.DeletionTimestamp != nil || !IsPodReady(pod):
			// Drop the pod from the cache and report the update as a delete.
			pc.deleteIP(ip, key)
			ev = model.EventDelete
		case shouldPodBeInEndpoints(pod):
			pc.update(ip, key)
		default:
			return nil
		}

	case model.EventDelete:
		// Usually the pod was already removed by an earlier `UPDATE` carrying a
		// `DeletionTimestamp`; only notify when this call actually removed it.
		if removed := pc.deleteIP(ip, key); !removed {
			return nil
		}
	}

	pc.notifyWorkloadHandlers(pod, ev)
	return nil
}

Pilot Discovery各组件启动流程

(此处原为流程图:Pilot Discovery各组件启动流程)

DiscoveryServer接收Envoy的gRPC连接请求流程

(此处原为流程图:DiscoveryServer接收Envoy的gRPC连接请求流程)

Config变化后向Envoy推送更新的流程

(此处原为流程图:Config变化后向Envoy推送更新的流程)

总结

Pilot-Discovery的服务发现是通过K8s的Informer和Lister来注册并监听Namespace、Service、Node、Pod等资源的更新事件,然后通过事件驱动模型执行回调函数,再调用xDS的ConfigUpdate来异步地更新配置。

参考

Istio Pilot代码深度解析


版权声明:本文为xidianjiapei001原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。