Spark日志文件清洗及分析

1、数据清洗

  • 读入日志文件并转化为RDD[Row]类型
    • 按照Tab切割数据
    • 过滤掉字段数量少于8个的记录
  • 对数据进行清洗
    • 按照第一列和第二列对数据进行去重
    • 过滤掉状态码非200
    • 过滤掉event_time为空的数据
    • 将url中的查询参数按照"&"以及"="切割
  • 保存数据
    • 将数据写入mysql表中
      在这里插入图片描述
      文件目录:D:\test\t\test.log,一条数据的结构如下:
2018-09-04T20:27:31+08:00	http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157	GET	200	192.168.168.64	-	-	Apache-HttpClient/4.1.2 (java 1.5)
import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Cleans a tab-separated access log and stores the result in MySQL.
 *
 * Pipeline:
 *  1. Read the log, split each line on TAB and keep only lines with exactly 8 fields.
 *  2. Keep rows whose status is "200" and whose event_time is non-empty, then
 *     de-duplicate on (event_time, url).
 *  3. Split the URL query string on "&" and "=" and flatten selected parameters
 *     into columns.
 *  4. Overwrite the MySQL tables `logdetail` (cleaned, flattened rows) and
 *     `logorg` (raw 8-field rows).
 */
object DataClear {

  /** Log file read when no path is supplied as the first program argument. */
  private val DefaultLogPath = "D:\\test\\t\\test.log"

  /** Column names of the raw log table, in field order. */
  private val RawColumns: Seq[String] = Seq(
    "event_time", "url", "method", "status",
    "sip", "user_uip", "action_prepend", "action_client")

  /** Column names of the flattened detail table, in output order (22 columns). */
  private val DetailColumns: Seq[String] = Seq(
    "event_time", "actionBegin", "actionClient", "actionEnd", "actionName", "actionType",
    "actionValue", "clientType", "examType", "ifEquipment", "isFromContinue", "skillLevel",
    "testType", "userSID", "userUID", "userUIP", "method", "status", "sip", "user_uip",
    "action_prepend", "action_client")

  /**
   * Extracts the query parameters of a URL as a key -> value map.
   *
   * Only the first "?" separates the path from the query, and only the first "="
   * separates a key from its value, so values that themselves contain "?" or "="
   * are preserved. Pairs without "=" are ignored. Values are returned as found in
   * the URL (no percent-decoding is performed).
   *
   * @param url the full request URL
   * @return the parsed parameters; empty when the URL has no query string
   */
  private def parseQueryParams(url: String): Map[String, String] =
    url.split("\\?", 2) match {
      case Array(_, query) =>
        query.split("&")
          .map(_.split("=", 2))
          .collect { case Array(key, value) => key -> value }
          .toMap
      case _ => Map.empty[String, String]
    }

  /**
   * Program entry point.
   *
   * @param args optional; `args(0)` overrides the input log path
   *             (defaults to `D:\test\t\test.log`)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("clearDemo")
      .getOrCreate()

    // Stop the session even if reading, cleaning or the JDBC write fails.
    try {
      import spark.implicits._

      val logPath = args.headOption.getOrElse(DefaultLogPath)

      // One Row per well-formed line: exactly 8 tab-separated, trimmed fields.
      val rowRdd = spark.sparkContext.textFile(logPath)
        .map(_.split("\t"))
        .filter(_.length == 8)
        .map(fields => Row.fromSeq(fields.map(_.trim).toSeq))

      val schema = StructType(RawColumns.map(name => StructField(name, StringType)))
      val orgDF = spark.createDataFrame(rowRdd, schema)

      // Filter BEFORE de-duplicating: dropDuplicates keeps an arbitrary row per key,
      // so de-duplicating first could keep a non-200 row and then lose the valid
      // 200 row that shared its (event_time, url).
      val cleanedDF = orgDF
        .filter($"status" === "200" && $"event_time" =!= "")
        .dropDuplicates("event_time", "url")

      // Flatten the URL query parameters; a missing parameter becomes "".
      val detailDF = cleanedDF.map { row =>
        val params = parseQueryParams(row.getAs[String]("url"))
        def param(key: String): String = params.getOrElse(key, "")

        (row.getAs[String]("event_time"),
          param("actionBegin"),
          param("actionClient"),
          param("actionEnd"),
          param("actionName"),
          param("actionType"),
          param("actionValue"),
          param("clientType"),
          param("examType"),
          param("ifEquipment"),
          param("isFromContinue"),
          param("skillLevel"),
          param("testType"),
          param("userSID"),
          param("userUID"),
          param("userUIP"),
          row.getAs[String]("method"),
          row.getAs[String]("status"),
          row.getAs[String]("sip"),
          row.getAs[String]("user_uip"),
          row.getAs[String]("action_prepend"),
          row.getAs[String]("action_client"))
      }.toDF(DetailColumns: _*)

      // NOTE(review): connection settings and credentials are hard-coded;
      // consider moving them to configuration.
      val url = "jdbc:mysql://192.168.136.20:3306/kb09db"
      val prop = new Properties()
      prop.setProperty("user", "root")
      prop.setProperty("password", "ok")
      prop.setProperty("driver", "com.mysql.jdbc.Driver")

      // Save mode "overwrite" replaces the table contents; "append" would add to them.
      println("开始写入mysql")
      detailDF.write.mode("overwrite").jdbc(url, "logdetail", prop)
      orgDF.write.mode("overwrite").jdbc(url, "logorg", prop)
      println("写入结束")
    } finally {
      spark.stop()
    }
  }
}

2、用户留存分析

  • 计算用户的次日留存率
    • 求当天新增用户总数n
    • 求当天新增的用户ID与次日登录的用户ID的交集,得出新增用户次日登录总数m (次日留存数)
    • m/n*100%
import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Computes the next-day retention rate of newly registered users.
 *
 * For every registration date it reports:
 *  - `regcount`: number of distinct users registered that day (n),
 *  - `sigcount`: how many of them signed in on the following day (m),
 *  - `retention_rate`: m / n.
 *
 * Input is the `logdetail` MySQL table; the result is printed with `show()`.
 */
object UserAnalyse {

