spring cloud config将配置存储在数据库中 和 限流源码解析(Sentinel)

cloud config将配置存储在数据库中

本文讲述spring cloud config如何使用数据库存储配置。

Spring Cloud Config Server最常见是将配置文件放在本地或者远程Git仓库,

  • 放在本地是将将所有的配置文件统一写在Config Server工程目录下,如果需要修改配置,需要重启config server;
  • 放在Git仓库,是将配置统一放在Git仓库,可以利用Git仓库的版本控制。
  • 本文将介绍使用另外一种方式存放配置信息,即将配置存放在Mysql中。
  1. 整个流程:Config Sever暴露Http API接口,
    1. Config Client 通过调用Config Sever的Http API接口来读取配置Config Server的配置信息,
    2. Config Server从数据中读取具体的应用的配置。流程图如下:

61.png

案例实战

在本案例中需要由2个工程,分为config-server和config-client,

其中config-server工程需要连接Mysql数据库,读取配置;

config-client则在启动的时候从config-server工程读取。

  • 本案例Spring Cloud版本为Greenwich.RELEASE,
  • Spring Boot版本为2.1.0.RELEASE。
工程描述
config-server端口8769,从数据库中读取配置
config-client端口8083,从config-server读取配置

搭建config-server工程

创建工程config-server,在工程的pom文件引入config-server的起步依赖,mysql的连接器,jdbc的起步依赖,代码如下:

引入pom

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

在工程的配置文件application.yml下做以下的配置:

yml配置

spring:
  profiles:
     active: jdbc
  application:
     name: config-jdbc-server
     
  datasource: #url username password driver-class
     url: jdbc:mysql://127.0.0.1:3306/config-jdbc?useUnicode=true&characterEncoding=utf8&characterSetResults=utf8&serverTimezone=GMT%2B8
     username: root
     password: 123456
     driver-class-name: com.mysql.jdbc.Driver
  cloud:
     config:
       label: master #主分支
       server:
         jdbc: true #jbc
server:
  port: 8769 #查询数据库的sql语句
spring.cloud.config.server.jdbc.sql: SELECT key1, value1 from config_properties where APPLICATION=? and PROFILE=? and LABEL=?
jdbc:mysql://127.0.0.1:3306/config-jdbc?
useUnicode=true #使用unicode
&characterEncoding=utf8 #编码utf8
&characterSetResults=utf8 #字符集utf8
&serverTimezone=GMT%2B8 #时间zone,京八区
SELECT key1, value1 
from config_properties 
where APPLICATION=? and PROFILE=? and LABEL=?

其中,spring.profiles.active为 spring读取的配置文件名,

  • 从数据库中读取,必须为jdbc。
  • spring.datasource配置了数据库相关的信息,

spring.cloud.config.label读取的配置的分支,

  • 这个需要在数据库中数据对应。

spring.cloud.config.server.jdbc.sql为查询数据库的sql语句,

  • 该语句的字段必须与数据库的表字段一致。

在程序的启动文件ConfigServerApplication加上@EnableConfigServer注解,开启ConfigServer的功能,代码如下:

开启configServer

@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConfigServerApplication.class, args);
	}
}

初始化数据库

由于Config-server需要从数据库中读取,所以读者需要先安装MySQL数据库,安装成功后,创建config-jdbc数据库,数据库编码为utf-8,

  • 然后在config-jdbc数据库下,执行以下的数据库脚本:
CREATE TABLE `config_properties` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `key1` varchar(50) COLLATE utf8_bin NOT NULL,
  `value1` varchar(500) COLLATE utf8_bin DEFAULT NULL,
  `application` varchar(50) COLLATE utf8_bin NOT NULL,
  `profile` varchar(50) COLLATE utf8_bin NOT NULL,
  `label` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin

其中key1字段为配置的key,value1字段为配置的值,application字段对应于应用名,profile对应于环境,label对应于读取的分支,一般为master。

插入数据config-client 的2条数据,包括server.port和foo两个配置,具体数据库脚本如下:

insert into `config_properties` (`id`, `key1`, `value1`, `application`, `profile`, `label`) values('1','server.port','8083','config-client','dev','master');
insert into `config_properties` (`id`, `key1`, `value1`, `application`, `profile`, `label`) values('2','foo','bar-jdbc','config-client','dev','master');

搭建config-client

在 config-client工程的pom文件,引入web和config的起步依赖,代码如下:

导入starter-config

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-config</artifactId>
</dependency>

在程序的启动配置文件 bootstrap.yml做程序的相关配置,一定要是bootstrap.yml,不可以是application.yml,bootstrap.yml的读取优先级更高,配置如下:

配置config-servier的位置

