Dubbo 笔记(六) 集群容错和负载均衡

Dubbo 的集群容错和负载均衡

参考:敖丙:Dubbo 的集群容错和负载均衡

​ 线上的服务一般是要集群部署的,集群的话就要考虑消费者选择哪一个提供者调用调用失败了怎么办。这两点对应的就是负载均衡集群容错了。

invoker

​ 在 Dubboinvoker 其实就是一个具有调用功能的对象,在服务暴露端封装的就是真实的服务实现把真实的服务实现封装一下变成一个 invoker

​ 在服务发现端就是从注册中心得到服务提供者的配置信息,然后一条配置信息对应封装成一个 invoker,这个 invoker 就具备远程调用能力,当然要是走的是 injvm 协议那真实走的还是本地的调用。

​ 然后还有个 ClusterInvoker ,它也是个 invoker ,它封装了服务引入生成的 invoker 们,赋予其集群容错等能力,这个 invoker 就是暴露给消费者调用的 invoker。

​ 所以说 Dubbo 就是搞了个统一模型,将能调用的服务的对象都封装成 invoker

​ 这里主要讲的是服务消费者这边的事情,因为集群容错是消费者端实现的

服务目录

服务目录也就是 Directory,可以理解为服务的目录,但实际上它是一堆 invoker 的集合

服务的提供者都会集群部署,所有同样的服务一般都会有多个提供者,服务目录就负责管理这些服务提供者,当需要选择服务提供者时,就直接在服务目录中通过负载均衡算法挑选出一个即可。

​ 而服务提供者们也不是一成不变的,比如集群中增加了一台服务提供者,那么相应的服务目录就需要添加一个 invoker,下线了一台服务提供者,目录里面也需要删除对应的 invoker,修改了配置也一样得更新。

​ 所以这个服务目录其实还实现了监听注册中心的功能(指的是 RegistryDirectory )。

在这里插入图片描述

​ 使用一个抽象类来实现 Directory 接口,抽象类会实现一些公共方法,并且定义好逻辑,然后具体的实现由子类来完成,可以看到有两个子类,分别是 StaticDirectoryRegistryDirectory

RegistryDirectory

​ RegistryDirectory ,是一个动态目录。它实现了 NotifyListener 接口,所以它可以监听注册中心的变化,当服务中心的配置发生变化之后, RegistryDirectory 就可以收到变更通知,然后根据配置刷新其 Invoker 列表。

RegistryDirectory 一共有三大作用:

  1. 获取 invoker 列表RegistryDirectory 实现的父类抽象方法 doList,其目的就是得到 invoker 列表,而其内部的实现主要是做了层方法名的过滤,通过方法名找到对应的 invokers

    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(...);
        }
        List<Invoker<T>> invokers = null;
        // 拿到 方法名 和 Invoker 的映射
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        // 经过方法名过滤返回 invokers
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
    
  2. 监听注册中心的变化:通过实现 NotifyListener 接口能感知到注册中心的数据变更,这其实是在服务引入的时候就订阅的。

    RegistryDirectory 定义了三种集合,分别是 invokerUrlsrouterUrlsconfiguratorUrls 分别处理相应的配置变化,然后对应转化成对象。

    @Override
    public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            // 按要求添加到上面三个 list 中
            ...
        }
        // configurators
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers
        refreshInvoker(invokerUrls);
    }
    
  3. 刷新 invokers:其实就是根据监听变更的 invokerUrls 做一波操作,refreshInvoker(invokerUrls), 根据配置更新 invokers。

     private void refreshInvoker(List<URL> invokerUrls) {
         if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
             && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
             // 如果只有一个 invokerUrl 并且协议是 empty,则清楚所有的 invoker
             this.forbidden = true; // Forbid to access
             this.methodInvokerMap = null; // Set the method invoker map to null
             destroyAllInvokers(); // Close all invokers
         } else {
             // 根据 URL 生成 invoker map,然后方法名对应 invoker 的 map,再销毁无效的 invoker
             ...
             Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
             Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
             ...
             this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
             this.urlInvokerMap = newUrlInvokerMap;
             try {
                 destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
             } catch (Exception e) {
                 logger.warn("destroyUnusedInvokers error. ", e);
             }
         }
     }
    

    ​ 先根据 invokerUrls 数量和协议头是否是 empty,来决定是否禁用所有 invokers,如果不禁用,则将 url 转成 Invoker,得到 <url, Invoker> 的映射关系

    ​ 然后再进行转换,得到 <方法名, Invoker 列表> 映射关系,再将同一个组的 Invoker 进行合并,并将合并结果赋值给 methodInvokerMap,这个 methodInvokerMap 就是上面 doList 中使用的那个 Map。

    ​ 所以是在 refreshInvoker 的时候构造 methodInvokerMap,然后在调用的时候再读 methodInvokerMap,最后再销毁无用的 invoker。

