Dubbo 的集群容错和负载均衡
线上的服务一般是要集群部署的,集群的话就要考虑消费者选择哪一个提供者调用,调用失败了怎么办。这两点对应的就是负载均衡和集群容错了。
invoker
在 Dubbo 中 invoker 其实就是一个具有调用功能的对象,在服务暴露端封装的就是真实的服务实现,把真实的服务实现封装一下变成一个 invoker。
在服务发现端就是从注册中心得到服务提供者的配置信息,然后一条配置信息对应封装成一个 invoker,这个 invoker 就具备远程调用能力,当然要是走的是 injvm 协议那真实走的还是本地的调用。
然后还有个 ClusterInvoker ,它也是个 invoker ,它封装了服务引入生成的 invoker 们,赋予其集群容错等能力,这个 invoker 就是暴露给消费者调用的 invoker。
所以说 Dubbo 就是搞了个统一模型,将能调用的服务的对象都封装成 invoker。
这里主要讲的是服务消费者这边的事情,因为集群容错是消费者端实现的。
服务目录
服务目录也就是 Directory,可以理解为服务的目录,但实际上它是一堆 invoker 的集合。
服务的提供者都会集群部署,所有同样的服务一般都会有多个提供者,服务目录就负责管理这些服务提供者,当需要选择服务提供者时,就直接在服务目录中通过负载均衡算法挑选出一个即可。
而服务提供者们也不是一成不变的,比如集群中增加了一台服务提供者,那么相应的服务目录就需要添加一个 invoker,下线了一台服务提供者,目录里面也需要删除对应的 invoker,修改了配置也一样得更新。
所以这个服务目录其实还实现了监听注册中心的功能(指的是 RegistryDirectory )。

使用一个抽象类来实现 Directory 接口,抽象类会实现一些公共方法,并且定义好逻辑,然后具体的实现由子类来完成,可以看到有两个子类,分别是 StaticDirectory 和 RegistryDirectory。
RegistryDirectory
RegistryDirectory ,是一个动态目录。它实现了 NotifyListener 接口,所以它可以监听注册中心的变化,当服务中心的配置发生变化之后, RegistryDirectory 就可以收到变更通知,然后根据配置刷新其 Invoker 列表。
RegistryDirectory 一共有三大作用:
获取 invoker 列表:RegistryDirectory 实现的父类抽象方法 doList,其目的就是得到 invoker 列表,而其内部的实现主要是做了层方法名的过滤,通过方法名找到对应的 invokers。
@Override public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(...); } List<Invoker<T>> invokers = null; // 拿到 方法名 和 Invoker 的映射 Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter } if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } // 经过方法名过滤返回 invokers return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; }监听注册中心的变化:通过实现 NotifyListener 接口能感知到注册中心的数据变更,这其实是在服务引入的时候就订阅的。
RegistryDirectory 定义了三种集合,分别是 invokerUrls 、routerUrls 、configuratorUrls 分别处理相应的配置变化,然后对应转化成对象。
@Override public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { // 按要求添加到上面三个 list 中 ... } // configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && !routerUrls.isEmpty()) { List<Router> routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // merge override parameters this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); }刷新 invokers:其实就是根据监听变更的 invokerUrls 做一波操作,
refreshInvoker(invokerUrls), 根据配置更新 invokers。private void refreshInvoker(List<URL> invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { // 如果只有一个 invokerUrl 并且协议是 empty,则清楚所有的 invoker this.forbidden = true; // Forbid to access this.methodInvokerMap = null; // Set the method invoker map to null destroyAllInvokers(); // Close all invokers } else { // 根据 URL 生成 invoker map,然后方法名对应 invoker 的 map,再销毁无效的 invoker ... Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map ... this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } 先根据 invokerUrls 数量和协议头是否是 empty,来决定是否禁用所有 invokers,如果不禁用,则将 url 转成 Invoker,得到 <url, Invoker> 的映射关系。
然后再进行转换,得到 <方法名, Invoker 列表> 映射关系,再将同一个组的 Invoker 进行合并,并将合并结果赋值给 methodInvokerMap,这个 methodInvokerMap 就是上面 doList 中使用的那个 Map。
所以是在 refreshInvoker 的时候构造 methodInvokerMap,然后在调用的时候再读 methodInvokerMap,最后再销毁无用的 invoker。
StaticDirectory
StaticDirectory,这个是用在多注册中心的时候,它是一个静态目录,即固定的不会增减的,所有 Invoker 是通过构造器来传入。
可以简单的理解成在单注册中心下我们配置的一条 reference 可能对应有多个 provider,然后生成多个 invoker,我们将它们存入 RegistryDirectory 中进行管理,为了便于调用再对外只暴露出一个 invoker 来封装内部的多 invoker 情况。
那多个注册中心就会有多个已经封装好了的 invoker ,这又面临了选择了,于是我们用 StaticDirectory 再来存入这些 invoker 进行管理,也再封装起来对外只暴露出一个 invoker 便于调用。
之所以是静态的是因为多注册中心是写在配置里面的,不像服务可以动态变更。
StaticDirectory 的内部逻辑非常的简单,就是一个 list 存储了这些 invokers,然后实现父类的方法也就单纯的返回这个 list 不做任何操作。
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
return invokers;
}
服务路由
服务路由其实就是路由规则,它规定了服务消费者可以调用哪些服务提供者,Dubbo 一共有三种路由分别是:条件路由 ConditionRouter、脚本路由 ScriptRouter 和标签路由 TagRouter。
条件路由
条件路由是两个条件组成的,是这么个格式 [服务消费者匹配条件] => [服务提供者匹配条件],举个例子官网的例子就是 host = 10.20.153.10 => host = 10.20.153.11。该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务,不可调用其他机器上的服务。
路由的配置一样是通过 RegistryDirectory 的 notify 更新和构造的,然后路由的调用在是刷新 invoker 的时候,具体是在调用 toMethodInvokers 的时候会进行服务级别的路由和方法级别的路由。
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
...
// 服务级别的路由过滤
List<Invoker<T>> newInvokersList = route(invokersList, null);
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.isEmpty()) {
methodInvokers = newInvokersList;
}
// 方法级别的路由过滤
newMethodInvokerMap.put(method, route(methodInvokers, method));
}
}
...
}
Dubbo 的 Cluster
有了服务目录,并且目录还经过了路由规则的过滤,此时我们能拿到一堆 invokers,然后 Cluster 会把这一堆 invoker 封装成 clusterInovker,给到消费者调用的就只有一个 invoker 了。
然后在这个 clusterInovker 内部还能做各种操作,比如选择一个 invoker ,调用出错了可以换一个等等。
这些细节都被封装了,消费者感受不到这个复杂度,所以 cluster 就是一个中间层,为消费者屏蔽了服务提供者的情况,简化了消费者的使用。并且也更加方便的替换各种集群容错措施。