spring:
  application:
    name: config-client
  cloud:
    config:
      uri: http://localhost:8769
      fail-fast: true
  profiles:
    active: dev

其中spring.cloud.config.uri配置的config-server的地址,spring.cloud.config.fail-fast配置的是读取配置失败后,执行快速失败。spring.profiles.active配置的是spring读取配置文件的环境。

在程序的启动文件ConfigClientApplication,写一个RestAPI,读取配置文件的foo配置,返回给浏览器,代码如下:

进行测试读取

@SpringBootApplication
@RestController
public class ConfigClientApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConfigClientApplication.class, args);
	}

	@Value("${foo}")
	String foo;
	@RequestMapping(value = "/foo")
	public String hi(){
		return foo;
	}
}

依次启动2个工程,其中config-client的启动端口为8083,这个是在数据库中的,可见config-client从 config-server中读取了配置。

在浏览器上访问http://localhost:8083/foo,浏览器显示bar-jdbc,这个是在数据库中的,可见config-client从 config-server中读取了配置。

  • 源码下载

https://github.com/forezp/SpringCloudLearning/tree/master/chapter10-5-jdbc

限流源码解析

  • https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html

Sentinel 中的流控

Sentinel 是阿里开源的流控、熔断工具,这里不做过多的介绍,感兴趣的读者请自行了解。

在 Sentinel 的流控中,我们可以配置流控规则,

  • 主要是控制 QPS 和线程数,
  • 这里我们不讨论控制线程数,控制线程数的代码不在我们这里的讨论范围内,下面的介绍都是指控制 QPS。

RateLimiterController

RateLimiterController 非常简单,它通过使用 latestPassedTime

  • 属性来记录最后一次通过的时间,然后根据规则中 QPS 的限制,计算当前请求是否可以通过。

举个非常简单的例子:

  • 设置 QPS 为 10,那么每 100 毫秒允许通过一个,通过计算当前时间是否已经过了上一个请求的通过时间 latestPassedTime 之后的 100 毫秒,
  • 来判断是否可以通过。假设才过了 50ms,那么需要当前线程再 sleep 50ms,然后才可以通过。
  • 如果同时有另一个请求呢?那需要 sleep 150ms 才行。

img

public class RateLimiterController implements TrafficShapingController {

    // 排队最大时长,默认 500ms
    private final int maxQueueingTimeMs;
    // QPS 设置的值
    private final double count;
        // 上一次请求通过的时间
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    // 通常 acquireCount 为 1,这里不用关心参数 prioritized
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        //
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // 计算每 2 个请求之间的间隔,比如 QPS 限制为 10,那么间隔就是 100ms
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();

        // 可以通过,设置 latestPassedTime 然后就返回 true 了
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 不可以通过,需要等待
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // 等待时长大于最大值,返回 false
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                // 将 latestPassedTime 往前推
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    // 需要 sleep 的时间
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

}

这个策略还是非常好理解的,简单粗暴,快速失败。

WarmUpController

WarmUpController 用来防止突发流量迅速上升,导致系统负载严重过高,

  • 本来系统在稳定状态下能处理的,但是由于许多资源没有预热,导致这个时候处理不了了。
  • 比如,数据库需要建立连接、需要连接到远程服务等,这就是为什么我们需要预热。

啰嗦一句,这里不仅仅指系统刚刚启动需要预热,对于长时间处于低负载的系统,突发流量也需要重新预热。

Guava 的 SmoothWarmingUp 是用来控制获取令牌的速率的,和这里的控制 QPS 还是有一点区别,但是中心思想是一样的。我们在看完源码以后再讨论它们的区别。

image

为了帮助大家理解源码,我们这边先设定一个场景:QPS 设置为 100,预热时间设置为 10 秒。代码中使用 “【】” 代表根据这个场景计算出来的值。

public class WarmUpController implements TrafficShapingController {

    // 阈值
    protected double count;
    // 3
    private int coldFactor;
    // 转折点的令牌数,和 Guava 的 thresholdPermits 一个意思
    // [500]
    protected int warningToken = 0;
    // 最大的令牌数,和 Guava 的 maxPermits 一个意思
    // [1000]
    private int maxToken;
    // 斜线斜率
    // [1/25000]
    protected double slope;

