sentinel通过责任链模式,让每个slot来实现一种功能来实现流量控制、熔断降级等功能。其中,最重要的一个Slot非StatisticSlot莫属,它通过统计单位时间的调用数、成功数、失败数等,为流量控制、熔断降级等提供数据支撑,而StatisticSlot的底层是基于滑动窗口实现实时指标统计的,下面介绍一下StatisticSlot的工作过程
一、StatisticSlot的入口
sentinel将多个slot串联起来,每个slot在处理完成后,将数据传递给下一个slot,这些slot都是通过函数entry作为处理数据的入口,StatisticSlot的入口如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
// 触发下一个Slot的entry方法
fireEntry(context, resourceWrapper, node, count, args);
// 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
// 统计信息
node.increaseThreadNum();
node.addPassRequest();
// 省略部分代码
} catch (BlockException e) {
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockedQps();
// 省略部分代码
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// Should not happen
node.increaseExceptionQps();
// 省略部分代码
throw e;
}
}
StatisticSlot的功能主要有:
1、通过现有的统计数据进行规则校验,如果校验通过则可以对监控的接口进行访问。
2、校验通过则进行实时统计数据的更新。
3、如果被block或出现了异常了,则重新更新node中block的指标或异常指标。
通过代码可以发现,StatisticSlot主要是通过DefaultNode node来进行实时统计数据的更新,
下面来看一下DefaultNode的代码:
public class DefaultNode extends StatisticNode {
private ResourceWrapper id;
private ClusterNode clusterNode;
// 省略部分代码
//每通过一次,增加一次passRequest
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
// 省略部分代码
}
通过代码可以看出,DefaultNode主要通过继承StatisticNode来实现统计功能。除此之外,DefaultNode还有一个成员变量ClusterNode,ClusterNode主要是记录所有的context中实时指标的总和。DefaultNode与ClusterNode的关系如下:
DefaultNode:保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode
ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中
二、StatisticNode与ArrayMetric
从上面的分析中,我们知道实时指标的统计主要在StatisticNode中实现。下面介绍一下StatisticNode
private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);
@Override
public void addPassRequest() {
rollingCounterInSecond.addPass();
rollingCounterInMinute.addPass();
}
两个变量rollingCounterInSecond和rollingCounterInMinute,分别统计一秒钟的实时指标和一分钟的实时指标,他们对应的类是ArrayMetric。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
@Override
public void addSuccess(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess(count);
}
}
}
在ArrayMetric中,定义了类似于数组的变量data,它里面的每一个元素类似于一个桶,每个桶对应一个时间段,存放这个时间段的统计数据。以BucketLeapArray 为例
public class BucketLeapArray extends LeapArray<MetricBucket> {
public BucketLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
}
BucketLeapArray 主要继承LeapArray<MetricBucket>,LeapArray的代码如下:
public abstract class LeapArray<T> {
//单个窗口的长度,即每个窗口统计的时间段的大小
protected int windowLengthInMs;
//窗口的个数,即array的大小
protected int sampleCount;
//整个数组统计的时间段的大小,intervalInMs=windowLengthInMs*sampleCount
protected int intervalInMs;
//一个数组,每个元素用来记录对应时间的数据
protected final AtomicReferenceArray<WindowWrap<T>> array;
}
由此可见,数组array中的WindowWrap具有统计数据指标的能力。
在ArrayMetric的函数addSuccess中,增加统计数据主要是通过MetricBucket中的addSuccess来实现,那我们就看一下MetricBucket。
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
public class MetricBucket {
private final LongAdder[] counters;
public void addSuccess(int n) {
add(MetricEvent.SUCCESS, n);
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
}
在MetricBucket中,定义了一个数组counters,数组中的元素分别用来记录单位时间内的pass、block、exception、success、rt等。
由此可见,StatisticNode主要是通过ArrayMetric来确定好滑动窗口的大小( windowLengthInMs)
和个数(sampleCount),ArrayMetric中的数组data的size就是sampleCount,它的每一个元素就是MetricBucket ,即存放数据的桶。
三、滑动窗口
在使用滑动窗口统计数据时,需要知道当前数据应该落到哪个桶里面,下面介绍一下sentinel中滑动窗口的数据存放原理。先看一下滑动窗口数组的定义
public abstract class LeapArray<T> {
// 时间窗口的长度
protected int windowLengthInMs;
// 采样窗口的个数
protected int sampleCount;
// 以毫秒为单位的时间间隔
protected int intervalInMs;
// 采样的时间窗口数组
protected AtomicReferenceArray<WindowWrap<T>> array;
/**
* LeapArray对象
* @param windowLength 时间窗口的长度,单位:毫秒
* @param intervalInSec 统计的间隔,单位:秒
*/
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
在LeapArray中,定义数组array,默认窗口大小为500ms,窗口个数为2(intervalInSec 默认为1000,sampleCount默认为2)
现在继续回到 ArrayMetric.addPass()
方法中
@Override
public void addPass() {
WindowWrap<Window> wrap = data.currentWindow();
wrap.value().addPass();
}
主要是通过currentWindow()来获取当前窗口
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
@Override
public WindowWrap<Window> currentWindow(long time) {
// time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
long timeId = time / windowLength;
// Calculate current index.
// idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引
int idx = (int)(timeId % array.length());
// Cut the time to current window start.
long currentWindowStart = time - time % windowLength;
while (true) {
// 从采样数组中根据索引获取缓存的时间窗口
WindowWrap<Window> old = array.get(idx);
// array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象
if (old == null) {
// 如果没有获取到,则创建一个新的
WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
// 通过CAS将新窗口设置到数组中去
if (array.compareAndSet(idx, null, window)) {
// 如果能设置成功,则将该窗口返回
return window;
} else {
// 否则当前线程让出时间片,等待
Thread.yield();
}
// 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口
} else if (currentWindowStart == old.windowStart()) {
return old;
// 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口
// 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动
} else if (currentWindowStart > old.windowStart()) {
if (addLock.tryLock()) {
try {
// if (old is deprecated) then [LOCK] resetTo currentTime.
return resetWindowTo(old, currentWindowStart);
} finally {
addLock.unlock();
}
} else {
Thread.yield();
}
// 这个条件不可能存在
} else if (currentWindowStart < old.windowStart()) {
// Cannot go through here.
return new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
}
}
}
这段代码的逻辑如下:
1、获取当前时间,用当前时间对窗口大小windowLength求差,得到时间的索引timeId,再用timeId对数组的长度取模,得到数组的下标idx
2、根据数组下标idx得到数组中的元素 WindowWrap<Window> old 。
2.1、如果old为空,说明没有初始化,整个时候就需要创建一个新的窗口,再将窗口放入到数组中。
2.2、如果old不为空,且当时时间段的开始时间和old的开始时间相同,则说明当前时间对应的窗口就是old,直接返回
2.3、如果当前时间段的开始时间大于old的开始时间,说明old是属于上一轮数组的时间窗口,则需要执行函数resetWindowTo(old, currentWindowStart)。函数resetWindowTo就是将old的时间设置为当前时间窗口的时间,并且清理old中之前统计的数据,即将old清空,时间重置为当前时间窗口的开始时间。
这个滑动窗口的难点在于,时间timeId会不断增加,但是窗口数组data是固定大小(假设大小为2,数组下标为0、1)。所以刚开始时间窗口的索引idx=0,会落地array[0]中,然后时间增加,落到array[1]中,然后时间继续增加,idx=0,但是不能让这一次的数据落到上一次的时间窗口中,需要需要执行resetWindowTo,将上次统计到的数据情况。
另外,sentinel会运行一个线程,定时将滑动窗口中统计的数据写入到本地文件中,所以不用担心执行resetWindowTo会丢失掉之前统计的数据。