sentienl控制台、客户端持久化到Apollo

sentinel 学习笔记

前段时间大致用了一下sentinel。但是阿里云的sentinel开源并没有实现持久化,针对这个问题我采用了apollo去实现sentinel的持久化。下面就记录一下实现sentinel持久化的过程。

Sentinel 是什么?

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。

github项目地址

如何使用sentinel?

对于初学者来说,使用sentinel只需要限流功能就可以了,所以在本篇文章中,仅针对SpringCloud项目的限流功能进行阐述。

开启sentinel控制台

可以先去官网下载源代码,然后在本地打开sentinel控制台,账号密码都是sentinel,端口是8001(都可以在application配置里面改)

在这里插入图片描述

打开后如图所示,因为没有任何服务连接sentinel控制台,所以现在是一片空白。

开启一个项目连接sentinel控制台

开启一个SpringCloud项目,写一个最简单的controller

在这里插入图片描述

添加sentinel依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>0.9.0.RELEASE</version>
        </dependency>

添加application配置

# port settings
server:
  port: 8081
spring:
  application:
    name: sentinel-web-demo
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8001 # sentinel客户端服务的地址
      eager: true

这时候再登录sentinel就可以看到名为sentinel-web-demo的服务注册上去了

在这里插入图片描述

sentinel实现限流

在这里插入图片描述

如图所示,在流控规则下新增一个流控规则,资源名中填写请求路径,然后设置阈值就可以了。

在这里插入图片描述

在这里插入图片描述

成功时正常显示,失败时则会输出提示信息。

是的,完成了这步我们就实现了最简单的sentinel控流。

sentinel的持久化

很快我们就能发现存在这样的问题,一旦服务器或者客户端重启了,sentinel的控流规则就失效了。这对于一个上线的项目来说无疑是致命的,万一机器出现故障,那么我们大量的sentinel控流规则都会消失。这是因为开源的sentinel控流规则是存储在内存中的,我们需要通过一些方法来实现sentinel的持久化,在这里就只简单说一下使用apollo实现sentinel持久化。

感兴趣的同学可以看一下官方的文档

推送模式说明优点缺点
原始模式API 将规则推送至客户端并直接更新到内存中,扩展写数据源(WritableDataSource简单,无任何依赖不保证一致性;规则保存在内存中,重启即消失。严重不建议用于生产环境
Pull 模式扩展写数据源(WritableDataSource), 客户端主动向某个规则管理中心定期轮询拉取规则,这个规则中心可以是 RDBMS、文件 等简单,无任何依赖;规则持久化不保证一致性;实时性不保证,拉取过于频繁也可能会有性能问题。
Push 模式扩展读数据源(ReadableDataSource),规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用 Nacos、Zookeeper 等配置中心。这种方式有更好的实时性和一致性保证。生产环境下一般采用 push 模式的数据源。规则持久化;一致性;快速引入第三方依赖

sentinel 控制台代码改造

pom.xml文件先把Apollo包的test作用域去掉,顺便log4j更新成 2.15(防止恶意代码注入)

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
        </dependency>

		<dependency>
            <groupId>com.ctrip.framework.apollo</groupId>
            <artifactId>apollo-openapi</artifactId>
            <version>1.2.0</version>
<!--            <scope>test</scope>-->
        </dependency>

写一个Apollo的config,Converter<T,S>指的是将S转换为T类型,portalUrl是Apollo的地址,token是Apollo生成的token

public class ApolloConfig {

    @Value("${apollo.meta}")
    private String portalUrl;
    @Value("${apollo.token}")
    private String token;

    /**
     * 流控规则编码
     *
     * @return
     */
    @Bean
    public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    /**
     * 流控规则解码
     *
     * @return
     */
    @Bean
    public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
        return s -> JSON.parseArray(s, FlowRuleEntity.class);
    }

    /**
     * 降级规则编码
     *
     * @return
     */
    @Bean
    public Converter<List<DegradeRuleEntity>, String> degradeRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    /**
     * 降级规则解码
     *
     * @return
     */
    @Bean
    public Converter<String, List<DegradeRuleEntity>> degradeRuleEntityDecoder() {
        return s -> JSON.parseArray(s, DegradeRuleEntity.class);
    }

    /**
     * 授权规则编码
     *
     * @return
     */
    @Bean
    public Converter<List<AuthorityRuleEntity>, String> authorityRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    /**
     * 授权规则解码
     *
     * @return
     */
    @Bean
    public Converter<String, List<AuthorityRuleEntity>> authorityRuleEntityDecoder() {
        return s -> JSON.parseArray(s, AuthorityRuleEntity.class);
    }

    /**
     * 系统规则编码
     *
     * @return
     */
    @Bean
    public Converter<List<SystemRuleEntity>, String> systemRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    /**
     * 系统规则解码
     *
     * @return
     */
    @Bean
    public Converter<String, List<SystemRuleEntity>> systemRuleEntityDecoder() {
        return s -> JSON.parseArray(s, SystemRuleEntity.class);
    }

    /**
     * 热点规则编码
     *
     * @return
     */
    @Bean
    public Converter<List<ParamFlowRuleEntity>, String> paramFlowRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    /**
     * 热点规则解码
     *
     * @return
     */
    @Bean
    public Converter<String, List<ParamFlowRuleEntity>> paramFlowRuleEntityDecoder() {
        return s -> JSON.parseArray(s, ParamFlowRuleEntity.class);
    }

    /**
     * 集群流控规则编码
     *
     * @return
     */
    @Bean
    public Converter<List<ClusterAppAssignMap>, String> clusterGroupEntityEncoder() {
        return JSON::toJSONString;
    }

    /**
     * 集群流控规则解码
     *
     * @return
     */
    @Bean
    public Converter<String, List<ClusterAppAssignMap>> clusterGroupEntityDecoder() {
        return s -> JSON.parseArray(s, ClusterAppAssignMap.class);
    }

    @Bean
    public ApolloOpenApiClient apolloOpenApiClient() {
        ApolloOpenApiClient client = ApolloOpenApiClient.newBuilder()
                .withPortalUrl(portalUrl)
                .withToken(token)
                .build();
        return client;

    }
}

