排除netflix-ribbon
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.1</version>
<exclusions>
<exclusion>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
<groupId>org.springframework.cloud</groupId>
</exclusion>
</exclusions>
</dependency>
pom 引入包 spring-cloud-loadbalancer:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
新增全局过滤器,继承类 ReactiveLoadBalancerClientFilter:
@Component
@Slf4j
public class GrayLoadBalancerFilter extends ReactiveLoadBalancerClientFilter {
private final static Map<String, ReactorLoadBalancer<ServiceInstance>> BALANCERS = new ConcurrentHashMap<>();
private final LoadBalancerClientFactory clientFactory;
private LoadBalancerProperties properties;
/**
 * Creates the gray-release load-balancer filter.
 *
 * @param clientFactory factory later asked for the per-service instance-list supplier
 * @param properties    load-balancer settings; the use404 flag is read when no instance is found
 */
public GrayLoadBalancerFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
    super(clientFactory, properties);
    // Keep local references as well: this class uses both collaborators directly.
    this.properties = properties;
    this.clientFactory = clientFactory;
}
/**
 * Rewrites an {@code lb} request URL to the address of a concrete service instance
 * and then continues the filter chain.
 *
 * <p>Requests whose URL attribute is missing, or whose scheme and scheme prefix are
 * both something other than {@code lb}, are passed through untouched.
 *
 * @param exchange the current server exchange
 * @param chain    the remaining gateway filter chain
 * @return completion signal of the downstream chain
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);

    boolean loadBalanced = url != null
            && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix));
    if (!loadBalanced) {
        return chain.filter(exchange);
    }

    // Record the URL as it was before it gets rewritten.
    addOriginalRequestUrl(exchange, url);

    if (log.isTraceEnabled()) {
        log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()
                + " url before: " + url);
    }

    Mono<Response<ServiceInstance>> selection = choose(exchange);
    return selection.doOnNext(response -> {
        if (!response.hasServer()) {
            throw NotFoundException.create(properties.isUse404(),
                    "Unable to find instance for " + url.getHost());
        }

        ServiceInstance chosenInstance = response.getServer();
        URI incomingUri = exchange.getRequest().getURI();

        // Default scheme follows the instance's secure flag; when a scheme prefix
        // (`lb:<scheme>`) is present, the scheme of the gateway request URL wins.
        boolean secure = chosenInstance.isSecure();
        String targetScheme;
        if (schemePrefix == null) {
            targetScheme = secure ? "https" : "http";
        } else {
            targetScheme = url.getScheme();
        }

        DelegatingServiceInstance delegate =
                new DelegatingServiceInstance(chosenInstance, targetScheme);
        URI resolvedUrl = reconstructURI(delegate, incomingUri);

        if (log.isTraceEnabled()) {
            log.trace("LoadBalancerClientFilter url chosen: " + resolvedUrl);
        }
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, resolvedUrl);
    }).then(chain.filter(exchange));
}
/**
 * Selects a service instance for the exchange using a {@link GrayLoadBalancer}.
 *
 * <p>This is the main customisation point: a custom {@code ReactorLoadBalancer} is used
 * for each target host. Balancers are cached in {@code BALANCERS} keyed by the host of
 * the gateway request URL.
 *
 * @param exchange the current server exchange; its gateway request URL attribute must be set
 * @return the load balancer's response for a request built from the exchange's headers
 */
private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
    URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String host = uri.getHost();
    // computeIfAbsent makes the lookup-or-create step atomic, so concurrent first
    // requests for the same host share one balancer instead of racing on get/put.
    ReactorLoadBalancer<ServiceInstance> loadBalancer = BALANCERS.computeIfAbsent(host,
            serviceId -> new GrayLoadBalancer(
                    clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class),
                    serviceId));
    return loadBalancer.choose(createRequest(exchange));
}
/**
 * Builds the load-balancer request for an exchange.
 *
 * @param exchange the current server exchange
 * @return a request whose context is the HTTP headers of the incoming request
 */
private Request createRequest(ServerWebExchange exchange) {
    return new DefaultRequest<>(exchange.getRequest().getHeaders());
}
}
新增类 GrayLoadBalancer,实现接口 ReactorServiceInstanceLoadBalancer:
@Slf4j
public class GrayLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private static final EmptyResponse EMPTY_RESPONSE = new EmptyResponse();
private String serviceId;
private static final Random RANDOM = new Random();
/**
 * Creates a gray-release load balancer for one service.
 *
 * @param serviceInstanceListSupplierProvider provider of the supplier that lists the service's instances
 * @param serviceId                           identifier of the service this balancer was created for
 */
public GrayLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
    this.serviceId = serviceId;
    this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
/**
 * Chooses a service instance for the given request.
 *
 * <p>The first instance list emitted by the supplier is passed to
 * {@code getInstanceResponse} together with the request. When no supplier provider was
 * configured, the empty response is emitted instead of returning {@code null}, so
 * subscribers never receive a null {@code Mono}.
 *
 * @param request the load-balancer request
 * @return a {@code Mono} producing the selection result; never {@code null}
 */
@Override
public Mono<Response<ServiceInstance>> choose(org.springframework.cloud.client.loadbalancer.reactive.Request request) {
    if (serviceInstanceListSupplierProvider == null) {
        // A null return here would surface as a NullPointerException in the caller's chain.
        return Mono.<Response<ServiceInstance>>just(EMPTY_RESPONSE);
    }
    ServiceInstanceListSupplier supplier =
            serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get().next().map(instances -> getInstanceResponse(instances, request));
}
/**
 * Picks one instance from the candidates based on the request's UID header.
 *
 * <p>Selection rules (the UID criterion can be swapped for any other custom rule):
 * <ol>
 *   <li>Instances whose {@code Constants.UID} metadata equals the request's
 *       {@code Constants.UID} header are preferred; one is chosen at random.</li>
 *   <li>Otherwise one of the instances without UID metadata is chosen at random.</li>
 *   <li>Instances whose UID metadata differs from the request's UID are never chosen.</li>
 * </ol>
 *
 * @param instances candidate instances of the service
 * @param request   load-balancer request; its context is read as HTTP headers when it is one
 * @return a response wrapping the chosen instance, or the empty response when none qualifies
 */
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
    if (instances.isEmpty()) {
        return EMPTY_RESPONSE;
    }

    // Only read the UID when the context really is a set of HTTP headers; a request
    // with any other (or no) context is treated as carrying no UID instead of failing
    // with a NullPointerException / ClassCastException.
    Object context = request.getContext();
    String uid = null;
    if (context instanceof HttpHeaders) {
        uid = ((HttpHeaders) context).getFirst(Constants.UID);
    }

    List<ServiceInstance> uidInstances = new ArrayList<>();
    List<ServiceInstance> noUidInstances = new ArrayList<>();
    for (ServiceInstance instance : instances) {
        Map<String, String> metadata = instance.getMetadata();
        String instanceUid = metadata.get(Constants.UID);
        if (instanceUid == null) {
            noUidInstances.add(instance);
        } else if (instanceUid.equals(uid)) {
            uidInstances.add(instance);
        }
        // Instances tagged with a different UID are skipped.
    }

    if (!uidInstances.isEmpty()) {
        return new DefaultResponse(uidInstances.get(RANDOM.nextInt(uidInstances.size())));
    }
    if (!noUidInstances.isEmpty()) {
        return new DefaultResponse(noUidInstances.get(RANDOM.nextInt(noUidInstances.size())));
    }
    return EMPTY_RESPONSE;
}
}
版权声明:本文为 Vinci_ljl 原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。