SparkSQL-解析

1、新的起始点SparkSession

        在老的版本中,SparkSQL提供两种SQL查询起始点,一个叫SQLContext,用于Spark自己提供的SQL查询,一个叫HiveContext,用于连接Hive的查询,SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

SparkSession.builder 用于创建一个SparkSession。

import spark.implicits._的引入是用于将DataFrames隐式转换成RDD,使df能够使用RDD中的方法。

如果需要Hive支持,则需要以下创建语句:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

2、创建DataFrames

        在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。

从Spark数据源进行创建:

val df = spark.read.json("examples/src/main/resources/people.json")	
df.show()
结果:
 +----+-------+
 | age|   name|
 +----+-------+
 |null|Michael|
 |  30|   Andy|
 |  19| Justin|
 +----+-------+

从RDD进行转换:

package chapter3
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 从RDD进行转化成DataFrame
 * */
object SparkSql_Txt_Demo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("txt").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //加载文件
    val file: RDD[String] = sc.textFile("D:\\a资料offcn\\5.第五阶段\\Day08\\SparkDay01\\资料\\data\\person.txt")
    //根据文件进行切割
    val spliFile: RDD[Array[String]] = file.map(_.split(" "))
    //针对每一列加泛型
    val personRDD: RDD[(Int, String, String)] = spliFile.map(line => (line(0).toInt, line(1), line(2.toInt)))
    //将RDD转化为DataFrame
    //倒包
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF("id","name","age")
    personDF.show()
  }

}
结果:
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|    kobe| 40|
+---+--------+---+

show()方法中,加入数字1,就显示一行,2,显示两行,以此类推。false:是显示字段全部。

3、DataFrame常用操作

(1)DSL(Domain Specific Language)风格语法

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

 (2)SQL风格语法

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+


// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

        临时表是Session范围内的,Session退出后,表就失效了。如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.people

4、创建DataSet

        Dataset是具有强类型的数据集合,需要提供对应的类型信息。

package chapter3
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class People(age:Long,hobby:String,name:String)
object CreateDS_Demo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("creatDS").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    //加载文件
    val personDF: DataFrame = spark.read.json("D:\\a资料offcn\\5.第五阶段\\Day08\\SparkDay01\\资料\\data\\people.json")
    //查看约束信息
    personDF.printSchema()
    import spark.implicits._
    val personDS: Dataset[People] = personDF.as[People]
    personDS.show()
  }
}
结果:
root
 |-- age: long (nullable = true)
 |-- hobby: string (nullable = true)
 |-- name: string (nullable = true)

+---+----------+-------+
|age|     hobby|   name|
+---+----------+-------+
| 23|   running|   json|
| 32|basketball|charles|
| 28|  football|    tom|
| 24|   running|   lili|
| 20|  swimming|    bob|
+---+----------+-------+

5、Dataset和RDD互操作

Spark SQL支持通过两种方式将存在的RDD转换为Dataset,转换的过程中需要让Dataset获取RDD中的Schema信息,主要有两种方式,一种是通过反射来获取RDD中的Schema信息。这种方式适合于列名已知的情况下。第二种是通过编程接口的方式将Schema信息应用于RDD,这种方式可以处理那种在运行时才能知道列的方式。

(1)通过指定列名创建DataFrame

package chapter3
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Schema_Demo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("schema").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    
    //通过指定列名创建DataFrame
    //加载文件
    val file: RDD[String] = sc.textFile("D:\\a资料offcn\\5.第五阶段\\Day08\\SparkDay01\\资料\\data\\person.txt")
    val spliFile: RDD[Array[String]] = file.map(_.split(" "))
    //按照字段指定类型
    val personRDD: RDD[(Int, String, Int)] = spliFile.map(line => (line(0).toInt, line(1), line(2).toInt))
    //调用toDF函数
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF("id","name","age")
    personDF.show()
  }

}
结果:
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|    kobe| 40|
+---+--------+---+

(2)通过反射获取Schema

SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。

package chapter3
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

case class People1(id:Int,name:String,age:Int)
object Schema_Demo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("schema").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    //通过反射获取Schema创建DataFrame
    val file: RDD[String] = sc.textFile("D:\\a资料offcn\\5.第五阶段\\Day08\\SparkDay01\\资料\\data\\person.txt")
    //针对分隔符进行切割:
    val spliFile: RDD[Array[String]] = file.map(_.split(" "))
    //指定字段,要写样例类,通过反射获取字段名
    val personRDD: RDD[People1] = spliFile.map(line => People1(line(0).toInt, line(1), line(2).toInt))
    //将RDD转化为DataFrame
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()
    personDF.show()
  }
}
结果:
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|    kobe| 40|
+---+--------+---+

6、类型之间的转换总结

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

DataFrame/Dataset转RDD:

//DataFrame/DataSet转成RDD:
val rdd1=testDF.rdd
val rdd2=testDS.rdd

//RDD转DataFrame:
import spark.implicits._
val testDF = rdd.map {line=>(line._1,line._2)}.toDF("col1","col2")
//一般用元组把一行的数据写在一起,然后在toDF中指定字段名

//RDD转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>Coltest(line._1,line._2)}.toDS
//可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

//Dataset转DataFrame:这个也很简单,因为只是把case class封装成Row
import spark.implicits._
val testDF = testDS.toDF

//DataFrame转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]

        这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。

7、用户自定义函数

通过spark.udf功能用户可以自定义函数。

UDF 传入一参数,传出一个参数

UDAF 传入多参数,传出一个参数

开窗 传入多参数,传出多个参数

Select salary,add(salary+100) from emp;

Select avg(salary) from emp

有多个部门 it 考研 考公务员,每个部门最高工资