StaticDirectory

StaticDirectory,这个是用在多注册中心的时候,它是一个静态目录,即固定的不会增减的,所有 Invoker 是通过构造器来传入。

​ 可以简单的理解成在单注册中心下我们配置的一条 reference 可能对应有多个 provider,然后生成多个 invoker,我们将它们存入 RegistryDirectory 中进行管理,为了便于调用再对外只暴露出一个 invoker 来封装内部的多 invoker 情况

​ 那多个注册中心就会有多个已经封装好了的 invoker ,这又面临了选择了,于是我们用 StaticDirectory 再来存入这些 invoker 进行管理,也再封装起来对外只暴露出一个 invoker 便于调用。

​ 之所以是静态的是因为多注册中心是写在配置里面的,不像服务可以动态变更。

​ StaticDirectory 的内部逻辑非常的简单,就是一个 list 存储了这些 invokers,然后实现父类的方法也就单纯的返回这个 list 不做任何操作。

@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    return invokers;
}
服务路由

​ 服务路由其实就是路由规则,它规定了服务消费者可以调用哪些服务提供者,Dubbo 一共有三种路由分别是:条件路由 ConditionRouter脚本路由 ScriptRouter标签路由 TagRouter

条件路由

​ 条件路由是两个条件组成的,是这么个格式 [服务消费者匹配条件] => [服务提供者匹配条件],举个例子官网的例子就是 host = 10.20.153.10 => host = 10.20.153.11。该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务,不可调用其他机器上的服务。

​ 路由的配置一样是通过 RegistryDirectorynotify 更新和构造的,然后路由的调用在是刷新 invoker 的时候,具体是在调用 toMethodInvokers 的时候会进行服务级别的路由和方法级别的路由。

private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
    ...
    // 服务级别的路由过滤
    List<Invoker<T>> newInvokersList = route(invokersList, null);
    newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
    if (serviceMethods != null && serviceMethods.length > 0) {
        for (String method : serviceMethods) {
            List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
            if (methodInvokers == null || methodInvokers.isEmpty()) {
                methodInvokers = newInvokersList;
            }
            // 方法级别的路由过滤
            newMethodInvokerMap.put(method, route(methodInvokers, method));
        }
    }
    ...
}
Dubbo 的 Cluster

dubbo.rpc.cluster.suport

​ 有了服务目录,并且目录还经过了路由规则的过滤,此时我们能拿到一堆 invokers,然后 Cluster 会把这一堆 invoker 封装成 clusterInovker,给到消费者调用的就只有一个 invoker 了。

​ 然后在这个 clusterInovker 内部还能做各种操作,比如选择一个 invoker ,调用出错了可以换一个等等。

​ 这些细节都被封装了,消费者感受不到这个复杂度,所以 cluster 就是一个中间层,为消费者屏蔽了服务提供者的情况,简化了消费者的使用。并且也更加方便的替换各种集群容错措施。