  /** Length of one day in milliseconds, used to shift a date to "the next day". */
  private val MillisPerDay = 86400000L

  /**
   * Program entry point.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("UserAnalyse")
      .getOrCreate()

    // Stop the session even if the JDBC read or the analysis fails.
    try {
      import spark.implicits._

      // NOTE(review): connection settings and credentials are hard-coded;
      // consider moving them to configuration.
      val url = "jdbc:mysql://192.168.136.20:3306/kb09db"
      val prop = new Properties()
      prop.setProperty("user", "root")
      prop.setProperty("password", "ok")
      prop.setProperty("driver", "com.mysql.jdbc.Driver")

      val detailDF = spark.read.jdbc(url, "logdetail", prop)

      // Converts an event time such as "2018-09-04T20:27:31+08:00" to the epoch
      // milliseconds of midnight on its calendar date ("2018-09-04").
      //  - The date occupies characters [0, 10); starting at index 1 would drop the
      //    first digit of the year and parse "018-09-04" as year 18.
      //  - Parsing in UTC makes consecutive dates exactly MillisPerDay apart; in a
      //    zone with daylight saving, local midnights can be 23 or 25 hours apart.
      val changeTimeFun = spark.udf.register("changeTime", (x: String) => {
        val dayFormat = new SimpleDateFormat("yyyy-MM-dd")
        dayFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
        dayFormat.parse(x.substring(0, 10)).getTime
      })

      // Distinct (user, day, action) rows for one action name, with the user id
      // and day columns renamed so both sides of the join stay unambiguous.
      def dailyUsers(action: String, uidCol: String, dateCol: String): DataFrame =
        detailDF.filter($"actionName" === action)
          .select(
            $"userUID".as(uidCol),
            changeTimeFun($"event_time").as(dateCol),
            $"actionName")
          .distinct()

      // All registrations: (regUID, register_date, actionName).
      val registDF2 = dailyUsers("Registered", "regUID", "register_date")
      // All sign-ins: (sigUID, signin_date, actionName).
      val signinDF2 = dailyUsers("Signin", "sigUID", "signin_date")

      println(s"注册用户数量${registDF2.count()}")
      println(s"登录用户数量${signinDF2.count()}")

      // m: per registration date, users who signed in exactly one day later.
      val retainedPerDay = registDF2
        .join(signinDF2, $"sigUID" === $"regUID")
        .filter($"register_date" === $"signin_date" - MillisPerDay)
        .groupBy($"register_date").count()
        .withColumnRenamed("count", "sigcount")

      // n: per registration date, number of newly registered users.
      val registeredPerDay = registDF2
        .groupBy($"register_date").count()
        .withColumnRenamed("count", "regcount")

      // Dividing two long columns yields a double, i.e. the fraction m / n.
      retainedPerDay.join(registeredPerDay, "register_date")
        .select(
          $"register_date",
          $"sigcount",
          $"regcount",
          ($"sigcount" / $"regcount").as("retention_rate"))
        .show()
    } finally {
      spark.stop()
    }
  }
}

版权声明:本文为weixin_48185778原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。