PySpark之SparkSQL使用窗口函数

一、开窗函数介绍

  • 开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
  • 开窗用于为定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GroupBY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列

二、聚合函数和开窗函数对比

  • 聚合函数是将多行变为一行,count,avg…
  • 开窗函数是将一行变成多行
  • 聚合函数如果要显示其他的列必须将列加入到group by中
  • 开窗函数可以不使用group by,直接将所有信息显示出来

三、开窗函数分类

  • 聚合开窗函数:聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句
  • 排序开窗函数:排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是PARTITION BY子句
  • 分区类型NTILE的开窗函数

四、开窗函数语法

# 聚合类型:SUM\MIN\MAX\AVG\COUNT
sum() OVER([PARTITION BY XXX][ORDER BY XXX [DESC]])
# 排序类型:ROW_NUMBER|RANK|DENSE_RANK
ROW_NUMBER() OVER([PARTITION BY XXX][ORDER BY XXX [DESE]])
# 分区类型:NTILE
NTILE(number) OVER([PARTITION BY XXX][ORDER BY XXX [DESC]])

五、实例

import string
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
import os

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = '/root/anaconda3/envs/pyspark_env/bin/python'
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON
os.environ['PYSPARK_DRIVER_PYTHON'] = PYSPARK_PYTHON


if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("create df").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions","2").\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize(([
        ('张三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王丽', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('郑颖', 'class_1', 11),
        ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36),
        ('张张', 'class_4', 79),
        ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90),
        ('黄恺', 'class_2', 90),
        ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11),
        ('王开杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)
    ]))

    schema = StructType().\
        add("name",StringType()).\
        add("class",StringType()).\
        add("score",IntegerType())
    df = rdd.toDF(schema)

    # 窗口函数只用于SQL风格,所以注册表先
    df.createTempView("stu")

    # TODO 聚合窗口
    spark.sql("""
        SELECT *,AVG(score) OVER() AS avg_score FROM stu
    """).show()
    """
    SELECT *,AVG(score) OVER() AS avg_score FROM stu 等同于
    SELECT * FROM stu
    SELECT AVG(score) FROM stu
    两个SQL的结果集进行整合而来
    """

    # TODO 排序窗口
    spark.sql("""
        SELECT *,
        ROW_NUMBER() OVER(ORDER BY score DESC) as row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(ORDER BY score) AS rank
        FROM stu
    """).show()

    # TODO NTILE
    spark.sql("""
        SELECT *,
        NTILE(6) OVER(ORDER BY score DESC) FROM stu
    """).show()


版权声明:本文为feizuiku0116原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。