在这里插入图片描述

​ 每个 Cluster 内部其实返回的都是 XXXClusterInvoker。

AbstractClusterInvoker

​ 这其实是它们的父类,不过 AvailableCluster 内部就是返回 AbstractClusterInvoker,这个主要用在多注册中心的时候。

public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {

        return new AbstractClusterInvoker<T>(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                // 循环哪个能用就返回哪个
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
    }
}
FailoverClusterInvoker

​ FailoverClusterInvoker 实现的是失败自动切换功能,简单的说一个远程调用失败,它就立马换另一个,当然是有重试次数的。

​ doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,会 catch 住异常,然后失败后进行重试。

​ 每次循环会通过负载均衡选择一个 Invoker,然后通过这个 Invoker 进行远程调用,如果失败了会记录下异常,并进行重试。

​ 这个 select 实际上还进行了粘性处理,也就是会记录上一次选择的 invoker ,这样使得每次调用不会一直换invoker,如果上一次没有 invoker,或者上一次的 invoker 下线了则会进行负载均衡选择。

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 获取重试次数,并 + 1。应该不能算第一次
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    // 开始循环
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        // 负载均衡调用一个 invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 记录这次的选择,下次重试时进行过滤
        invoked.add(invoker);
        // 记录到上下文中
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 发起调用
            Result result = invoker.invoke(invocation);
            ...
            return result;
        }
        // catch 住异常
        catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(...);
}
FailfastClusterInvoker

​ 这个 cluster 只会进行一次远程调用,如果失败后立即抛出异常,也就是快速失败,它适合于不支持幂等的一些调用。

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 发起调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(...); // 抛错
        }
    }
FailsafeClusterInvoker

​ 这个 cluster 是一种失败安全的 cluster,也就是调用出错仅仅是用日志记录一下,然后返回了一个空结果,适用于写入审计日志等操作。

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // 打印错误
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        // 返回一个空结果
        return new RpcResult(); // ignore
    }
}
FailbackClusterInvoker

​ 这个 cluster 会在调用失败后,记录下来这次调用,然后返回一个空结果给服务消费者,并且会通过定时任务对失败的调用进行重调。适合执行消息通知等最大努力场景。

​ 代码逻辑是当调用出错的时候就返回空结果,并且加入到 failed 中,并且会有一个定时任务会定时的去调用 failed里面的调用,如果调用成功就从 failed 中移除这个调用。

private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error(...);
        // 添加到错误 map,后台会起一个定时任务去操作这个 map
        addFailed(invocation, this);
        return new RpcResult(); // ignore
    }
}

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                // 如果没有这个任务,就起一个定时任务
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                    @Override
                    public void run() {
                        // collect retry statistics
                        try {
                            retryFailed();
                        } catch (Throwable t) { // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    // 加入到这个 map 中
    failed.put(invocation, router);
}

void retryFailed() {
    if (failed.size() == 0) {
        return;
    }
    // 循环 map 发起调用,成功就 remove 这个调用 invocation
    for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
        failed).entrySet()) {
        Invocation invocation = entry.getKey();
        Invoker<?> invoker = entry.getValue();
        try {
            invoker.invoke(invocation);
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
        }
    }
}
ForkingClusterInvoker

​ 这个 cluster 会在运行时把所有 invoker 都通过线程池进行并发调用,只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。适合用在对实时性要求比较高的读操作

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        ...
        for (final Invoker<T> invoker : selected) {
            // 选择好的 invoker 都扔到线程池中
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            // 等所有调用出错才将错误入队
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            // 阻塞获取结果
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            // 如果是错误就抛出
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(...);
            }
            // 返回正确结果
            return (Result) ret;
        } catch (InterruptedException e) {
            ...
        }
    } finally {
        // clear attachments which is binding to current thread.
        RpcContext.getContext().clearAttachments();
    }
}
BroadcastClusterInvoker

