日志数据操作--json转成表格式

日志处理流程,json转成表类型格式。

日志数据前有一时间戳,后面才是json格式数据。

导入SPARKSQL所需要的包
scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

 val rdd=sc.textFile("hdfs:///app/op.log")--读取数据
 val json=rdd.map(x=>x.split('|')).map(x=>(x(0),x(1))).map(x=>x._2)--符号“|”前后分割开。前面为时间戳,后面为日志数据。
 val df=json.toDF--转换成df
 val obj=df.select(get_json_object($"value","$.cm").alias("cm"))
 
 得到日志的表头,起别名
 val obj=df.select(get_json_object($"value","$.cm").alias("cm"),
				   get_json_object($"value","$.ap").alias("ap"),
				   get_json_object($"value","$.et").alias("et"))
 “cm”列里数据较多。都取出来。	
 val obj1=obj.select($"ap",
						   get_json_object($"cm","$.ln").alias("ln"),
						   get_json_object($"cm","$.sv").alias("sv"),
						   get_json_object($"cm","$.os").alias("os"),
						   get_json_object($"cm","$.g").alias("g"),
						   get_json_object($"cm","$.mid").alias("mid"),
						   get_json_object($"cm","$.nw").alias("nw"),
						   get_json_object($"cm","$.l").alias("l"),
						   get_json_object($"cm","$.vc").alias("vc"),
						   get_json_object($"cm","$.hw").alias("hw"),
						   get_json_object($"cm","$.ar").alias("ar"),
						   get_json_object($"cm","$.uid").alias("uid"),
						   get_json_object($"cm","$.t").alias("t"),
						   get_json_object($"cm","$.la").alias("la"),
						   get_json_object($"cm","$.md").alias("md"),
						   get_json_object($"cm","$.vn").alias("vn"),
						   get_json_object($"cm","$.ba").alias("ba"),
						   get_json_object($"cm","$.sr").alias("sr"),
					 $"et")
 将“et”列的数据取出来
 val obj2=obj1.select($"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",
						from_json($"et",ArrayType(StructType(StructField("ett",StringType)::
									  StructField("en",StringType)::
									  StructField("kv",StringType)::Nil))).alias("events")
					 )
 obj2.select($"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",
			 explode($"events")).show
 
 val obj3=obj2.select(explode($"events").alias("events")).show(false)

版权声明:本文为lhmqy原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。