每个 Cluster 内部其实返回的都是 XXXClusterInvoker。
AbstractClusterInvoker
这其实是它们的父类,不过 AvailableCluster 内部就是返回 AbstractClusterInvoker,这个主要用在多注册中心的时候。
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 循环哪个能用就返回哪个
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
}
FailoverClusterInvoker
FailoverClusterInvoker 实现的是失败自动切换功能,简单的说一个远程调用失败,它就立马换另一个,当然是有重试次数的。
doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,会 catch 住异常,然后失败后进行重试。
每次循环会通过负载均衡选择一个 Invoker,然后通过这个 Invoker 进行远程调用,如果失败了会记录下异常,并进行重试。
这个 select 实际上还进行了粘性处理,也就是会记录上一次选择的 invoker ,这样使得每次调用不会一直换invoker,如果上一次没有 invoker,或者上一次的 invoker 下线了则会进行负载均衡选择。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取重试次数,并 + 1。应该不能算第一次
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 开始循环
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
// 负载均衡调用一个 invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 记录这次的选择,下次重试时进行过滤
invoked.add(invoker);
// 记录到上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 发起调用
Result result = invoker.invoke(invocation);
...
return result;
}
// catch 住异常
catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(...);
}
FailfastClusterInvoker
这个 cluster 只会进行一次远程调用,如果失败后立即抛出异常,也就是快速失败,它适合于不支持幂等的一些调用。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// 发起调用
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(...); // 抛错
}
}
FailsafeClusterInvoker
这个 cluster 是一种失败安全的 cluster,也就是调用出错仅仅是用日志记录一下,然后返回了一个空结果,适用于写入审计日志等操作。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
// 打印错误
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
// 返回一个空结果
return new RpcResult(); // ignore
}
}
FailbackClusterInvoker
这个 cluster 会在调用失败后,记录下来这次调用,然后返回一个空结果给服务消费者,并且会通过定时任务对失败的调用进行重调。适合执行消息通知等最大努力场景。
代码逻辑是当调用出错的时候就返回空结果,并且加入到 failed 中,并且会有一个定时任务会定时的去调用 failed里面的调用,如果调用成功就从 failed 中移除这个调用。
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error(...);
// 添加到错误 map,后台会起一个定时任务去操作这个 map
addFailed(invocation, this);
return new RpcResult(); // ignore
}
}
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
// 如果没有这个任务,就起一个定时任务
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// collect retry statistics
try {
retryFailed();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
// 加入到这个 map 中
failed.put(invocation, router);
}
void retryFailed() {
if (failed.size() == 0) {
return;
}
// 循环 map 发起调用,成功就 remove 这个调用 invocation
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
invoker.invoke(invocation);
failed.remove(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
}
}
}
ForkingClusterInvoker
这个 cluster 会在运行时把所有 invoker 都通过线程池进行并发调用,只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。适合用在对实时性要求比较高的读操作。
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
...
for (final Invoker<T> invoker : selected) {
// 选择好的 invoker 都扔到线程池中
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
// 等所有调用出错才将错误入队
ref.offer(e);
}
}
}
});
}
try {
// 阻塞获取结果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
// 如果是错误就抛出
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(...);
}
// 返回正确结果
return (Result) ret;
} catch (InterruptedException e) {
...
}
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
BroadcastClusterInvoker
这个 cluster 会在运行时把所有 invoker 逐个调用,然后在最后判断如果有一个调用抛错的话,就抛出异常。适合通知所有提供者更新缓存或日志等本地资源信息的场景。
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
// 循环一个一个地调用
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
if (exception != null) {
// 如果有一个出错就抛错
throw exception;
}
return result;
}
Dubbo 中的负载均衡
Dubbo 也有自己的负载均衡,即 LoadBalance,前面我们提到服务提供者一般都是集群部署,这 cluster 虽然暴露出一个 invoker 给消费者调用,但是真的调用给到它的时候,要根据负载均衡算法决定具体是要调用哪一个服务提供者。

