机器学习特征稳定性评估-分布式scala实现

泰格:

       士别三日当刮目相待,模型也一样,客流不断变化,特征的分布不断变化,模型的结果分布也会受到影响。所以在工业界中,对线上模型的监控是非常重要的。

概述

       考察稳定性最好的办法就是抽取另一个时间窗口的数据(最好是最新时间的数据),通过模型在这些新数据上的表现来与之前在训练集和测试集上的表现进行比较,看模型的稳定性,其效果衰减幅度是否可以接受,如果条件许可,最好用几个不同时间窗口的数据分别进行观察比较。

公式

PSI = sum((实际占比-预期占比)* ln(实际占比/预期占比))

稳定性的结果含义​​​​​​​

PSI含义行动项
小于0.1稳定性很好该干嘛干嘛
0.1-0.25稳定性一般关注一下
大于0.25稳定性较差模型重做

 

模型的稳定性评估包括特征的稳定性评估和结果的稳定性评估,两者在评估方法上没有什么区别。

结果评估举例

比如训练一个logistic回归模型,预测时候会有个概率输出p。
测试集上的输出设定为p1吧,将它从小到大排序后10等分,如0-0.1,0.1-0.2 
现在用这个模型去对新的样本进行预测,预测结果叫p2,按p1的区间也划分为10等分。
实际占比就是p2上在各区间的用户占比,预期占比就是p1上各区间的用户占比。
意义就是如果模型跟稳定,那么p1和p2上各区间的用户应该是相近的,占比不会变动很大,也就是预测出来的概率不会差距很大

 

稳定性评估需要的输入

     模型的稳定性评估,需要的输入主要是评估的特征列或者结果列。

列的类型

主要可以分解为三种:1、连续性变量 2、离散型变量 3、枚举型变量 

连续性变量:例如评分卡的输出的分数,为1-10分。

离散型变量:例如评级模型的输出,为1、2、3、4、5

枚举型变量:例如我需要评估省份的稳定性,但是我数据里面只有城市,则我需要对每个城市进行枚举:福建(厦门、福州、泉州、漳州。。。)

对于连续性的变量,我们需要对特征进行分箱后进行稳定性评估。

分箱方式

分箱的方式主要有:1、等分 2、等频  3、自定义分箱

等分:例如0-1的预测概率结果值,我们采用10等分,则分箱为0-0.1,0.1-0.2......

等频:等频的话,考虑的是每个分箱里面的数量是一样多的,所以分箱的结果是根据数据分布的结果

自定义分箱:业务根据专家规则进行分箱,或者在评分卡模型里面常用的根据WOE/IV进行分箱的最优化选择

 

 

分布式scala模型文档性代码

    对于模型评估来说,大部分的场景的模型评估方法分布式是没有意义的

但是对于模型稳定性的评估,分布式计算是非常有意义的。

原因:1、一般模型稳定性评估,计算的时间跨度都比较大,数据量大

 2、模型稳定性的评估,算法不需要进行全局的排序,没有依赖问题

  def getBinCount(featuresRdd: RDD[Row]) = {
    featuresRdd.mapPartitions(it => {
      var resultMap = new mutable.HashMap[String, Int]
      val columnsArray: JSONArray = paramJson.get("columns").asInstanceOf[JSONArray]
      while (it.hasNext) {
        val row = it.next()
        for (j <- 0 until columnsArray.size()) {
          val json: JSONObject = columnsArray.get(j).asInstanceOf[JSONObject]
          val columnName = String.valueOf(json.get("columnName"))
          val value = row.getAs[Any](columnName)
          val binArr = json.get("binArr").asInstanceOf[JSONArray]
          val paramArr1 = json.get("paramArr1").asInstanceOf[JSONArray]
          val paramArr2 = json.get("paramArr2").asInstanceOf[JSONArray]
          val columnType = String.valueOf(json.get("columnType"))
          for (i <- 0 until binArr.size()) {
            val binName = String.valueOf(binArr.get(i))
            val key = columnName + "|" + binName
            if (columnType.equals("range")) {
              var doubleValue = -9999.0
              try {
                doubleValue = String.valueOf(value).toDouble
              } catch {
                case ex: Exception => println("异常数据", value)
              }
              val start = String.valueOf(paramArr1.get(i)).toDouble
              val end = String.valueOf(paramArr2.get(i)).toDouble
              if (doubleValue > start && doubleValue <= end) {
                if (resultMap.contains(key)) {
                  resultMap(key) = resultMap.get(key).get + 1
                } else {
                  resultMap(key) = 1
                }
              }
            }
            if (columnType.equals("bin")) {
              val binValue = String.valueOf(paramArr1.get(i))
              if (binValue.equals(value)) {
                if (resultMap.contains(key)) {
                  resultMap.put(key, resultMap.get(key).get + 1)
                } else {
                  resultMap.put(key, 1)
                }
              }
            }

          }
        }
      }
      resultMap.iterator
    }).reduceByKey((x, y) => {
      x + y
    })
      .map(t => {
        val columnName = t._1.split("\\|")(0)
        val binName = t._1.split("\\|")(1)
        val count = t._2
        (columnName, binName, count)
      }
      ).collect()
  }
  def getPsiValues(columnMap:mutable.Map[String,mutable.Map[String,Double]],compareColumnMap:mutable.Map[String,mutable.Map[String,Double]])={
      val it=columnMap.keys.iterator
      var resultMap=new mutable.HashMap[String,Double]()
      while (it.hasNext){
        val columnName=it.next()
        val binMap=columnMap.get(columnName).get
        val compareBinMap=compareColumnMap.get(columnName).get
        val itt=binMap.keys.iterator
        var result=0.0
        while(itt.hasNext){
          val bin=itt.next()
          val newV: Double =binMap.get(bin).get
          val oldV=compareBinMap.get(bin).get
          result+=(newV-oldV)/Math.log(newV/oldV)
        }
        resultMap.put(columnName,result)
      }
    resultMap

  }

 

算法比较简单:先map计算每个分区,写成分箱名称,分箱个数的方式,然后进行reduce,计算几个分箱的统计值。

我这里的代码是涉及到多个特征字段一起进行评估的代码,对于同一个模型多个特征一起评估。

 

欢迎大家学习交流

 

 

 

 

   


版权声明:本文为xiefu5hh原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。