(1)用户自定义UDF函数

scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+



scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("Select addName(name), age from people").show()

+-----------------+----+

|UDF:addName(name)| age|

+-----------------+----+

|     Name:Michael|null|

|        Name:Andy|  30|

|      Name:Justin|  19|

+-----------------+----+

(2)用户自定义聚合函数

        强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。

        用户自定义聚合函数:通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。下面展示一个求平均工资的自定义聚合函数。

packagecom.zg.d03

importorg.apache.spark.sql.Row
importorg.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
importorg.apache.spark.sql.types._

classMyAverageextendsUserDefinedAggregateFunction{
  // 聚合函数输入参数的数据类型
  override definputSchema: StructType =StructType(StructField("inputCloum",LongType)::Nil)
  // 聚合缓冲区中值得数据类型
  override defbufferSchema: StructType =StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil)
  // 返回值的数据类型
  override defdataType: DataType = DoubleType
  // 对于相同的输入是否一直返回相同的输出。
  // 确保一致性 一般用true,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。
  override defdeterministic: Boolean =true
  // 初始化
  override definitialize(buffer: MutableAggregationBuffer): Unit = {
    // 存工资的总额
    buffer(0) =0L
    // 存工资的个数
    buffer(1) =0L
  }
  // 相同Execute间的数据合并。
  override defupdate(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if(!input.isNullAt(0)){
      //工资累加
      buffer(0) = buffer.getLong(0)+input.getLong(0)
      //工资个数累加
      buffer(1) = buffer.getLong(1)+1
    }
  }
  // 不同Execute间的数据合并
  override defmerge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //工资累加
    buffer1(0) = buffer1.getLong(0)+buffer2.getLong(0)
    //工资个数相加
    buffer1(1) = buffer1.getLong(1)+buffer2.getLong(1)
  }

  override defevaluate(buffer: Row): Double = {
    buffer.getLong(0).toDouble/buffer.getLong(1)
  }
}

测试:

packagecom.zg.d03

importorg.apache.spark.{SparkConf, SparkContext}
importorg.apache.spark.sql.SparkSession

objectSparkSqlDemo {
  defmain(args: Array[String]): Unit = {
    valconf =newSparkConf().setAppName("sparksql").setMaster("local")
    valsc =newSparkContext(conf)
    valspark = SparkSession.builder().config(conf).getOrCreate()
    //读取json文件,创建dataFrame
    valdf01 = spark.read.json("D:\\wc\\employees.json")
    //创建一张临时表
    df01.createOrReplaceTempView("t_employees")
    //查询
    spark.sql("select * from t_employees").show()
    //注册自定义聚合函数
    spark.udf.register("myAverage",newMyAverage)
    spark.sql("select myAverage(salary) from t_employees").show()
  }
}

(3)开窗函数

over()开窗函数是按照某个字段分组,然后查询出另一字段的前几个的值,相当于 分组取topN

row_number() over (partitin by XXX order by XXX)

rank() 跳跃排序,有两个第二名是,后边跟着的是第四名

dense_rank()  连续排序,有两个第二名是,后边跟着的是第三名

row_number() 连续排序,两个值相同排序也是不同

在使用聚合函数后,会将多行变成一行,而over()开窗函数是其实就是给每个分组的数据,按照其排序的顺序,打上一个分组内的行号,直接将所有列信息显示出来。在使用聚合函数后,如果要显示其它的列必须将列加入到group by中,而使用开窗函数后,可以不使用group by。

姓名

班级

成绩

a

1

88

b

1

78

c

1

95

d

2

74

e

2

92

f

3

99

g

3

99

h

3

45

i

3

53

j

3

78

代码示例:

package chapter3
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

case class Score(name:String,clazz:Int,score:Int)
object Over_Demo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("over").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    val arr01 = Array(("a",1,88),
      ("b",1,78),
      ("c",1,95),
      ("d",2,74),
      ("e",2,92),
      ("f",3,99),
      ("g",3,99),
      ("h",3,45),
      ("i",3,53),
      ("j",3,78))
    //将数据转化为DataFrame
    import spark.implicits._
    val rdd: RDD[(String, Int, Int)] = sc.makeRDD(arr01)
    val scoreRDD: RDD[Score] = rdd.map(x => Score(x._1, x._2, x._3))
    val scoreDF: DataFrame = scoreRDD.toDF()
    //scoreDF.show()
    //使用sql风格查询,需要先注册一张表
    scoreDF.createOrReplaceTempView("t_score")
    //spark.sql("select * from t_score").show()
    //rank()
    //partition by分组,order by排序  rank取新的列名
    //spark.sql("select name,clazz,score,rank() over(partition by clazz order by score desc) rank from t_score").show()
    //dense_rank():有两个第一名,第三人是第二名,不会变成第三名。
    //spark.sql("select name,clazz,score,dense_rank() over(partition by clazz order by score desc) rank from t_score").show()
    //row_number()按照字典排序,如果两个人分数相同,按照字母顺序排名,不会出现并列名次
    spark.sql("select name,clazz,score,row_number() over(partition by clazz order by score desc) rank from t_score").show()
  }
}
结果:

+----+-----+-----+----+
|name|clazz|score|rank|
+----+-----+-----+----+
|   c|    1|   95|   1|
|   a|    1|   88|   2|
|   b|    1|   78|   3|
|   f|    3|   99|   1|
|   g|    3|   99|   2|
|   j|    3|   78|   3|
|   i|    3|   53|   4|
|   h|    3|   45|   5|
|   e|    2|   92|   1|
|   d|    2|   74|   2|
+----+-----+-----+----+


版权声明:本文为weixin_44085996原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。