​ 这个 cluster 会在运行时把所有 invoker 逐个调用,然后在最后判断如果有一个调用抛错的话,就抛出异常。适合通知所有提供者更新缓存或日志等本地资源信息的场景。

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    RpcException exception = null;
    Result result = null;
    // 循环一个一个地调用
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    if (exception != null) {
        // 如果有一个出错就抛错
        throw exception;
    }
    return result;
}
Dubbo 中的负载均衡

Dubbo 也有自己的负载均衡,即 LoadBalance,前面我们提到服务提供者一般都是集群部署,这 cluster 虽然暴露出一个 invoker 给消费者调用,但是真的调用给到它的时候,要根据负载均衡算法决定具体是要调用哪一个服务提供者。

在这里插入图片描述

LoadBalance 接口
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
    /**
     * select one invoker in list.
     *
     * @param invokers   invokers.
     * @param url       refer url
     * @param invocation  invocation.
     * @return selected   invoker.
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
AbstractLoadBalance 抽象类
public abstract class AbstractLoadBalance implements LoadBalance {
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        // ww 小于 1 取 1,否则取 ww 和 weight 中的较小值
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // 检查数组
        if (invokers == null || invokers.isEmpty())
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        // 子类实现
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
    
    /**
     * 计算权重
     * 服务预热:当服务刚启动的时候不能一下次让它负载过高,得让它慢慢热身,再加上负载。
     * 所以这个方法会判断服务运行的时间,来进行服务的降权,这是一个优化手段。
     */
    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        // 获取权重
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            // 得到启动时间
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                // 计算启动到现在的时间
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                if (uptime > 0 && uptime < warmup) {
                    // 降低权重
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }
}

​ 负载均衡的实现类都继承于这个类,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑,同样还是模板方法。

Dubbo 2.6
Dubbo 3.0
1)RandomLoadBalance 加权随机

加权随机算法是指,以 Invoker 的权重之和为范围产生一个随机数,选取随机数落在其权重所指范围内的 Invoker,意味着谁的权重大,谁匹配到随机数的概率就越高。这是Dubbo 默认采取的负载均衡实现

代码实现步骤:

  1. 遍历 Invokes 数组,统计数组中所有 Invoke 的总权重,并标记是否所有 Invoke 的权重都相等。
  2. 如果所有 Invoke 的权重都相等,则随机选择其中一个。
  3. 否则根据随机数落入的位置判断。

Dubbo 2.6

	 @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        int totalWeight = 0; // The sum of weights
        boolean sameWeight = true; // Every invoker has the same weight?
        // 统计 totalWeight 和 更新 sameWeight
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // Sum
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
             // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
			   // 获取随机数
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 计算随机数掉落在哪个 Invoke 的权重范围
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // 均等概率随机
        return invokers.get(random.nextInt(length));
    }

Dubbo 3.0

使用 ThreadLocalRandom 替代 Random

敖丙:高并发情况下你还在用Random生成随机数?

​ Random 是通过 CAS 和自旋的方式生成随机数,在多线程模式下同一时刻只能有一个线程通过 CAS 获取到新种子并生成随机数,其他线程只能自旋等待,所以有一定的性能损耗。

​ 因此,虽然Random是线程安全的,但是并不是“高并发”的。

​ ThreadLocalRandom 继承自 Random,根据里氏代换原则,这说明 ThreadLocalRandom 提供了和 Random 相同的随机数生成功能,只是实现算法略有不同。

​ 而在 JDK 1.7 时新增了 ThreadLocalRandom 它的种子保存在各自的线程中,因此不会有自旋等待的过程,所以高并发情况下性能更优秀。

 	 @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();

        // 一些判空处理
        if (!needWeightLoadBalance(invokers,invocation)){
            return invokers.get(ThreadLocalRandom.current().nextInt(length));
        }

        // Every invoker has the same weight?
        boolean sameWeight = true;
        // the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
        int[] weights = new int[length];
        // The sum of weights
        int totalWeight = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // Sum
            totalWeight += weight;
            // save for later use
            weights[i] = totalWeight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
                if (offset < weights[i]) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