建一个Apollo的Util类

public final class ApolloConfigUtil {

    /**
     * 流控规则id
     */
    public static final String FLOW_DATA_ID_POSTFIX = "sentinel-flow-rules";
    /**
     * 降级规则id
     */
    public static final String DEGRADE_DATA_ID_POSTFIX = "sentinel-degrade-rules";
    /**
     * 热点规则id
     */
    public static final String PARAM_FLOW_DATA_ID_POSTFIX = "sentinel-param-flow-rules";
    /**
     * 系统规则id
     */
    public static final String SYSTEM_DATA_ID_POSTFIX = "sentinel-system-rules";
    /**
     * 授权规则id
     */
    public static final String AUTHORITY_DATA_ID_POSTFIX = "sentinel-authority-rules";
    /**
     * 集群流控id
     */
    public static final String CLUSTER_GROUP_DATA_ID_POSTFIX = "sentinel-cluster-group-rules";
    /**
     * 规则存储nameSpace
     */
    public static final String NAMESPACE_NAME = "application";

    private ApolloConfigUtil() {
    }

    public static String getClusterGroupDataId(String appName) {
        return String.format("%s_%s",appName,CLUSTER_GROUP_DATA_ID_POSTFIX);
    }

    public static String getFlowDataId(String appName) {
        return String.format("%s_%s",appName,FLOW_DATA_ID_POSTFIX);
    }

    public static String getDegradeDataId(String appName) {
        return String.format("%s_%s",appName,DEGRADE_DATA_ID_POSTFIX);
    }

    public static String getParamFlowDataId(String appName) {
        return String.format("%s_%s",appName,PARAM_FLOW_DATA_ID_POSTFIX);
    }

    public static String getSystemDataId(String appName) {
        return String.format("%s_%s",appName,SYSTEM_DATA_ID_POSTFIX);
    }

    public static String getAuthorityDataId(String appName) {
        return String.format("%s_%s",appName,AUTHORITY_DATA_ID_POSTFIX);
    }

    public static String getNamespaceName(String appName) {
        return String.format("%s_%s",appName,NAMESPACE_NAME);
    }
}

接下来写一个控流规则的Provider和Publisher

