Migrating from Spring Cloud Zookeeper to Spring Cloud Kubernetes

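This is weihubeats. If you find the article useful, follow the WeChat official account 小奏技术, where articles are published first. No marketing fluff, no clickbait.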

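Background

Our existing microservices use the Spring Cloud Zookeeper stack. Applications deployed on Kubernetes do not actually need a separate service registry, because Kubernetes supports service discovery natively. So I decided to replace Zookeeper with Spring Cloud Kubernetes.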

Migration

1. Remove the Spring Cloud Zookeeper dependency

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    </dependency>

2. Add the Spring Cloud Kubernetes dependencies

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-kubernetes-client-loadbalancer</artifactId>
    </dependency>

The version I use here is 2.1.4, the latest at the time of writing.
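
If the build does not already manage Spring Cloud versions through a BOM, one option is to pin the version directly on the dependency. A minimal sketch, assuming the 2.1.4 version mentioned above applies to both artifacts:

```xml
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
  <!-- assumption: pinned explicitly because no BOM manages the version -->
  <version>2.1.4</version>
</dependency>
```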

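3. Work around the unnamed-port bug

The latest version has a bug: if a port in service.yaml is defined without a name, an error is thrown. The workaround here is to patch the library source.

For details on the issue, see my earlier blog post.

Create a package named org.springframework.cloud.kubernetes.client.discovery in your own project.
Then create the class KubernetesInformerDiscoveryClient in it. Because it has the same fully qualified name as the library class, the idea is that the copy in the project takes precedence over the one in the jar. The code is as follows: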

package org.springframework.cloud.kubernetes.client.discovery;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.kubernetes.client.extended.wait.Wait;
import io.kubernetes.client.informer.SharedInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.openapi.models.V1EndpointAddress;
import io.kubernetes.client.openapi.models.V1EndpointPort;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesServiceInstance;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 *@author : wh
 *@date : 2022/10/27 14:36
 *@description: Patched copy of the library class that tolerates Service ports without a name
 */
public class KubernetesInformerDiscoveryClient implements DiscoveryClient, InitializingBean {

	private static final Log log = LogFactory.getLog(KubernetesInformerDiscoveryClient.class);

	private static final String PRIMARY_PORT_NAME_LABEL_KEY = "primary-port-name";

	private static final String HTTPS_PORT_NAME = "https";

	private static final String UNSET_PORT_NAME = "<unset>";

	private static final String HTTP_PORT_NAME = "http";

	private final SharedInformerFactory sharedInformerFactory;

	private final Lister<V1Service> serviceLister;

	private final Supplier<Boolean> informersReadyFunc;

	private final Lister<V1Endpoints> endpointsLister;

	private final KubernetesDiscoveryProperties properties;

	private final String namespace;

	public KubernetesInformerDiscoveryClient(String namespace, SharedInformerFactory sharedInformerFactory,
			Lister<V1Service> serviceLister, Lister<V1Endpoints> endpointsLister,
			SharedInformer<V1Service> serviceInformer, SharedInformer<V1Endpoints> endpointsInformer,
			KubernetesDiscoveryProperties properties) {
		this.namespace = namespace;
		this.sharedInformerFactory = sharedInformerFactory;

		this.serviceLister = serviceLister;
		this.endpointsLister = endpointsLister;
		this.informersReadyFunc = () -> serviceInformer.hasSynced() && endpointsInformer.hasSynced();

		this.properties = properties;
	}

	@Override
	public String description() {
		return "Kubernetes Client Discovery";
	}

	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
		Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null");

		if (!StringUtils.hasText(namespace) && !properties.isAllNamespaces()) {
			log.warn("Namespace is null or empty, this may cause issues looking up services");
		}

		V1Service service = properties.isAllNamespaces() ? this.serviceLister.list().stream()
				.filter(svc -> serviceId.equals(svc.getMetadata().getName())).findFirst().orElse(null)
				: this.serviceLister.namespace(this.namespace).get(serviceId);
		if (service == null || !matchServiceLabels(service)) {
			// no such service present in the cluster
			return new ArrayList<>();
		}

		Map<String, String> svcMetadata = new HashMap<>();
		if (this.properties.getMetadata() != null) {
			if (this.properties.getMetadata().isAddLabels()) {
				if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {
					String labelPrefix = this.properties.getMetadata().getLabelsPrefix() != null
							? this.properties.getMetadata().getLabelsPrefix() : "";
					service.getMetadata().getLabels().entrySet().stream()
							.filter(e -> e.getKey().startsWith(labelPrefix))
							.forEach(e -> svcMetadata.put(e.getKey(), e.getValue()));
				}
			}
			if (this.properties.getMetadata().isAddAnnotations()) {
				if (service.getMetadata() != null && service.getMetadata().getAnnotations() != null) {
					String annotationPrefix = this.properties.getMetadata().getAnnotationsPrefix() != null
							? this.properties.getMetadata().getAnnotationsPrefix() : "";
					service.getMetadata().getAnnotations().entrySet().stream()
							.filter(e -> e.getKey().startsWith(annotationPrefix))
							.forEach(e -> svcMetadata.put(e.getKey(), e.getValue()));
				}
			}
		}

		V1Endpoints ep = this.endpointsLister.namespace(service.getMetadata().getNamespace())
				.get(service.getMetadata().getName());
		if (ep == null || ep.getSubsets() == null) {
			// no available endpoints in the cluster
			return new ArrayList<>();
		}

		Optional<String> discoveredPrimaryPortName = Optional.empty();
		if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {
			discoveredPrimaryPortName = Optional
					.ofNullable(service.getMetadata().getLabels().get(PRIMARY_PORT_NAME_LABEL_KEY));
		}
		final String primaryPortName = discoveredPrimaryPortName.orElse(this.properties.getPrimaryPortName());

		return ep.getSubsets().stream().filter(subset -> subset.getPorts() != null && subset.getPorts().size() > 0) // safeguard
				.flatMap(subset -> {
					Map<String, String> metadata = new HashMap<>(svcMetadata);
					List<V1EndpointPort> endpointPorts = subset.getPorts();
					if (this.properties.getMetadata() != null && this.properties.getMetadata().isAddPorts()) {
						// Patch: a port without a name is stored under the "<unset>" key
						endpointPorts.forEach(p -> metadata.put(StringUtils.hasText(p.getName()) ? p.getName() : UNSET_PORT_NAME,
								Integer.toString(p.getPort())));
					}
					List<V1EndpointAddress> addresses = subset.getAddresses();
					if (addresses == null) {
						addresses = new ArrayList<>();
					}
					if (this.properties.isIncludeNotReadyAddresses()
							&& !CollectionUtils.isEmpty(subset.getNotReadyAddresses())) {
						addresses.addAll(subset.getNotReadyAddresses());
					}

					final int port = findEndpointPort(endpointPorts, primaryPortName, serviceId);
					return addresses.stream()
							.map(addr -> new KubernetesServiceInstance(
									addr.getTargetRef() != null ? addr.getTargetRef().getUid() : "", serviceId,
									addr.getIp(), port, metadata, false, service.getMetadata().getNamespace(),
									service.getMetadata().getClusterName()));
				}).collect(Collectors.toList());
	}

	private int findEndpointPort(List<V1EndpointPort> endpointPorts, String primaryPortName, String serviceId) {
		if (endpointPorts.size() == 1) {
			return endpointPorts.get(0).getPort();
		}
		else {
			// Only named ports can be looked up by name, so unnamed ones are filtered out here
			Map<String, Integer> ports = endpointPorts.stream().filter(p -> StringUtils.hasText(p.getName()))
					.collect(Collectors.toMap(V1EndpointPort::getName, V1EndpointPort::getPort));
			// This oneliner is looking for a port with a name equal to the primary port
			// name specified in the service label
			// or in spring.cloud.kubernetes.discovery.primary-port-name, equal to https,
			// or equal to http.
			// In case no port has been found return -1 to log a warning and fall back to
			// the first port in the list.
			int discoveredPort = ports.getOrDefault(primaryPortName,
					ports.getOrDefault(HTTPS_PORT_NAME, ports.getOrDefault(HTTP_PORT_NAME, -1)));

			if (discoveredPort == -1) {
				if (StringUtils.hasText(primaryPortName)) {
					log.warn("Could not find a port named '" + primaryPortName + "', 'https', or 'http' for service '"
							+ serviceId + "'.");
				}
				else {
					log.warn("Could not find a port named 'https' or 'http' for service '" + serviceId + "'.");
				}
				log.warn(
						"Make sure that either the primary-port-name label has been added to the service, or that spring.cloud.kubernetes.discovery.primary-port-name has been configured.");
				log.warn("Alternatively name the primary port 'https' or 'http'");
				log.warn("An incorrect configuration may result in non-deterministic behaviour.");
				discoveredPort = endpointPorts.get(0).getPort();
			}
			return discoveredPort;
		}
	}

	@Override
	public List<String> getServices() {
		List<V1Service> services = this.properties.isAllNamespaces() ? this.serviceLister.list()
				: this.serviceLister.namespace(this.namespace).list();
		return services.stream().filter(this::matchServiceLabels).map(s -> s.getMetadata().getName())
				.collect(Collectors.toList());
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		this.sharedInformerFactory.startAllRegisteredInformers();
		if (!Wait.poll(Duration.ofSeconds(1), Duration.ofSeconds(this.properties.getCacheLoadingTimeoutSeconds()),
				() -> {
					log.info("Waiting for the cache of informers to be fully loaded..");
					return this.informersReadyFunc.get();
				})) {
			if (this.properties.isWaitCacheReady()) {
				throw new IllegalStateException(
						"Timeout waiting for informers cache to be ready, is the kubernetes service up?");
			}
			else {
				log.warn(
						"Timeout waiting for informers cache to be ready, ignoring the failure because waitForInformerCacheReady property is false");
			}
		}
		log.info("Cache fully loaded (total " + serviceLister.list().size()
				+ " services) , discovery client is now available");
	}

	private boolean matchServiceLabels(V1Service service) {
		if (log.isDebugEnabled()) {
			log.debug("Kubernetes Service Label Properties:");
			if (this.properties.getServiceLabels() != null) {
				this.properties.getServiceLabels().forEach((key, value) -> log.debug(key + ":" + value));
			}
			log.debug("Service " + service.getMetadata().getName() + " labels:");
			if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {
				service.getMetadata().getLabels().forEach((key, value) -> log.debug(key + ":" + value));
			}
		}
		// safeguard
		if (service.getMetadata() == null) {
			return false;
		}
		if (properties.getServiceLabels() == null || properties.getServiceLabels().isEmpty()) {
			return true;
		}
		return properties.getServiceLabels().keySet().stream()
				.allMatch(k -> service.getMetadata().getLabels() != null
						&& service.getMetadata().getLabels().containsKey(k)
						&& service.getMetadata().getLabels().get(k).equals(properties.getServiceLabels().get(k)));
	}

}
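
The class above handles unnamed ports in two places: when copying ports into the instance metadata, and when choosing the port to expose. To see that behaviour in isolation, here is a minimal plain-Java sketch. `PortSelectionSketch` and its nested `Port` class are hypothetical stand-ins for illustration (not part of Spring Cloud Kubernetes), and duplicate port names are simply overwritten here rather than rejected.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PortSelectionSketch {

    // Hypothetical stand-in for V1EndpointPort: a port number with an optional name.
    static final class Port {
        final String name; // null when the Service does not name its port
        final int number;

        Port(String name, int number) {
            this.name = name;
            this.number = number;
        }
    }

    static final String UNSET_PORT_NAME = "<unset>";

    private static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    // Mirrors the metadata step: unnamed ports are stored under "<unset>".
    static Map<String, String> portMetadata(List<Port> ports) {
        Map<String, String> metadata = new LinkedHashMap<>();
        for (Port p : ports) {
            metadata.put(hasText(p.name) ? p.name : UNSET_PORT_NAME, Integer.toString(p.number));
        }
        return metadata;
    }

    // Mirrors findEndpointPort: primary name, then "https", then "http", then the first port.
    static int findPort(List<Port> ports, String primaryPortName) {
        if (ports.size() == 1) {
            return ports.get(0).number;
        }
        Map<String, Integer> named = new LinkedHashMap<>();
        for (Port p : ports) {
            if (hasText(p.name)) { // unnamed ports never become lookup keys
                named.put(p.name, p.number);
            }
        }
        int found = named.getOrDefault(primaryPortName,
                named.getOrDefault("https", named.getOrDefault("http", -1)));
        return found == -1 ? ports.get(0).number : found;
    }

    public static void main(String[] args) {
        List<Port> multi = Arrays.asList(new Port(null, 9000), new Port("http", 8080));
        System.out.println(portMetadata(multi));   // prints {<unset>=9000, http=8080}
        System.out.println(findPort(multi, null)); // prints 8080
    }
}
```

With a single unnamed port, `findPort` returns that port directly, which is the common case for a service.yaml that declares one port without a name.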

4. Update the route forwarding

    builder.routes()
        .route(r -> r.path("/ms/test/**")
            .filters(f -> f.stripPrefix(1))
            .uri("lb://test-service"))

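Previously, requests were forwarded using the service name registered in Zookeeper. If the name of the service in Kubernetes differs from the name that was registered in Zookeeper, the service name in the route has to be changed accordingly.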
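
To make the route above concrete: `stripPrefix(1)` removes the first path segment before the request is forwarded, so a request to `/ms/test/hello` reaches `test-service` as `/test/hello`. The following is a plain-Java imitation of that path rewrite, not the gateway's own code; `StripPrefixSketch` is a hypothetical name and edge cases such as trailing slashes are ignored.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class StripPrefixSketch {

    // Drops the first `parts` non-empty path segments and rebuilds the path.
    static String stripPrefix(String path, int parts) {
        String remaining = Arrays.stream(path.split("/"))
                .filter(segment -> !segment.isEmpty())
                .skip(parts)
                .collect(Collectors.joining("/"));
        return "/" + remaining;
    }

    public static void main(String[] args) {
        System.out.println(stripPrefix("/ms/test/hello", 1)); // prints /test/hello
    }
}
```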

Deployment

Then redeploy the service in k8s.
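
The patched KubernetesInformerDiscoveryClient only helps if the copy in the application is the one actually loaded at runtime. One hedged way to check this is to log where a class was loaded from. The helper below is a hypothetical sketch using only the JDK; in the real application it would be called with `KubernetesInformerDiscoveryClient.class`, and the location is expected to point at the application's own classes rather than the spring-cloud-kubernetes jar.

```java
import java.security.CodeSource;

final class ClassOriginSketch {

    // Returns the location a class was loaded from, or a marker for classes without a code source (e.g. JDK classes).
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(no code source)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        // In the real service, pass KubernetesInformerDiscoveryClient.class instead.
        System.out.println(originOf(ClassOriginSketch.class));
        System.out.println(originOf(String.class));
    }
}
```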

Error

[Screenshot of the error]

The error shows that the pod has no permission to call the Kubernetes API. Granting the permission fixes it, which is done here with RBAC.

  1. Create a Role
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: pod-reader
rules:
  - apiGroups: [""]
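    # services and endpoints are included because the discovery client above lists those objects
    resources: ["pods", "configmaps", "services", "endpoints"]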
    verbs: ["get", "watch", "list"]

  2. Create a ServiceAccount
apiVersion: v1
kind: ServiceAccount
metadata:
  name: config-reader
  namespace: default

  3. Bind the Role to the ServiceAccount
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: pod-reader
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: pod-reader
subjects:
  - kind: ServiceAccount
    name: config-reader
    namespace: default

  4. Reference the ServiceAccount above in the Deployment
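
For the last step, the ServiceAccount is referenced through `serviceAccountName` in the pod template. A minimal sketch of such a Deployment follows; the name `test-service`, the labels, and the image are placeholders for illustration, and only `config-reader` and the `default` namespace come from the manifests above.

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-service            # placeholder name
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: test-service
  template:
    metadata:
      labels:
        app: test-service
    spec:
      serviceAccountName: config-reader   # the ServiceAccount created above
      containers:
        - name: test-service
          image: example/test-service:latest   # placeholder image
```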

Reference blog post

After redeploying and restarting, the switch turns out to be seamless.

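Summary

Overall the switch is fairly simple and essentially seamless, and the services no longer depend on an external service registry.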


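Copyright notice: This is an original article by qq_42651904, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.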