2)RoundRobinLoadBalance 加权轮询

​ Dubbo 是参考 Nginx 做的平滑加权轮询。比如 A、B、A、A、B、A…,简单的说就是打乱顺序的轮询。

​ 代码实现比较复杂,就不贴出来了。

​ 假设有 N 台实例 S = {S1, S2, …, Sn},配置权重 W = {W1, W2, …, Wn},有效权重 CW = {CW1, CW2, …, CWn}。每个实例 i 除了存在一个配置权重 Wi 外,还存在一个当前有效权重 CWi,且 CWi 初始化为 Wi;指示变量 currentPos 表示当前选择的实例 ID,初始化为 -1;所有实例的配置权重和为 weightSum;

那么,调度算法可以描述为:
1、初始每个实例 i 的 当前有效权重 CWi 为 配置权重 Wi,并求得配置权重和 weightSum;
2、选出 当前有效权重最大 的实例,将 当前有效权重 CWi 减去所有实例的 权重和 weightSum,且变量 currentPos 指向此位置;
3、将每个实例 i 的 当前有效权重 CWi 都加上 配置权重 Wi;
4、此时变量 currentPos 指向的实例就是需调度的实例;
5、每次调度重复上述步骤 2、3、4;

请求选中前的当前权重currentPos选中的实例选中后的当前权重
1{5, 1, 1}0192.168.10.1:2202{-2, 1, 1}
2{3, 2, 2}0192.168.10.1:2202{-4, 2, 2}
3{1, 3, 3}1192.168.10.2:2202{1, -4, 3}
4{6, -3, 4}0192.168.10.1:2202{-1, -3, 4}
5{4, -2, 5}2192.168.10.3:2202{4, -2, -2}
6{9, -1, -1}0192.168.10.1:2202{2, -1, -1}
7{7, 0, 0}0192.168.10.1:2202{0, 0, 0}
8{5, 1, 1}0192.168.10.1:2202{-2, 1, 1}

模拟:(配置权重: {5, 1, 1},初始权重:{0, 0, 0})

  1. 请求一:
    1. 选中前: 当前权重{0, 0, 0} 加上配置权重 {5, 1, 1},得到 {5, 1, 1};
    2. 得到最大权重值对应的下标 currentPos = 0;
    3. 选中后: 下标 0 的权重减去总权重(sum{5, 1, 1} = 7),得到 {-2, 1, 1};
3)LeastActiveLoadBalance 最少活跃数负载均衡

最少活跃数负载均衡算法是指,选择当前活跃调用数最少的 Invoker 进行调用.如果所有 Invoker 的活跃调用数都相等,则通过权重来判断,这个权重其实就和 RandomLoadBalance 的实现一样了。

​ 活跃的调用数少说明它现在很轻松,而且活跃数都是从 0 加起来的,来一个请求活跃数+1,一个请求处理完成活跃数-1,所以活跃数少也能变相的体现处理的快。

代码实现步骤:

  • 先遍历 Invokers 数组,寻找活跃数最小的 Invoker
  • 如果有多个 Invoker 具有相同的最小活跃数,则记录这些 Invoker 的下标,并累加它们的权重来进行权重选择。

