sentinel基于滑动窗口实现实时指标统计原理

sentinel通过责任链模式,让每个slot来实现一种功能来实现流量控制、熔断降级等功能。其中,最重要的一个Slot非StatisticSlot莫属,它通过统计单位时间的调用数、成功数、失败数等,为流量控制、熔断降级等提供数据支撑,而StatisticSlot的底层是基于滑动窗口实现实时指标统计的,下面介绍一下StatisticSlot的工作过程

一、StatisticSlot的入口

    sentinel将多个slot串联起来,每个slot在处理完成后,将数据传递给下一个slot,这些slot都是通过函数entry作为处理数据的入口,StatisticSlot的入口如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 触发下一个Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
        // 统计信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代码
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // 省略部分代码
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代码
        throw e;
    }
}

StatisticSlot的功能主要有:

1、通过现有的统计数据进行规则校验,如果校验通过则可以对监控的接口进行访问。

2、校验通过则进行实时统计数据的更新。

3、如果被block或出现了异常了,则重新更新node中block的指标或异常指标。

通过代码可以发现,StatisticSlot主要是通过DefaultNode node来进行实时统计数据的更新,

下面来看一下DefaultNode的代码:

public class DefaultNode extends StatisticNode {


    private ResourceWrapper id;

    private ClusterNode clusterNode;

     // 省略部分代码

    //每通过一次,增加一次passRequest
    public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }
    // 省略部分代码
    

}

通过代码可以看出,DefaultNode主要通过继承StatisticNode来实现统计功能。除此之外,DefaultNode还有一个成员变量ClusterNode,ClusterNode主要是记录所有的context中实时指标的总和。DefaultNode与ClusterNode的关系如下:

        DefaultNode:保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode

        ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中

二、StatisticNode与ArrayMetric

      从上面的分析中,我们知道实时指标的统计主要在StatisticNode中实现。下面介绍一下StatisticNode

private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL);

private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

@Override
public void addPassRequest() {
    rollingCounterInSecond.addPass();
    rollingCounterInMinute.addPass();
}

两个变量rollingCounterInSecond和rollingCounterInMinute,分别统计一秒钟的实时指标和一分钟的实时指标,他们对应的类是ArrayMetric。

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    @Override
    public void addSuccess(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }
}
}

在ArrayMetric中,定义了类似于数组的变量data,它里面的每一个元素类似于一个桶,每个桶对应一个时间段,存放这个时间段的统计数据。以BucketLeapArray 为例

public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }
}

BucketLeapArray 主要继承LeapArray<MetricBucket>,LeapArray的代码如下:

public abstract class LeapArray<T> {

   //单个窗口的长度,即每个窗口统计的时间段的大小
   protected int windowLengthInMs;
    //窗口的个数,即array的大小
    protected int sampleCount;
   //整个数组统计的时间段的大小,intervalInMs=windowLengthInMs*sampleCount
    protected int intervalInMs;

    //一个数组,每个元素用来记录对应时间的数据
    protected final AtomicReferenceArray<WindowWrap<T>> array;
}

由此可见,数组array中的WindowWrap具有统计数据指标的能力。

在ArrayMetric的函数addSuccess中,增加统计数据主要是通过MetricBucket中的addSuccess来实现,那我们就看一下MetricBucket。

public enum MetricEvent {


    PASS,

    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}
public class MetricBucket {

    private final LongAdder[] counters;

    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }
    
    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }
    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }
}

在MetricBucket中,定义了一个数组counters,数组中的元素分别用来记录单位时间内的pass、block、exception、success、rt等。

由此可见,StatisticNode主要是通过ArrayMetric来确定好滑动窗口的大小( windowLengthInMs)
和个数(sampleCount),ArrayMetric中的数组data的size就是sampleCount,它的每一个元素就是MetricBucket ,即存放数据的桶。