    // 累积的令牌数,和 Guava 的 storedPermits 一个意思
    protected AtomicLong storedTokens = new AtomicLong(0);
    // 最后更新令牌的时间
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    // 下面的构造方法,和 Guava 中是差不多的,只不过 thresholdPermits 和 maxPermits 都换了个名字
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // warningToken 和 thresholdPermits 是一样的意思,计算结果其实是一样的
        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // 【warningToken = (10*100)/(3-1) = 500】
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);

        // maxToken 和 maxPermits 是一样的意思,计算结果其实是一样的
        // maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
        // 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // 斜率计算
        // slope
        // slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);
        // 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {

        // Sentinel 的 QPS 统计使用的是滑动窗口

        // 当前时间窗口的 QPS
        long passQps = (long) node.passQps();

        // 这里是上一个时间窗口的 QPS,这里的一个窗口跨度是1分钟
        long previousQps = (long) node.previousPassQps();

        // 同步。设置 storedTokens 和 lastFilledTime 到正确的值
        syncToken(previousQps);

        long restToken = storedTokens.get();
        // 令牌数超过 warningToken,进入梯形区域
        if (restToken >= warningToken) {

            // 这里简单说一句,因为当前的令牌数超过了 warningToken 这个阈值,系统处于需要预热的阶段
            // 通过计算当前获取一个令牌所需时间,计算其倒数即是当前系统的最大 QPS 容量

            long aboveToken = restToken - warningToken;

            // 这里计算警戒 QPS 值,就是当前状态下能达到的最高 QPS。
            // (aboveToken * slope + 1.0 / count) 其实就是在当前状态下获取一个令牌所需要的时间
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            // 如果不会超过,那么通过,否则不通过
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            // count 是最高能达到的 QPS
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        // 下面几行代码,说明在第一次进入新的 1 秒钟的时候,做同步
        // 题外话:Sentinel 默认地,1 秒钟分为 2 个时间窗口,分别 500ms
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        // 令牌数量的旧值
        long oldValue = storedTokens.get();
        // 计算新的令牌数量,往下看
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 令牌数量上,减去上一分钟的 QPS,然后设置新值
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

    // 更新令牌数
    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 当前令牌数小于 warningToken,添加令牌
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            // 当前令牌数量处于梯形阶段,
            // 如果当前通过的 QPS 大于 count/coldFactor,说明系统消耗令牌的速度,大于冷却速度
            //    那么不需要添加令牌,否则需要添加令牌
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

}

coolDownTokens 这个方法用来计算新的 token 数量,其实我也没有完全理解作者的设计:

  • 第一、对于令牌的增加,在 Guava 中,使用 warmupPeriodMicros / maxPermits 作为增长率,因为它实现的是 storedPermits 从 0 到 maxPermits 花费的时间为 warmupPeriod。而这里是以每秒 count 个作为增长率,为什么?
  • 第二、else if 分支中的决定我没有理解,为什么用 passQps 和 count / coldFactor 进行对比来决定是否继续添加令牌?
  • 我自己的理解是,count/coldFactor 就是指冷却速度,那么就是说得通的。欢迎大家一起探讨。

最后,我们再简单说说 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的区别。

Guava 在于控制获取令牌的速率,它关心的是,获取 permits 需要多少时间,包括从 storedPermits 中获取,以及获取 freshPermits,以此推进 nextFreeTicketMicros 到未来的某个时间点。

而 Sentinel 在于控制 QPS,它用令牌数来标识当前系统处于什么状态,根据时间推进一直增加令牌,根据通过的 QPS 一直减少令牌。如果 QPS 持续下降,根据推演,可以发现 storedTokens 越来越多,然后越过 warningTokens 这个阈值,之后只有当 QPS 下降到 count/3 以后,令牌才会继续往上增长,一直到 maxTokens。

storedTokens 是以 “count 每秒”的增长率增长的,减少是以 前一分钟的 QPS 来减少的。其实这里我也有个疑问,为什么增加令牌的时候考虑了时间,而减少的时候却不考虑时间因素,提了 issue,似乎没人搭理。

WarmUpRateLimiterController

注意,这个类继承自刚刚介绍的 WarmUpController,它的流控效果定义为排队等待。它的代码其实就是前面介绍的 RateLimiterController 加上 WarmUpController。

public class WarmUpRateLimiterController extends WarmUpController {

    private final int timeoutInMs;
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
        super(count, warmUpPeriodSec, coldFactor);
        this.timeoutInMs = timeOutMs;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();

        long restToken = storedTokens.get();
        long costTime = 0;
        long expectedTime = 0;

        // 和 RateLimiterController 比较,区别主要就是这块代码

        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;

            // current interval = restToken*slope+1/count
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        } else {
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            if (waitTime > timeoutInMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > timeoutInMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
}

这个代码很简单,就是 RateLimiter 中的代码,然后加入了预热的内容。

在 RateLimiter 中,单个请求的 costTime 是固定的,就是 1/count,比如设置 100 qps,那么 costTime 就是 10ms。

但是这边,加入了 WarmUp 的内容,就是说,通过令牌数量,来判断当前系统的 QPS 应该是多少,如果当前令牌数超过 warningTokens,那么系统的 QPS 容量已经低于我们预设的 QPS,相应的,costTime 就会延长。