@Component("flowRuleApolloProvider")
public class FlowRuleApolloProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    @Autowired
    private ApolloOpenApiClient apolloOpenApiClient;
    @Autowired
    private Converter<String, List<FlowRuleEntity>> converter;
    @Value("${app.id}")
    private String appId;
    @Value("${spring.profiles.active}")
    private String env;
    @Value("${apollo.clusterName}")
    private String clusterName;
    @Value("${apollo.namespaceName}")
    private String namespaceName;
    @Override
    public List<FlowRuleEntity> getRules(String appName){
        String flowDataId = ApolloConfigUtil.getFlowDataId(appName);
        OpenNamespaceDTO openNamespaceDTO = apolloOpenApiClient.getNamespace(appId, env, clusterName, namespaceName);
        String rules = openNamespaceDTO
                .getItems()
                .stream()
                .filter(p -> p.getKey().equals(flowDataId))
                .map(OpenItemDTO::getValue)
                .findFirst()
                .orElse("");
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        return converter.convert(rules);
    }
}
@Component("flowRuleApolloPublisher")
public class FlowRuleApolloPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    @Autowired
    private ApolloOpenApiClient apolloOpenApiClient;
    @Autowired
    private Converter<List<FlowRuleEntity>, String> converter;
    @Value("${app.id}")
    private String appId;
    @Value("${spring.profiles.active}")
    private String env;
    @Value("${apollo.user}")
    private String user;
    @Value("${apollo.clusterName}")
    private String clusterName;
    @Value("${apollo.namespaceName}")
    private String namespaceName;

    @Override
    public void publish(String app, List<FlowRuleEntity> rules){
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        filterField(rules);
        // Increase the configuration
        String flowDataId = ApolloConfigUtil.getFlowDataId(app);
        OpenItemDTO openItemDTO = new OpenItemDTO();
        openItemDTO.setKey(flowDataId);
        openItemDTO.setValue(converter.convert(rules));
        openItemDTO.setComment("Program auto-join");
        openItemDTO.setDataChangeCreatedBy(user);
        apolloOpenApiClient.createOrUpdateItem(appId, env, clusterName, namespaceName, openItemDTO);
        // Release configuration
        NamespaceReleaseDTO namespaceReleaseDTO = new NamespaceReleaseDTO();
        namespaceReleaseDTO.setEmergencyPublish(true);
        namespaceReleaseDTO.setReleaseComment("Modify or add configurations");
        namespaceReleaseDTO.setReleasedBy(user);
        namespaceReleaseDTO.setReleaseTitle("Modify or add configurations");
        apolloOpenApiClient.publishNamespace(appId, env, clusterName, namespaceName, namespaceReleaseDTO);
//        System.out.println("publish");
    }

    /**
     * 过滤不必要的字段
     *
     * @param rules
     */
    private void filterField(List<FlowRuleEntity> rules) {
        // 对不必要的信息进行过滤
        for (FlowRuleEntity rule : rules) {
            rule.setGmtCreate(null);
            rule.setGmtModified(null);
            // rule.setIp(null);git
            // rule.setPort(null);
        }
    }
}

然后就是改造Controller代码,在这里我就只针对FlowControllerV1做修改。

在这之前我们先把刚刚写的provider和publisher注入进Controller

@Autowired
@Qualifier("flowRuleApolloProvider")
private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;

@Autowired
@Qualifier("flowRuleApolloPublisher")
private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;

首先是列出所有规则的接口,我们可以直接从apollo中获取规则,这里用的是ruleProvider.getRules(app);

@GetMapping("/rules")
    @AuthAction(PrivilegeType.READ_RULE)
    public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
                                                             @RequestParam String ip,
                                                             @RequestParam Integer port) {

        if (StringUtil.isEmpty(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }
        if (StringUtil.isEmpty(ip)) {
            return Result.ofFail(-1, "ip can't be null or empty");
        }
        if (port == null) {
            return Result.ofFail(-1, "port can't be null");
        }
        try {
            List<FlowRuleEntity> rules = ruleProvider.getRules(app);
            repository.saveAll(rules);
            if (rules != null && !rules.isEmpty()) {
                for (FlowRuleEntity entity : rules) {
                    entity.setApp(app);
                    if (entity.getClusterConfig() != null && entity.getClusterConfig().getFlowId() != null) {
                        entity.setId(entity.getClusterConfig().getFlowId());
                    }
                }
            }
            return Result.ofSuccess(rules);
        } catch (Throwable throwable) {
            logger.error("Error when querying flow rules", throwable);
            return Result.ofThrowable(-1, throwable);
        }
    }

然后是修改publishRules,因为我们在每次修改完规则后都会调用这个函数

    private CompletableFuture<Void> publishRules(String app, String ip, Integer port) {
        List<FlowRuleEntity> rules = repository.findAllByMachine(MachineInfo.of(app, ip, port));
        rulePublisher.publish(app, rules);
        return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules);
    }

还有一个地方值得注意,如果我们重启后,再新增规则的时候,id会从1开始计算,那这样就有覆盖以前规则的风险。我才用的方法是:新增加一个cnt变量,每次的时候自增cnt变量,从apollo获取cnt时,取的是id的最大值。

entity.setId(repository.nextID());

为InMemoryRuleRepositoryAdapter<>类新增cnt变量,增加一个NextID()函数,用作cnt的自增

private static long cnt = 0;
public Long nextID(){
    return ++cnt ;
}

