springcloud-gateway 灰度发布V1

排除netflix-ribbon

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2021.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
                    <groupId>org.springframework.cloud</groupId>
                </exclusion>
            </exclusions>
        </dependency>

pom引入包spring-cloud-loadbalancer

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

新增全局过滤器,继承类ReactiveLoadBalancerClientFilter

@Component
@Slf4j
public class GrayLoadBalancerFilter extends ReactiveLoadBalancerClientFilter {
    private final static Map<String, ReactorLoadBalancer<ServiceInstance>> BALANCERS = new ConcurrentHashMap<>();

    private final LoadBalancerClientFactory clientFactory;

    private LoadBalancerProperties properties;

    public GrayLoadBalancerFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        super(clientFactory, properties);
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null
                || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // preserve the original url
        addOriginalRequestUrl(exchange, url);

        if (log.isTraceEnabled()) {
            log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()
                    + " url before: " + url);
        }

        return choose(exchange).doOnNext(response -> {

            if (!response.hasServer()) {
                throw NotFoundException.create(properties.isUse404(),
                        "Unable to find instance for " + url.getHost());
            }

            ServiceInstance retrievedInstance = response.getServer();

            URI uri = exchange.getRequest().getURI();

            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }

            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(
                    retrievedInstance, overrideScheme);

            URI requestUrl = reconstructURI(serviceInstance, uri);

            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
        }).then(chain.filter(exchange));
    }

//主要改这里,自定义ReactorLoadBalancer
    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String host = uri.getHost();
        ReactorLoadBalancer<ServiceInstance> loadBalancer = BALANCERS.get(host);
        if (loadBalancer == null) {
            loadBalancer = new GrayLoadBalancer(clientFactory.getLazyProvider(host, ServiceInstanceListSupplier.class), host);
            BALANCERS.put(host, loadBalancer);
        }

        return loadBalancer.choose(createRequest(exchange));
    }

    private Request createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }

}

新增类GrayLoadBalancer实现接口 ReactorServiceInstanceLoadBalancer

@Slf4j
public class GrayLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private static final EmptyResponse EMPTY_RESPONSE = new EmptyResponse();
    private String serviceId;
    private static final Random RANDOM = new Random();

    public GrayLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.serviceId = serviceId;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(org.springframework.cloud.client.loadbalancer.reactive.Request request) {
        if (serviceInstanceListSupplierProvider != null) {
            ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
            return supplier.get().next().map(list -> getInstanceResponse(list, request));
        }
        return null;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
        if (instances.isEmpty()) {
            return EMPTY_RESPONSE;
        }

        //根据uid 判断不通版本,可自定义其他方式
        HttpHeaders headers = (HttpHeaders) request.getContext();
        String uid = headers.getFirst(Constants.UID);

        ServiceInstance serviceInstance;
        List<ServiceInstance> uidInstances = new ArrayList<>();
        List<ServiceInstance> noUidInstances = new ArrayList<>();
        for (ServiceInstance instance : instances) {
            Map<String, String> metadata = instance.getMetadata();
            String mUid = metadata.get(Constants.UID);
            if (mUid != null) {
                if (mUid.equals(uid)) {
                    uidInstances.add(instance);
                }
            } else {
                noUidInstances.add(instance);
            }
        }

        if (uidInstances.size() > 0) {
            serviceInstance = uidInstances.get(RANDOM.nextInt(uidInstances.size()));
            return new DefaultResponse(serviceInstance);
        }

        if (noUidInstances.size() > 0) {
            serviceInstance = noUidInstances.get(RANDOM.nextInt(noUidInstances.size()));
            return new DefaultResponse(serviceInstance);
        }

        return EMPTY_RESPONSE;
    }
}


版权声明:本文为Vinci_ljl原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。