Dubbo 2.6

	 @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        int leastActive = -1; // The least active value of all invokers
        int leastCount = 0; // The number of invokers having the same least active value (leastActive)
        int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
        int totalWeight = 0; // The sum of with warmup weights
        int firstWeight = 0; // Initial value, used for comparision
        boolean sameWeight = true; // Every invoker has the same weight value?
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 获取活跃调用数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // 获取权重
            int afterWarmup = getWeight(invoker, invocation);
            // 遇到更小活跃调用数的 invoker 时进行更新
            if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                leastActive = active; // Record the current least active value
                leastCount = 1; // Reset leastCount, count again based on current leastCount
                leastIndexs[0] = i; // Reset
                totalWeight = afterWarmup; // Reset
                firstWeight = afterWarmup; // Record the weight the first invoker
                sameWeight = true; // Reset, every invoker has the same weight value?
            } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
                // 统计最小活跃调用的 invoker 的下标
                leastIndexs[leastCount++] = i; // Record index number of this invoker
                totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
                // If every invoker has the same weight?
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        // 只有一个最小活跃调用的 invoker,则选择它
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexs[0]);
        }
        // 若有多个最小活跃调用的 invoker,则根据它们的权重进行选择
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offsetWeight = random.nextInt(totalWeight) + 1;
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

Dubbo 3.0

	 @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // The least active value of all invokers
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        int leastCount = 0;
        // The index of invokers having the same least active value (leastActive)
        int[] leastIndexes = new int[length];
        // the weight of every invokers
        int[] weights = new int[length];
        // The sum of the warmup weights of all the least active invokers
        int totalWeight = 0;
        // The weight of the first least active invoker
        int firstWeight = 0;
        // Every least active invoker has the same weight value?
        boolean sameWeight = true;


        // Filter out all the least active invokers
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoker
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight of the invoker's configuration. The default value is 100.
            int afterWarmup = getWeight(invoker, invocation);
            // save for later use
            weights[i] = afterWarmup;
            // If it is the first invoker or the active number of the invoker is less than the current least active number
            if (leastActive == -1 || active < leastActive) {
                // Reset the active number of the current invoker to the least active number
                leastActive = active;
                // Reset the number of least active invokers
                leastCount = 1;
                // Put the first least active invoker first in leastIndexes
                leastIndexes[0] = i;
                // Reset totalWeight
                totalWeight = afterWarmup;
                // Record the weight the first least active invoker
                firstWeight = afterWarmup;
                // Each invoke has the same weight (only one invoker here)
                sameWeight = true;
                // If current invoker's active value equals with leaseActive, then accumulating.
            } else if (active == leastActive) {
                // Record the index of the least active invoker in leastIndexes order
                leastIndexes[leastCount++] = i;
                // Accumulate the total weight of the least active invoker
                totalWeight += afterWarmup;
                // If every invoker has the same weight?
                if (sameWeight && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // Choose an invoker from all the least active invokers
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexes[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on 
            // totalWeight.
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }
4) ConsistentHashLoadBalance 一致性哈希

​ 常见的一致性 Hash 算法是 Karger 提出的,就是将 hash值空间设为 [0, 2^32 - 1],并且是个循环的圆环状。

​ 将服务器的 IP 等信息生成一个 hash 值,将这个值投射到圆环上作为一个节点,然后当 key 来查找的时候顺时针查找第一个大于等于这个 key 的 hash 值的节点。

​ 一般而言还会引入虚拟节点,使得数据更加的分散,避免数据倾斜压垮某个节点,来看下官网的一个图。

在这里插入图片描述

​ 整体的实现也不难,就是上面所说的那个逻辑,而圆环这是利用 treeMap 来实现的,通过 tailMap 来查找大于等于的第一个 invoker,如果没找到说明要拿第一个,直接赋值 treeMap 的 firstEntry。

​ 然后 Dubbo 默认搞了 160 个虚拟节点,整体的 hash 是方法级别的,即一个 service 的每个方法有一个 ConsistentHashSelector,并且是根据参数值来进行 hash的,也就是说负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。

5)ShortestResponseLoadBalance

参考资料:CSDN:万字长文,带你深入浅出五种负载均衡策略

2020年05月15日,Dubbo 2.7.7 release 开始新增这个负载均衡算法:[Github 的提交记录:chickenlj 刘军](


版权声明:本文为qq_45538657原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。