文章目录
传送门:
- 视频地址:黑马程序员Spark全套视频教程
- 1.PySpark基础入门(一)
- 2.PySpark基础入门(二)
- 3.PySpark核心编程(一)
- 4.PySpark核心编程(二)
- 5.PySaprk——SparkSQL学习(一)
- 6.PySaprk——SparkSQL学习(二)
- 7.Spark综合案例——零售业务统计分析
- 8. Spark3新特性及核心概念(背)
一、快速入门
1. 什么是SparkSQL
SparkSQL是Spark的一个模块,用于处理海量结构化数据。
限定:结构化数据处理。
RDD算子可以处理结构化数据,非结构化数据,半结构化数据
2. 为什么要学习SparkSQL
SparkSQL是非常成熟的海量结构化数据处理框架。学习SparkSQL主要在2个点:
- SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等
- 企业大面积在使用SparkSQL处理业务数据
- 离线开发
- 数仓搭建
- 科学计算
- 数据分析
3. SparkSQL特点
- 融合性
SQL可以无缝集成在代码中,随时用SQL处理数据 - 统一数据访问
一套标准API可读写不同数据源 - Hive兼容
可以使用SparkSQL直接计算并生成Hive数据表 - 标准化连接
支持标准化JDBC\ODBC连接,方便和各种数据库进行数据交互。
二、SparkSQL概述
1. SparkSQL和Hive的异同
相同点:
- Hive和Spark 均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器。
不同点:
2. SparkSQL的数据抽象
SparkSQL的DataFrame底层借鉴了Pandas的DataFrame,是二维表数据结构、分布式集合(分区)。SparkSQL现在使用的有2类数据抽象对象:
- DataSet对象,可用于Java、Scala语言
DataSet支持泛型特性(Python语言没有泛型特性),可以让Java、Scala语言更好的利用到 - DataFrame对象,可用于Java、Scala、Python、R语言
我们以Python开发SparkSQL,主要使用的就是DataFrame对象作为核心数据结构。
3. DataFrame数据抽象
DataFrame和RDD都是:弹性的、分布式的数据集。只是,DataFrame存储的数据结构“限定”为:二维表结构化数据而RDD可以存储的数据则没有任何限制,想处理什么就处理什么。
4. SparkSession对象
在RDD阶段,程序的执行入口对象是: SparkContext对象。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:
- 用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象。
6. SparkSQL HelloWorld
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""
SparkSession入口对象作为SQL的编程入口
SparkContext入口对象作为RDD的编程入口
"""
# SparkSession对象的导包,SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 0.构建SparkSession入口对象
# appName 设置程序名称, config设置一些常用属性
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
# 1.通过SparkSession对象获取SparkContext对象
sc = spark.sparkContext
# SparkSQL的HelloWord
# 读取数据
df = spark.read.csv('../data/input/stu_score.txt', sep=',', header=False)
# 设置列名
df2 = df.toDF('id', 'name', 'score')
df2.printSchema() # 打印表结构
df2.show() # 打印表内容
df2.createTempView('score')
# SQL风格
spark.sql("""
SELECT * FROM score WHERE name='语文' LIMIT 5
""").show()
# DSL风格
df2.where("name='语文'").limit(5).show()
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- score: string (nullable = true)
+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
| 6|语文| 99|
| 7|语文| 99|
| 8|语文| 99|
| 9|语文| 99|
| 10|语文| 99|
| 11|语文| 99|
| 12|语文| 99|
| 13|语文| 99|
| 14|语文| 99|
| 15|语文| 99|
| 16|语文| 99|
| 17|语文| 99|
| 18|语文| 99|
| 19|语文| 99|
| 20|语文| 99|
+---+----+-----+
only showing top 20 rows
+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
+---+----+-----+
+---+----+-----+
| id|name|score|
+---+----+-----+
| 1|语文| 99|
| 2|语文| 99|
| 3|语文| 99|
| 4|语文| 99|
| 5|语文| 99|
+---+----+-----+
三、DataFrame入门和操作
1. DataFrame的组成
DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
- 行
- 列
- 表结构描述
基于这个前提,DataFrame的组成如下:
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
如图,在表结构层面,DataFrame的表结构由StructType对象来描述,如下图:
一个StructField记录:列名、列类型、列是否为空。多个StructField组成一个StructType对象。一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。
2. DataFrame的代码构建
2.1 基于RDD的方式1
DataFrame对象可以从RDD转换而来,都是分布式数据集,就是转换一下内部存储的结构,转换为二维表结构。通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame。这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。
#!usr/bin/env python
# -*- coding:utf-8 -*-
# TODO:基于RDD的方式构建DataFrame对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
sc = spark.sparkContext
# 1.基于RDD转换成DataFrame
rdd = sc.textFile('../data/input/sql/people.txt'). \
map(lambda x: x.split(',')). \
map(lambda x: (x[0], int(x[1])))
# 2. 构建DataFrame对象
# 参数1:被转换的RDD;参数2指定列名,通过list的形式指定,按照顺序依次提供字符串名称
df = spark.createDataFrame(rdd, schema=['name', 'age'])
# 打印DataFrame表结构
df.printSchema()
# 打印DataFrame数据
# 参数1表示展示多少条数据,默认不传的话是20
# 参数2表示是否对列进行截断,如果列的数据长度超过20个字符串程度,后续的内容不显示,以...代替
# 如果给False,表示全部显示(不截断),默认是True
df.show()
# 将DF对象转换为临时视图表,可供sql语句查询
df.createOrReplaceTempView('people')
spark.sql("SELECT * FROM people WHERE age < 30").show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Justin| 19|
+-------+---+
2.2 基于RDD的方式2
通过StructType对象来定义DataFrame的“表结构”转换RDD。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""
基于RDD的方式构建DataFrame对象,
通过StructType对象来定义DataFrame的“表结构”转换RDD
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
sc = spark.sparkContext
# 1.基于RDD转换成DataFrame
rdd = sc.textFile('../data/input/sql/people.txt'). \
map(lambda x: x.split(',')). \
map(lambda x: (x[0], int(x[1])))
# 2. 构建表结构描述对象——StructType对象
schema = StructType().\
add("name", StringType(), nullable=True). \
add("age", IntegerType(), nullable=False)
# 3.基于StructType对象去构建RDD到DF的转换
df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.3 基于RDD的方式3
使用RDD的toDF方法转换RDD。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""
基于RDD的方式构建DataFrame对象,
使用RDD的toDF方法转换RDD
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
sc = spark.sparkContext
# 1.基于RDD转换成DataFrame
rdd = sc.textFile('../data/input/sql/people.txt'). \
map(lambda x: x.split(',')). \
map(lambda x: (x[0], int(x[1])))
# TODO:toDF的方式构建DataFrame
df1 = rdd.toDF(['name','age'])
df1.printSchema()
df1.show()
# TODO:通过表结构描述对象——StructType对象的方式构建DataFrame
schema = StructType().\
add("name", StringType(), nullable=True). \
add("age", IntegerType(), nullable=False)
df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.4 基于pandas的DataFrame
将Pandas的DataFrame对象,转变为分布式的SparkSQL的DataFrame对象。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""
基于pandas的DataFrame构建SparkSQL的DataFrame
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
sc = spark.sparkContext
# TODO:基于pandas的DataFrame构建SparkSQL的DataFrame
pdf = pd.DataFrame({
'id': [1, 2, 3],
'name': ['张大仙', '王晓晓', '吕不韦'],
'age': [11, 12, 13]
})
df = spark.createDataFrame(pdf)
df.printSchema()
df.show()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+---+------+---+
| id| name|age|
+---+------+---+
| 1|张大仙| 11|
| 2|王晓晓| 12|
| 3|吕不韦| 13|
+---+------+---+
2.5 读取外部数据
通过SparkSQL的统一API进行数据读取构建DataFrame。
统一API示例代码:
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")
读取text数据源
读取text数据源,使用format(“text”)读取文本数据。读取数据的特点是:将一整行只作为一个列读取,列名默认称之为value,类型是String#!usr/bin/env python # -*- coding:utf-8 -*- """ DataFrame的代码构建 - 读取外部数据 通过SparkSQL的统一API进行数据读取构建DataFrame """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 构建StructType对象 schema = StructType().add('data', StringType(), nullable=True) # read读取外部数据 df = spark.read.format('text'). \ schema(schema=schema). \ load('../data/input/sql/people.txt') df.printSchema() df.show()
root |-- data: string (nullable = true) +-----------+ | data| +-----------+ |Michael, 29| | Andy, 30| | Justin, 19| +-----------+
读取json数据源
使用format(“json”)读取json数据#!usr/bin/env python # -*- coding:utf-8 -*- """ DataFrame的代码构建 - 读取外部数据 通过SparkSQL的统一API进行数据读取构建DataFrame """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # JSON数据自带schema信息 df = spark.read.format('json').load('../data/input/sql/people.json') df.printSchema() df.show()
root |-- age: long (nullable = true) |-- name: string (nullable = true) +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
读取csv数据源
使用format(“csv”)读取csv数据#!usr/bin/env python # -*- coding:utf-8 -*- """ DataFrame的代码构建 - 读取外部数据 通过SparkSQL的统一API进行数据读取构建DataFrame """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 读取CSV文件 df = spark.read.format('csv'). \ option('sep', ';'). \ option('header', True). \ option('encoding', 'utf-8'). \ schema('name STRING, age INT, job STRING'). \ load('../data/input/sql/people.csv') df.printSchema() df.show()
root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- job: string (nullable = true) +-----+----+---------+ | name| age| job| +-----+----+---------+ |Jorge| 30|Developer| | Bob| 32|Developer| | Ani| 11|Developer| | Lily| 11| Manager| | Put| 11|Developer| |Alice| 9| Manager| |Alice| 9| Manager| |Alice| 9| Manager| |Alice| 9| Manager| |Alice|null| Manager| |Alice| 9| null| +-----+----+---------+
读取parquet数据源
使用format(“parquet”)读取parquet数据
parquet: 是Spark中常用的一种列式存储文件格式。parquet对比普通的文本文件的区别:- parquet 内置schema (列名\ 列类型\ 是否为空)
- 存储是以列作为存储格式
- 存储是序列化存储在文件中的(有压缩属性体积小)
#!usr/bin/env python # -*- coding:utf-8 -*- """ DataFrame的代码构建 - 读取外部数据 通过SparkSQL的统一API进行数据读取构建DataFrame """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 读取parquet文件 df = spark.read.format('parquet').\ load('../data/input/sql/users.parquet') df.printSchema() df.show()
root |-- name: string (nullable = true) |-- favorite_color: string (nullable = true) |-- favorite_numbers: array (nullable = true) | |-- element: integer (containsNull = true) +------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa| null| [3, 9, 15, 20]| | Ben| red| []| +------+--------------+----------------+
Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:
3. DataFrame的入门操作
DataFrame支持两种风格进行编程,分别是:
DSL风格
DSL语法风格,可称之为:领域特定语言。其实就是指DataFrame的特有API。DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()
#!usr/bin/env python # -*- coding:utf-8 -*- """ DSL入门 """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 1.读取txt数据 df = spark.read.format('csv'). \ schema('id INT,subject STRING,score INT'). \ load('../data/input/sql/stu_score.txt') # TODO:select API演示 # 获取column对象 id_column = df['id'] subject_column = df['subject'] df.select(['id', 'subject']).show() df.select('id', 'subject').show() df.select([id_column, subject_column]).show() df.select(id_column, subject_column).show() # TODO:filter API演示 df.filter('score < 99').show() df.filter(df['score'] < 99).show() # TODO:groupBy API # groupBy API的返回值是GroupedData对象,是一个有分组关系的数据结构,调用聚合方法后返回值是DataFrame df.groupBy('subject').count().show() df.groupBy(df['subject']).count().show()
SQL风格
SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)
。DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:- 全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
global_temp
- 临时表:只在当前SparkSession中可用
#!usr/bin/env python # -*- coding:utf-8 -*- """ SQL入门 """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 1.读取txt数据 df = spark.read.format('csv'). \ schema('id INT,subject STRING,score INT'). \ load('../data/input/sql/stu_score.txt') # 2.注册成临时表 df.createTempView('score') # 注册临时视图(表) df.createOrReplaceTempView('score_2') # 注册或替换临时视图(表) df.createGlobalTempView('score_3') # 注册全局临时视图,全局临时视图在使用的时候,需要在浅看带上global——temp前缀 # 3.通过SparkSession对象的spark API完成aql语句的执行 spark.sql("SELECT subject,COUNT(*) AS cnt FROM score GROUP BY subject").show() spark.sql("SELECT subject,COUNT(*) AS cnt FROM score_2 GROUP BY subject").show() spark.sql("SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()
+-------+---+ |subject|cnt| +-------+---+ | 英语| 30| | 语文| 30| | 数学| 30| +-------+---+ +-------+---+ |subject|cnt| +-------+---+ | 英语| 30| | 语文| 30| | 数学| 30| +-------+---+ +-------+---+ |subject|cnt| +-------+---+ | 英语| 30| | 语文| 30| | 数学| 30| +-------+---+
- 全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
3.1 DSL风格下的API总结
- show方法:DataFrame的API,展示DataFrame中的数据, 默认展示20条
- printSchema方法:DataFrame的API,打印输出df的schema信息
- select方法:DataFrame的API,选择DataFrame中的指定列(通过传入参数进行指定)
- filter和where方法:DataFrame的API,过滤DataFrame内的数据,返回一个过滤后的DataFrame
- groupby分组:DataFrame的API,按照指定的列进行数据的分组, 返回值是GroupedData对象,后续可以使用min、max、avg、sum、count方法进行分组聚合
- agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
- alias: 它是Column对象的API, 可以针对一个列 进行改名
- withColumnRenamed: 它是DataFrame的API,可以对DF中的列进行改名, 一次改一个列,改多个列 可以链式调用
- orderBy: DataFrame的API,进行排序,参数1是被排序的列,参数2是 升序(True) 或 降序 False
- first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象
Row对象 就是一个数组, 你可以通过row[‘列名’] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
4. 词频统计案例
PySpark提供了一个包: pyspark.sql.functions
。这个包里面提供了 一系列的计算函数供SparkSQL使用。这些功能函数,返回值多数都是Column对象。
#!usr/bin/env python
# -*- coding:utf-8 -*-
"""
WordCount
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
getOrCreate()
sc = spark.sparkContext
# TODO 1:SQL风格进行处理
rdd = sc.textFile('../data/input/words.txt'). \
flatMap(lambda x: x.split(' ')). \
map(lambda x: [x])
# 使用RDD的toDF方法,将RDD转换为DataFrame
df = rdd.toDF(['word'])
# 注册DF为临时视图
df.createTempView('words')
spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()
# TODO 2: DSL风格
# 读取text数据源,将一整行只作为一列读取
df = spark.read.format('text'). \
load('../data/input/words.txt')
# withColumn方法
# 方法功能:对已存在的列进行操作,返回一个新的列,如果名字与老列相同,那么替换;否则,作为新列存在
df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))
df2.groupBy('value'). \
count(). \
withColumnRenamed('value', 'word'). \
withColumnRenamed('count', 'cnt'). \
orderBy('cnt', ascending=False). \
show()
+------+---+
| word|cnt|
+------+---+
| hello| 3|
| spark| 1|
| flink| 1|
|hadoop| 1|
+------+---+
+------+---+
| word|cnt|
+------+---+
| hello| 3|
| spark| 1|
|hadoop| 1|
| flink| 1|
+------+---+
5. 电影评分数据分析
MovieLens数据集包含多个用户对多部电影的评级数据,也包括电影元数据信息和用户属性信息。
介绍
下面以ml-100k数据集为例进行介绍,下载u.data
文件。由943个用户对1682个电影的10000条评分组成。每个用户至少评分20部电影。用户和电影从1号开始连续编号。数据是随机排序的。需求:
- 查询用户平均分
- 查询电影平均分
- 查询大于平均分的电影的数量
- 查询高分电影中(>3打分次数最多的用户,并求出此人打的平均分
- 查询每个用户的平均打分,最低打分,最高打分
- 查询被评分超过100次的电影的平均分排名TOP10
代码:
#!usr/bin/env python # -*- coding:utf-8 -*- """ 电影评分数据分析 1. 查询用户平均分 2. 查询电影平均分 3. 查询大于平均分的电影的数量 4. 查询高分电影中(>3打分次数最多的用户,并求出此人打的平均分 5. 查询每个用户的平均打分,最低打分,最高打分 6. 查询被评分超过100次的电影的平均分排名TOP10 """ from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType import pandas as pd from pyspark.sql import functions as F if __name__ == '__main__': # 0.构建SparkSession执行环境入口对象 spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 1.读取数据集 schema = StructType().add('user_id', StringType(), nullable=True). \ add('movie_id', IntegerType(), nullable=True). \ add('rank', IntegerType(), nullable=True). \ add('ts', StringType(), nullable=True) df = spark.read.format('csv'). \ option('sep', '\t'). \ option('header', False). \ option('encoding', 'utf-8'). \ schema(schema=schema). \ load('../data/input/sql/u.data') # TODO 1:用户平均分计算 # DSL风格 df.groupBy('user_id'). \ avg('rank'). \ withColumnRenamed('avg(rank)', 'avg_rank'). \ withColumn('avg_rank', F.round('avg_rank', 2)). \ orderBy('avg_rank', ascending=False). \ show() # # SQL风格 # df.createTempView('user') # spark.sql(""" # SELECT user_id, ROUND(AVG(rank),2) AS avg_rank FROM user GROUP BY user_id ORDER BY avg_rank DESC # """).show() # TODO 2:电影平均分计算 # DSL风格 df.groupBy('movie_id'). \ avg('rank'). \ withColumnRenamed('avg(rank)', 'avg_rank'). \ withColumn('avg_rank', F.round('avg_rank', 2)). \ orderBy('avg_rank', ascending=False). \ show() # # SQL风格 # df.createTempView('movie') # spark.sql(""" # SELECT movie_id, ROUND(AVG(rank),2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC # """).show() # TODO 3:查询大于平均分的电影的数量 # DSL风格 print("大于平均分电影的数量:", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count()) # TODO 4:查询高分电影中(>3),打分次数最多的用户,并求出此人打的平均分 user_id = df.where('rank > 3'). \ groupBy('user_id'). \ count(). \ withColumnRenamed('count', 'cnt'). \ orderBy('cnt', ascending=False). \ limit(1). \ first()['user_id'] # 计算这个人的打分平均分 df.filter(df['user_id'] == user_id). \ select(F.round(F.avg('rank'), 2)).show() # TODO 5:查询每个用户的平均打分,最低打分,最高打分 df.groupBy('user_id'). \ agg( F.round(F.avg('rank'), 2).alias('avg_rank'), F.min('rank').alias('min_rank'), F.max('rank').alias('max_rank'), ).show() # TODO 6: 查询被评分超过100次的电影,同时平均分排名TOP10 df.groupBy('movie_id'). \ agg( F.count('movie_id').alias('cnt'), F.round(F.avg('rank'), 2).alias('avg_rank') ).where('cnt > 100'). \ orderBy('avg_rank', ascending=False). \ limit(10). \ show()
+-------+--------+ |user_id|avg_rank| +-------+--------+ | 849| 4.87| | 688| 4.83| | 507| 4.72| | 628| 4.7| | 928| 4.69| | 118| 4.66| | 907| 4.57| | 686| 4.56| | 427| 4.55| | 565| 4.54| | 469| 4.53| | 850| 4.53| | 225| 4.52| | 330| 4.5| | 477| 4.46| | 636| 4.45| | 242| 4.45| | 583| 4.44| | 252| 4.43| | 767| 4.43| +-------+--------+ only showing top 20 rows +--------+--------+ |movie_id|avg_rank| +--------+--------+ | 1653| 5.0| | 1122| 5.0| | 1467| 5.0| | 1201| 5.0| | 1189| 5.0| | 1293| 5.0| | 1599| 5.0| | 1536| 5.0| | 814| 5.0| | 1500| 5.0| | 1449| 4.63| | 1398| 4.5| | 1594| 4.5| | 119| 4.5| | 1642| 4.5| | 408| 4.49| | 169| 4.47| | 318| 4.47| | 483| 4.46| | 64| 4.45| +--------+--------+ only showing top 20 rows 大于平均分电影的数量: 55375 +-------------------+ |round(avg(rank), 2)| +-------------------+ | 3.86| +-------------------+ +-------+--------+--------+--------+ |user_id|avg_rank|min_rank|max_rank| +-------+--------+--------+--------+ | 296| 4.18| 1| 5| | 467| 3.68| 2| 5| | 691| 4.22| 1| 5| | 675| 3.71| 1| 5| | 829| 3.55| 1| 5| | 125| 3.44| 1| 5| | 451| 2.73| 1| 5| | 800| 3.75| 2| 5| | 853| 2.98| 1| 5| | 666| 3.67| 2| 5| | 870| 3.45| 1| 5| | 919| 3.47| 1| 5| | 926| 3.3| 1| 5| | 7| 3.97| 1| 5| | 124| 3.5| 1| 5| | 51| 3.57| 1| 5| | 447| 3.6| 1| 5| | 591| 3.65| 2| 5| | 307| 3.79| 1| 5| | 475| 3.6| 1| 5| +-------+--------+--------+--------+ only showing top 20 rows +--------+---+--------+ |movie_id|cnt|avg_rank| +--------+---+--------+ | 408|112| 4.49| | 318|298| 4.47| | 169|118| 4.47| | 483|243| 4.46| | 64|283| 4.45| | 12|267| 4.39| | 603|209| 4.39| | 50|583| 4.36| | 178|125| 4.34| | 357|264| 4.29| +--------+---+--------+
6. SparkSQL Shuffle 分区数目
spark.sql.shuffle.partitions
参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200个。 对于集群模式来说, 200个默认也算比较合适。如果在local下运行,200个很多,在调度上会带来额外的损耗。 所以在local下建议修改比较低,比如2\4\10均可。这个参数和Spark RDD中设置并行度的参数是相互独立的。
7. SparkSQL 数据清洗API
在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。以people.csv
数据为例,进行如下操作
7.1 数据去重:dropDuplicates
功能:对DF的数据进行去重,如果重复数据有多条,取第—条
#!usr/bin/env python
# -*- coding:utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import time
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
config("spark.sql.shuffle.partitions", 2). \
getOrCreate()
sc = spark.sparkContext
# 1.读取数据
df = spark.read.format('csv'). \
option('sep', ';'). \
option('header', True). \
load('../data/input/sql/people.csv')
# TODO 1:数据清洗——数据去重:drop_duplicates
df.drop_duplicates().show()
# API同样可以针对字段进行去重
df.drop_duplicates(['age', 'job']).show()
+-----+----+---------+
| name| age| job|
+-----+----+---------+
| Bob| 32|Developer|
| Lily| 11| Manager|
|Alice| 9| null|
|Jorge| 30|Developer|
| Ani| 11|Developer|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice|null| Manager|
+-----+----+---------+
+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Alice|null| Manager|
| Ani| 11|Developer|
| Lily| 11| Manager|
|Jorge| 30|Developer|
| Bob| 32|Developer|
|Alice| 9| null|
|Alice| 9| Manager|
+-----+----+---------+
7.2 缺失值处理——dropna
功能:如果数据中包含null,通过dropna来进行判断,符合条件就删除这一行数据。
#!usr/bin/env python
# -*- coding:utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import time
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
config("spark.sql.shuffle.partitions", 2). \
getOrCreate()
sc = spark.sparkContext
# 1.读取数据
df = spark.read.format('csv'). \
option('sep', ';'). \
option('header', True). \
load('../data/input/sql/people.csv')
# TODO 1:数据去重:drop_duplicates
df.drop_duplicates().show()
# API同样可以针对字段进行去重
df.drop_duplicates(['age', 'job']).show()
# TODO 2:缺失值处理
# 无参数使用,只要有null,就删除一整行数据
df.dropna().show()
# thresh=3表示最少满足3个有效列,不满足就删除当前数据
df.dropna(thresh=3).show()
# subset用于设置要判断哪些列
df.dropna(thresh=2, subset=['name', 'age']).show()
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
+-----+---+---------+
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
+-----+---+---------+
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| null|
+-----+---+---------+
7.3 缺失值填充——fillna
功能:根据参数的规则,来进行null的替换
# coding:utf8
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext
"""读取数据"""
df = spark.read.format("csv").\
option("sep", ";").\
option("header", True).\
load("../data/input/sql/people.csv")
# TODO 3:缺失值填充:fillna
# 缺失值处理也可以完成对缺失值进行填充
# DataFrame的 fillna 对缺失的列进行填充
df.fillna("loss").show()
# 指定列进行填充
df.fillna("N/A", subset=['job']).show()
# 设定一个字典, 对所有的列 提供填充规则
df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()
+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice|loss| Manager|
|Alice| 9| loss|
+-----+----+---------+
+-----+----+---------+
| name| age| job|
+-----+----+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice|null| Manager|
|Alice| 9| N/A|
+-----+----+---------+
+-----+---+---------+
| name|age| job|
+-----+---+---------+
|Jorge| 30|Developer|
| Bob| 32|Developer|
| Ani| 11|Developer|
| Lily| 11| Manager|
| Put| 11|Developer|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 9| Manager|
|Alice| 1| Manager|
|Alice| 9| worker|
+-----+---+---------+
8. DataFrame数据写出
SparkSQL统一API写出DataFrame数据。
统一API语法:
#!usr/bin/env python
# -*- coding:utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0.构建SparkSession执行环境入口对象
spark = SparkSession.builder. \
appName('test'). \
master('local[*]'). \
config("spark.sql.shuffle.partitions", 2). \
getOrCreate()
sc = spark.sparkContext
# 1.读取电影数据集
schema = StructType().add('user_id', StringType(), nullable=True). \
add('movie_id', IntegerType(), nullable=True). \
add('rank', IntegerType(), nullable=True). \
add('ts', StringType(), nullable=True)
df = spark.read.format('csv'). \
option('sep', '\t'). \
option('header', False). \
option('encoding', 'utf-8'). \
schema(schema=schema). \
load('../data/input/sql/u.data')
# write text,只能写出一个列的数据,需要将df转换成单列df
# concat_ws函数表示将指定列拼接成一列
df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')). \
write. \
mode('overwrite'). \
format('text'). \
save('../data/output/sql/text')
# write csv
df.write.mode('overwrite'). \
format('csv'). \
option('sep', ';'). \
option('header', True). \
save('../data/output/sql/csv')
# write json
df.write.mode('overwrite'). \
format('json'). \
save('../data/output/sql/json')
# write parquet
df.write.mode('overwrite'). \
format('parquet'). \
save('../data/output/sql/parquet')
将虚拟机的代码同步到本机,就可得到如图:
9. DataFrame 通过JDBC读写数据库
写出:
JDBC写出,会自动创建表的。因为DataFrame中有表结构信息, StructType记录的各个字段的名称类型和是否运行为空。
读取:
10. 总结
- DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据
- DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建
spark.read.format()
和df.write.format()
是DataFrame读取和写出的统一化标准API- SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能
- dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值
- SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作