LoadBalance 接口
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
AbstractLoadBalance 抽象类
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
// ww 小于 1 取 1,否则取 ww 和 weight 中的较小值
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 检查数组
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 子类实现
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
/**
* 计算权重
* 服务预热:当服务刚启动的时候不能一下次让它负载过高,得让它慢慢热身,再加上负载。
* 所以这个方法会判断服务运行的时间,来进行服务的降权,这是一个优化手段。
*/
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 获取权重
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 得到启动时间
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 计算启动到现在的时间
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
// 降低权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
负载均衡的实现类都继承于这个类,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑,同样还是模板方法。
Dubbo 2.6
AbstractLoadBalance.javaConsistentHashLoadBalance.javaLeastActiveLoadBalance.javaRandomLoadBalance.javaRoundRobinLoadBalance.java
Dubbo 3.0
AbstractLoadBalance.javaConsistentHashLoadBalance.javaLeastActiveLoadBalance.javaRandomLoadBalance.javaRoundRobinLoadBalance.javaShortestResponseLoadBalance.java
1)RandomLoadBalance 加权随机
加权随机算法是指,以 Invoker 的权重之和为范围产生一个随机数,选取随机数落在其权重所指范围内的 Invoker,意味着谁的权重大,谁匹配到随机数的概率就越高。这是Dubbo 默认采取的负载均衡实现。
代码实现步骤:
- 遍历 Invokes 数组,统计数组中所有 Invoke 的总权重,并标记是否所有 Invoke 的权重都相等。
- 如果所有 Invoke 的权重都相等,则随机选择其中一个。
- 否则根据随机数落入的位置判断。
Dubbo 2.6
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
// 统计 totalWeight 和 更新 sameWeight
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 获取随机数
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
// 计算随机数掉落在哪个 Invoke 的权重范围
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 均等概率随机
return invokers.get(random.nextInt(length));
}
Dubbo 3.0
使用
ThreadLocalRandom替代Random Random 是通过 CAS 和自旋的方式生成随机数,在多线程模式下同一时刻只能有一个线程通过 CAS 获取到新种子并生成随机数,其他线程只能自旋等待,所以有一定的性能损耗。
因此,虽然Random是线程安全的,但是并不是“高并发”的。
ThreadLocalRandom 继承自 Random,根据里氏代换原则,这说明 ThreadLocalRandom 提供了和 Random 相同的随机数生成功能,只是实现算法略有不同。
而在 JDK 1.7 时新增了 ThreadLocalRandom 它的种子保存在各自的线程中,因此不会有自旋等待的过程,所以高并发情况下性能更优秀。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// 一些判空处理
if (!needWeightLoadBalance(invokers,invocation)){
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
// Every invoker has the same weight?
boolean sameWeight = true;
// the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
int[] weights = new int[length];
// The sum of weights
int totalWeight = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// Sum
totalWeight += weight;
// save for later use
weights[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
2)RoundRobinLoadBalance 加权轮询
Dubbo 是参考 Nginx 做的平滑加权轮询。比如 A、B、A、A、B、A…,简单的说就是打乱顺序的轮询。
代码实现比较复杂,就不贴出来了。
假设有 N 台实例 S = {S1, S2, …, Sn},配置权重 W = {W1, W2, …, Wn},有效权重 CW = {CW1, CW2, …, CWn}。每个实例 i 除了存在一个配置权重 Wi 外,还存在一个当前有效权重 CWi,且 CWi 初始化为 Wi;指示变量 currentPos 表示当前选择的实例 ID,初始化为 -1;所有实例的配置权重和为 weightSum;
那么,调度算法可以描述为:
1、初始每个实例 i 的 当前有效权重 CWi 为 配置权重 Wi,并求得配置权重和 weightSum;
2、选出 当前有效权重最大 的实例,将 当前有效权重 CWi 减去所有实例的 权重和 weightSum,且变量 currentPos 指向此位置;
3、将每个实例 i 的 当前有效权重 CWi 都加上 配置权重 Wi;
4、此时变量 currentPos 指向的实例就是需调度的实例;
5、每次调度重复上述步骤 2、3、4;
| 请求 | 选中前的当前权重 | currentPos | 选中的实例 | 选中后的当前权重 |
|---|---|---|---|---|
| 1 | {5, 1, 1} | 0 | 192.168.10.1:2202 | {-2, 1, 1} |
| 2 | {3, 2, 2} | 0 | 192.168.10.1:2202 | {-4, 2, 2} |
| 3 | {1, 3, 3} | 1 | 192.168.10.2:2202 | {1, -4, 3} |
| 4 | {6, -3, 4} | 0 | 192.168.10.1:2202 | {-1, -3, 4} |
| 5 | {4, -2, 5} | 2 | 192.168.10.3:2202 | {4, -2, -2} |
| 6 | {9, -1, -1} | 0 | 192.168.10.1:2202 | {2, -1, -1} |
| 7 | {7, 0, 0} | 0 | 192.168.10.1:2202 | {0, 0, 0} |
| 8 | {5, 1, 1} | 0 | 192.168.10.1:2202 | {-2, 1, 1} |
模拟:(配置权重: {5, 1, 1},初始权重:{0, 0, 0})
- 请求一:
- 选中前: 当前权重{0, 0, 0} 加上配置权重 {5, 1, 1},得到 {5, 1, 1};
- 得到最大权重值对应的下标 currentPos = 0;
- 选中后: 下标 0 的权重减去总权重(sum{5, 1, 1} = 7),得到 {-2, 1, 1};
3)LeastActiveLoadBalance 最少活跃数负载均衡
最少活跃数负载均衡算法是指,选择当前活跃调用数最少的 Invoker 进行调用.如果所有 Invoker 的活跃调用数都相等,则通过权重来判断,这个权重其实就和 RandomLoadBalance 的实现一样了。
活跃的调用数少说明它现在很轻松,而且活跃数都是从 0 加起来的,来一个请求活跃数+1,一个请求处理完成活跃数-1,所以活跃数少也能变相的体现处理的快。
代码实现步骤:
- 先遍历 Invokers 数组,寻找活跃数最小的 Invoker
- 如果有多个 Invoker 具有相同的最小活跃数,则记录这些 Invoker 的下标,并累加它们的权重来进行权重选择。
Dubbo 2.6
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of with warmup weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取活跃调用数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 获取权重
int afterWarmup = getWeight(invoker, invocation);
// 遇到更小活跃调用数的 invoker 时进行更新
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
// 统计最小活跃调用的 invoker 的下标
leastIndexs[leastCount++] = i; // Record index number of this invoker
totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
// 只有一个最小活跃调用的 invoker,则选择它
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexs[0]);
}
// 若有多个最小活跃调用的 invoker,则根据它们的权重进行选择
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offsetWeight = random.nextInt(totalWeight) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
Dubbo 3.0
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexes
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
4) ConsistentHashLoadBalance 一致性哈希
常见的一致性 Hash 算法是 Karger 提出的,就是将 hash值空间设为 [0, 2^32 - 1],并且是个循环的圆环状。
将服务器的 IP 等信息生成一个 hash 值,将这个值投射到圆环上作为一个节点,然后当 key 来查找的时候顺时针查找第一个大于等于这个 key 的 hash 值的节点。
一般而言还会引入虚拟节点,使得数据更加的分散,避免数据倾斜压垮某个节点,来看下官网的一个图。

整体的实现也不难,就是上面所说的那个逻辑,而圆环这是利用 treeMap 来实现的,通过 tailMap 来查找大于等于的第一个 invoker,如果没找到说明要拿第一个,直接赋值 treeMap 的 firstEntry。
然后 Dubbo 默认搞了 160 个虚拟节点,整体的 hash 是方法级别的,即一个 service 的每个方法有一个 ConsistentHashSelector,并且是根据参数值来进行 hash的,也就是说负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。
5)ShortestResponseLoadBalance
2020年05月15日,Dubbo 2.7.7 release 开始新增这个负载均衡算法:[Github 的提交记录:chickenlj 刘军](