1. Data cleaning
- Read the log file and convert it to an RDD[Row]
- Split each record on Tab
- Filter out records with fewer than 8 fields
- Clean the data
- Deduplicate on the first and second columns
- Filter out records whose status code is not 200
- Filter out records whose event_time is empty
- Split the url on "&" and "=" (a standalone sketch of this step follows the sample record below)
- Save the data
- Write the data into a MySQL table
Input file: D:\test\t\test.log; one record looks like this:
2018-09-04T20:27:31+08:00 http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157 GET 200 192.168.168.64 - - Apache-HttpClient/4.1.2 (java 1.5)
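Before the full job, here is a minimal standalone sketch of the url-splitting step applied to the sample record above. The object name UrlSplitSketch and the shortened url are made up for illustration; the splitting logic mirrors the step used in the job below.
object UrlSplitSketch {
  // Split a url of the form "path?k1=v1&k2=v2" into a Map of parameters
  def parseParams(url: String): Map[String, String] = {
    val parts = url.split("\\?")            // separate the path from the query string
    if (parts.length != 2) Map("params" -> "null")
    else parts(1)
      .split("&")                           // one entry per key=value pair
      .map(_.split("="))
      .filter(_.length == 2)                // drop malformed pairs
      .map(kv => (kv(0), kv(1)))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // shortened version of the sample url above
    val url = "http://datacenter.bdqn.cn/logs/user?actionName=startEval&userUID=272090&userUIP=1.180.18.157"
    println(parseParams(url).getOrElse("userUID", "")) // prints 272090
  }
}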
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object DataClear {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("clearDemo")
      .getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext
    val linesRdd = sc.textFile("D:\\test\\t\\test.log")
    val rdd = linesRdd.map(x => x.split("\t"))
      .filter(x => x.length == 8)
      .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim, x(5).trim, x(6).trim, x(7).trim))
    val schema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val orgDF = spark.createDataFrame(rdd, schema)
    //orgDF.printSchema()
    //orgDF.show(10, false) // truncate defaults to true; false prints full column contents
    // Deduplicate on the first and second columns,
    // filter out records whose status code is not 200,
    // and filter out records whose event_time is empty
    val ds1 = orgDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))
    //ds1.show(10)
    // Split the url on "&" and "="
    val dfDetail = ds1.map(row => {
      val urlArray = row.getAs[String]("url").split("\\?")
      // alternative: row(1).toString.split("\\?")
      var map1: Map[String, String] = Map("params" -> "null")
      if (urlArray.length == 2) {
        map1 = urlArray(1)
          .split("&")
          .map(x => x.split("="))
          .filter(_.length == 2)
          .map(x => (x(0), x(1)))
          .toMap
      }
(row.getAs[String]("event_time"),
map1.getOrElse("actionBegin", ""),
map1.getOrElse("actionClient", ""),
map1.getOrElse("actionEnd", ""),
map1.getOrElse("actionName", ""),
map1.getOrElse("actionType", ""),
map1.getOrElse("actionValue", ""),
map1.getOrElse("clientType", ""),
map1.getOrElse("examType", ""),
map1.getOrElse("ifEquipment", ""),
map1.getOrElse("isFromContinue", ""),
map1.getOrElse("skillLevel", ""),
map1.getOrElse("testType", ""),
map1.getOrElse("userSID", ""),
map1.getOrElse("userUID", ""),
map1.getOrElse("userUIP", ""),
row.getAs[String]("method"),
row.getAs[String]("status"),
row.getAs[String]("sip"),
row.getAs[String]("user_uip"),
row.getAs[String]("action_prepend"),
row.getAs[String]("action_client"))
}).toDF()
    val detailRDD = dfDetail.rdd
    val detailschema = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("actionBegin", StringType),
        StructField("actionClient", StringType),
        StructField("actionEnd", StringType),
        StructField("actionName", StringType),
        StructField("actionType", StringType),
        StructField("actionValue", StringType),
        StructField("clientType", StringType),
        StructField("examType", StringType),
        StructField("ifEquipment", StringType),
        StructField("isFromContinue", StringType),
        StructField("skillLevel", StringType),
        StructField("testType", StringType),
        StructField("userSID", StringType),
        StructField("userUID", StringType),
        StructField("userUIP", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val detailDF = spark.createDataFrame(detailRDD, detailschema)
    //detailDF.show(10)
    val url = "jdbc:mysql://192.168.136.20:3306/kb09db"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "ok")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    println("start writing to MySQL") // mode "overwrite" replaces the table, "append" adds to it
    detailDF.write.mode("overwrite").jdbc(url, "logdetail", prop)
    orgDF.write.mode("overwrite").jdbc(url, "logorg", prop)
    println("finished writing to MySQL")
  }
}
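The JDBC writes above assume the MySQL driver class com.mysql.jdbc.Driver is on the classpath. A minimal sbt dependency sketch is shown below; the Spark and connector versions are assumptions, not taken from the original post, so adjust them to your environment.
// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.5" % "provided",
  "mysql"            %  "mysql-connector-java" % "5.1.47"
)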
2. User retention analysis
- Compute the next-day retention rate
- Count the new users registered on a given day: n
- Intersect that day's new user IDs with the user IDs that sign in on the following day, giving the number of new users who sign in again the next day: m (the next-day retained users)
- Retention rate = m / n * 100% (a standalone sketch of this formula follows, before the Spark job)
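Here is a minimal standalone sketch of the m / n formula on made-up user IDs. The object name RetentionSketch and the ID values are illustrative only; the real computation over the logdetail table follows.
object RetentionSketch {
  def main(args: Array[String]): Unit = {
    // made-up IDs: new users registered on day D, and users who signed in on day D+1
    val newUsersDayD = Set("101", "102", "103", "104")           // n = 4
    val signinsDayD1 = Set("102", "104", "200")
    val retained     = newUsersDayD.intersect(signinsDayD1)      // m = {"102", "104"}
    val rate = retained.size.toDouble / newUsersDayD.size * 100  // m / n * 100%
    println(s"next-day retention = $rate%")                      // prints 50.0%
  }
}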
import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
object UserAnalyse {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("UserAnalyse")
      .getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext
    val url = "jdbc:mysql://192.168.136.20:3306/kb09db"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "ok")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    val detailDF = spark.read.jdbc(url, "logdetail", prop)
    //println(detailDF.count())
    //detailDF.show(3)
    // UDF: convert the "yyyy-MM-dd" prefix of event_time to a millisecond timestamp
    // (substring(0, 10) takes the date part of e.g. "2018-09-04T20:27:31+08:00")
    val changeTimeFun = spark.udf.register("changeTime", (x: String) => {
      val time: Long = new SimpleDateFormat("yyyy-MM-dd")
        .parse(x.substring(0, 10)).getTime
      time
    })
    // All registration events (userUID, register_time, action name)
    val registDF = detailDF.filter(detailDF("actionName") === "Registered")
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "register_time")
      .withColumnRenamed("userUID", "regUID")
    // deduplicate
    val registDF2: DataFrame = registDF.select($"regUID",
      changeTimeFun($"register_time").as("register_date"),
      $"actionName").distinct()
    // All sign-in events (userUID, signin_time, action name)
    val signinDF = detailDF.filter(detailDF("actionName") === "Signin")
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "signin_time")
      .withColumnRenamed("userUID", "sigUID")
    // deduplicate
    val signinDF2: DataFrame = signinDF.select($"sigUID",
      changeTimeFun($"signin_time").as("signin_date"),
      $"actionName").distinct()
println("注册用户数量" + registDF2.count())
println("登录用户数量" + signinDF2.count())
val joinDF = registDF2.join(signinDF2, signinDF2("sigUID") === registDF2("regUID"))
val frame = joinDF.filter(
joinDF("register_date") === joinDF("signin_date") - 86400000)
.groupBy($"register_date").count()
.withColumnRenamed("count","sigcount")
//frame.printSchema()
//frame.show()
val frame1 = registDF2.groupBy($"register_date").count()
.withColumnRenamed("count","regcount")
//frame1.printSchema()
//frame1.show()
val frame2=frame.join(frame1, "register_date")
//frame2.show()
frame2.map(x=>(
x.getAs[Long]("register_date")
, x.getAs[Long]("sigcount")
, x.getAs[Long]("regcount")
,x.getAs[Long]("sigcount").toDouble/x.getAs[Long]("regcount")
)).show()
}
}
Copyright notice: This is an original article by weixin_48185778, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.