一、开窗函数介绍
- 开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
- 开窗用于为定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GroupBY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列
二、聚合函数和开窗函数对比
- 聚合函数是将多行变为一行,count,avg…
- 开窗函数是将一行变成多行
- 聚合函数如果要显示其他的列必须将列加入到group by中
- 开窗函数可以不使用group by,直接将所有信息显示出来
三、开窗函数分类
- 聚合开窗函数:
聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句 - 排序开窗函数:
排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是PARTITION BY子句 - 分区类型NTILE的开窗函数
四、开窗函数语法
# 聚合类型:SUM\MIN\MAX\AVG\COUNT
sum() OVER([PARTITION BY XXX][ORDER BY XXX [DESC]])
# 排序类型:ROW_NUMBER|RANK|DENSE_RANK
ROW_NUMBER() OVER([PARTITION BY XXX][ORDER BY XXX [DESE]])
# 分区类型:NTILE
NTILE(number) OVER([PARTITION BY XXX][ORDER BY XXX [DESC]])
五、实例
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import os
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = '/root/anaconda3/envs/pyspark_env/bin/python'
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON
os.environ['PYSPARK_DRIVER_PYTHON'] = PYSPARK_PYTHON
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("create df").\
master("local[*]").\
config("spark.sql.shuffle.partitions","2").\
getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize(([
('张三', 'class_1', 99),
('王五', 'class_2', 35),
('王三', 'class_3', 57),
('王久', 'class_4', 12),
('王丽', 'class_5', 99),
('王娟', 'class_1', 90),
('王军', 'class_2', 91),
('王俊', 'class_3', 33),
('王君', 'class_4', 55),
('王珺', 'class_5', 66),
('郑颖', 'class_1', 11),
('郑辉', 'class_2', 33),
('张丽', 'class_3', 36),
('张张', 'class_4', 79),
('黄凯', 'class_5', 90),
('黄开', 'class_1', 90),
('黄恺', 'class_2', 90),
('王凯', 'class_3', 11),
('王凯杰', 'class_1', 11),
('王开杰', 'class_2', 3),
('王景亮', 'class_3', 99)
]))
schema = StructType().\
add("name",StringType()).\
add("class",StringType()).\
add("score",IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格,所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *,AVG(score) OVER() AS avg_score FROM stu
""").show()
"""
SELECT *,AVG(score) OVER() AS avg_score FROM stu 等同于
SELECT * FROM stu
SELECT AVG(score) FROM stu
两个SQL的结果集进行整合而来
"""
# TODO 排序窗口
spark.sql("""
SELECT *,
ROW_NUMBER() OVER(ORDER BY score DESC) as row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *,
NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()
版权声明:本文为feizuiku0116原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。