三、滑动窗口

      在使用滑动窗口统计数据时,需要知道当前数据应该落到哪个桶里面,下面介绍一下sentinel中滑动窗口的数据存放原理。先看一下滑动窗口数组的定义

public abstract class LeapArray<T> {

    // 时间窗口的长度
    protected int windowLengthInMs;
    // 采样窗口的个数
    protected int sampleCount;
    // 以毫秒为单位的时间间隔
    protected int intervalInMs;

    // 采样的时间窗口数组
    protected AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * LeapArray对象
     * @param windowLength 时间窗口的长度,单位:毫秒
     * @param intervalInSec 统计的间隔,单位:秒
     */
    public LeapArray(int sampleCount, int intervalInMs) {
       
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}

 在LeapArray中,定义数组array,默认窗口大小为500ms,窗口个数为2(intervalInSec 默认为1000,sampleCount默认为2)

现在继续回到 ArrayMetric.addPass() 方法中

@Override
public void addPass() {
    WindowWrap<Window> wrap = data.currentWindow();
    wrap.value().addPass();
}

      主要是通过currentWindow()来获取当前窗口

 public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
 }

@Override
public WindowWrap<Window> currentWindow(long time) {
    // time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
    long timeId = time / windowLength;
    // Calculate current index.
    // idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引
    int idx = (int)(timeId % array.length());

    // Cut the time to current window start.
    long currentWindowStart = time - time % windowLength;

    while (true) {
        // 从采样数组中根据索引获取缓存的时间窗口
        WindowWrap<Window> old = array.get(idx);
        // array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象
        if (old == null) {
            // 如果没有获取到,则创建一个新的
            WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
            // 通过CAS将新窗口设置到数组中去
            if (array.compareAndSet(idx, null, window)) {
                // 如果能设置成功,则将该窗口返回
                return window;
            } else {
                // 否则当前线程让出时间片,等待
                Thread.yield();
            }
        // 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口
        } else if (currentWindowStart == old.windowStart()) {
            return old;
        // 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口
        // 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动
        } else if (currentWindowStart > old.windowStart()) {
            if (addLock.tryLock()) {
                try {
                    // if (old is deprecated) then [LOCK] resetTo currentTime.
                    return resetWindowTo(old, currentWindowStart);
                } finally {
                    addLock.unlock();
                }
            } else {
                Thread.yield();
            }
        // 这个条件不可能存在
        } else if (currentWindowStart < old.windowStart()) {
            // Cannot go through here.
            return new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
        }
    }
}

这段代码的逻辑如下:

   1、获取当前时间,用当前时间对窗口大小windowLength求差,得到时间的索引timeId,再用timeId对数组的长度取模,得到数组的下标idx

   2、根据数组下标idx得到数组中的元素 WindowWrap<Window> old 。

        2.1、如果old为空,说明没有初始化,整个时候就需要创建一个新的窗口,再将窗口放入到数组中。

       2.2、如果old不为空,且当时时间段的开始时间和old的开始时间相同,则说明当前时间对应的窗口就是old,直接返回

     2.3、如果当前时间段的开始时间大于old的开始时间,说明old是属于上一轮数组的时间窗口,则需要执行函数resetWindowTo(old, currentWindowStart)。函数resetWindowTo就是将old的时间设置为当前时间窗口的时间,并且清理old中之前统计的数据,即将old清空,时间重置为当前时间窗口的开始时间。

这个滑动窗口的难点在于,时间timeId会不断增加,但是窗口数组data是固定大小(假设大小为2,数组下标为0、1)。所以刚开始时间窗口的索引idx=0,会落地array[0]中,然后时间增加,落到array[1]中,然后时间继续增加,idx=0,但是不能让这一次的数据落到上一次的时间窗口中,需要需要执行resetWindowTo,将上次统计到的数据情况。

另外,sentinel会运行一个线程,定时将滑动窗口中统计的数据写入到本地文件中,所以不用担心执行resetWindowTo会丢失掉之前统计的数据。


版权声明:本文为bao2901203013原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。