在获取Apollo规则的时候,将cnt换为最大值。

    @Override
    public List<T> saveAll(List<T> rules) {
        // TODO: check here.
        allRules.clear();
        machineRules.clear();
        appRules.clear();

        if (rules == null) {
            return null;
        }
        List<T> savedRules = new ArrayList<>(rules.size());
        for (T rule : rules) {
            savedRules.add(save(rule));
            if(rule instanceof FlowRuleEntity) {
                cnt=cnt<rule.getId()?rule.getId():cnt;
            }
        }
        return savedRules;
    }

回到FlowControllerV1中,将接口“rule”的代码entity.setID改为entity.setId(repository.nextID())

这样我们就实现了sentinel控制台规则的持久化啦!

sentinel客户端持久化

虽然我们现在能够重启控制台,仍然看到的我们配置过的信息,但是如果我们重启了客户端之后,就会发现控流规则不管用了。这是因为控流规则实际上还是保存在客户端的内存中的,我们实现的控制台规则持久化,只是将规则保存到apollo,开启项目时往apollo获取配置,但是这个规则并没有放到客户端中。

所以我们的客户端也要连接apollo,在开启项目时获取apollo上的规则。

引入Apollo包

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-apollo</artifactId>
    <version>1.5.2</version>
</dependency>

重写sentinel的InitFunc

利用SPI技术植入实现层代码,这点Sentinel已经帮我们做好了.只要实现com.alibaba.csp.sentinel.init.InitFunc接口即可自动调用里面的代码.

先在resources\META-INF\services下建一个名字为com.alibaba.csp.sentinel.init.InitFunc的文件,内容是:

com.example.sentineldemo.init.DataSourceInitFunc

在这里插入图片描述

DataSourceInitFunc的实现内容为

package com.example.sentineldemo.init;

import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.apollo.ApolloDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

import java.util.List;
import java.util.Properties;

/**
 * @author ZhzGod
 * @date 2021/11/10 18:25
 * @introduction 参考博客:https://blog.csdn.net/caodegao/article/details/100009618
 */
public class DataSourceInitFunc implements InitFunc {
    // namespaceName 对应Apollo的命名空间名称
    // ruleKey 对应规则存储的Key
    // defaultRules 对应连接不上Apollo时的默认规则
    private static final String namespaceName = "application";
    private static final String defaultRuleValue = "[]";
    private static final String PARAM_FLOW_DATA_ID_POSTFIX = "sentinel-flow-rules";
    public static final String APP = "sentinel-web-demo";

    @Override
    public void init(){
        flowRuleDataSource();
    }
    
    private void flowRuleDataSource(){
        System.out.println(String.format("%s_%s", APP, FLOW_DATA_ID_POSTFIX));
        ReadableDataSource<String, List<ParamFlowRule>> flowRuleDataSource = new ApolloDataSource<>(namespaceName,
                String.format("%s_%s",APP,FLOW_DATA_ID_POSTFIX),
                defaultRuleValue,
                source -> JSON.parseObject(source,new TypeReference<List<ParamFlowRule>>(){}
                )
        );
        ParamFlowRuleManager.register2Property(flowRuleDataSource.getProperty());
    }
}

application配置

# port settings
server:
  port: 8081
spring:
  application:
    name: sentinel-web-demo
  cloud:
    sentinel:
      datasource:
        ds:
          apollo:
            namespace-name: application  # 保存规则的 apollo 应用的公共 namespace, 要与 sentinel 控制台启动参数一致
            rule-type: flow   # 指定该数据源是限流规则
            flow-rules-key: ${spring.application.name}-${spring.cloud.sentinel.datasource.ds.apollo.rule-type} # 指定该规则在 apollo 应用中 key 名称
      transport:
        dashboard: localhost:8080
      eager: true
  profiles:
    active: dev

apollo:
  bootstrap:
    enabled: true # 开启 apollo
  meta: http://localhost:9080 # 这个地址是erueka的地址,而不是apollo客户端的地址
  env: dev

app:
  id: sentinel

遇到的大坑

别说了这个apollo.meta搞死我了!我一直报错

WARN 22680 --- [ngPollService-1] c.c.f.a.i.RemoteConfigLongPollService    : Long polling failed, will retry in 32 seconds. appId: sentinel, cluster: default, namespaces: application, long polling url: null, reason: Get config services failed from http://106.54.227.205/services/config?appId=sentinel&ip=192.168.51.209 [Cause: Could not complete get operation [Cause: java.lang.IllegalStateException: Expected BEGIN_ARRAY but was STRING at line 1 column 1 path $ [Cause: Expected BEGIN_ARRAY but was STRING at line 1 column 1 path $]]]

后面百度后才知道这个地址是erueka的地址,而不是apollo客户端的地址。但是我用的apollo是官网的啊!我怎么知道erueka地址,最后无奈还是用了自己的apollo……


版权声明:本文为weixin_43911945原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。