SparkSQL中的自定义函数UDF、UDAF、UDTF(附UDF实现案例)

1、自定义函数介绍:

1.1UDF 函数(User-Defined-Function)

  • 一对一的关系,输入一个值经过函数以后输出一个值;
  • 在 Hive 中继承 UDF 类,方法名称为 evaluate,返回值不能为 void,其实就是实现一个方法;
  • select a,b, my_udf(a,b) as c from table1

1.2UDAF 聚合函数(User-Defined Aggregation Function)

  • 多对一的关系,输入多个值输出一个值,通常与 groupBy 联合使用;
  • select a,
           b,
           my_udaf(c) as avg_c
    from table1
    group by a,b

1.3UDTF 函数(User-Defined Table-Generating Functions)

  • 一对多的关系,输入一个值输出多个值(一行变为多行);
  • 用户自定义生成函数,有点像 flatMap;
  • -- select explode(array) col_1 from table1
    select my_udtf(array) col_1 from table1 

2、SparkSQL中实现UDF函数

2.1定义UDF:

  • 先导包:
    from pyspark.sql.functions import *
  • 语法1:udf1 = udf( lambda匿名函数,返回数据类型)
    udf1 = udf(lambda x:x+1, IntegerType())
  • 语法2:udf2 = udf( lambda有名函数,返回数据类型)
    def fun(x):return x + 1
    udf1 = udf(lambda x:fun(x), IntegerType())
  • 语法3:udf3 = udf( python普通函数,返回数据类型)
    def fun(x):return x+1
    udf1 = udf(fun,IntegerType())
  • 返回数据类型可以是:IntegerType()、FloatType()、ArrayType(),如果是元组复杂类型,还需自定义 schema
  • 语法4:使用 @udf 装饰器装饰 python 函数:
    @udf
    def udf1(x):
        if x < 10:
            return x + 1

2.2使用UDF:

  • DSL风格
    df.select(udf名称(字段).alias(别名)).show()
  • SQL风格

  • 第1步:注册临时视图
    df.createOrReplaceTempView('view1')
  • 第2步:定义UDF函数,并注册
    spark.udf.register('注册后的SQL版UDF函数名', 注册前的Python版UDF函数名)
  • 第3步:在SQL语句中使用UDF函数
    spark.sql('使用注册后的SQL版UDF函数名写SQL语句').show()

3、综合案例

  • 请提前安装pyarrow,即
  • pip install pyarrow或者
  • pip install pyspark[sql]
  • pip install pyarrow-4.0.1-cp38-cp38-manylinux2014_x86_64.whl
  • 否则可能会有问题。

3.1 案例一:

import string
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, ArrayType, StructType, StringType

if __name__ == '__main__':
    # 1-创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions',  '6').getOrCreate()

    # 2-创建数据集
    pdf = pd.DataFrame(data={'integers': [1, 2, 3],
                             'floats': [-1.0, 0.6, 2.6],
                             'integer_arrays': [[1, 2], [3, 4], [5, 6, 8, 9]]})
    df = spark.createDataFrame(pdf)

    # 3-定义方式1- udf(lambda 匿名函数,返回数据类型) 整数->平方
    udf1= udf(lambda x:x**2  , IntegerType()  )
    # 4-使用udf1
    df.select(
        '*',
        udf1('integers').alias('square1')
    ).show()
    def square(x):return x**2
    # 5-定义方式2-udf(lambda 有名函数,返回数据类型) 整数->平方
    udf2=udf( lambda x:square(x) , IntegerType() )
    # 6-使用udf2
    df.select(
        '*',
        udf2('integers').alias('square2')
    ).show()

    # 7 定义方式3-udf3
    udf3=udf(square,IntegerType())
    # 8-使用udf3
    df.select(
        '*',
        udf3('integers').alias('square3')
    ).show()
    # 9 ,验证1-如果实际的结果,不是定义的返回值类型IntegerType(),则显示为null
    df.select(
        '*',
        udf1('integers').alias('square_integer'),
        udf1('floats').alias('square_float')
    ).show()
    # 10 ,验证2-如果实际的结果,不是定义的-FloatType(),则显示为null
    udf1_1 = udf(lambda x: x ** 2, FloatType())
    df.select(
        '*',
        udf1_1('integers').alias('square_integer'),
        udf1_1('floats').alias('square_float')
    ).show()
    # 11-定义udf4,返回值类型是数组类型
    udf4=udf( lambda arr: [x**2 for x in arr] , ArrayType( IntegerType() ))
    df.select(
        'integers',
        'floats',
        'integer_arrays',
        udf4('integer_arrays').alias('arr2')
    ).show()

    # 12-udf的返回值类型是Tuple或混合输出类型
    # 如下:有一个函数,输入一个数字,返回数字以及该数字对应字母表中的字母。
    # 13-定义udf5,返回类型用自定义的schema
    schema=StructType().add('num',IntegerType()).add('letter',StringType())
    udf5=udf( lambda x:(x,string.ascii_letters[x]), schema )
    # 14-使用udf5
    df.select(
        '*',
        udf5('integers').alias('tup')
    ).show()

3.2 案例二:

from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, ArrayType, StructType, StringType

if __name__ == '__main__':
    # 1-创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions',  '6').getOrCreate()
    df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    #模拟数据集实现用户名字长度,用户名字转为大写,以及age年龄字段增加1岁
    #2 -定义函数实现-用户名字长度
    name_len_udf=udf(lambda name:len(name),IntegerType())
    #3 -@装饰器方式定义函数实现-用户名字转为大写
    @udf
    def to_upper(name):
        return name.upper()

    #4 -@装饰器方式定义函数实现-age年龄字段增加1岁
    @udf(returnType=IntegerType())
    def add_one(age):return age+1

    #5用DSL风格计算
    print('用DSL风格计算')
    df.select(
        '*',
        name_len_udf('name').alias('name_len'),
        to_upper('name').alias('upper'),
        add_one('age').alias('new_age')
    ).show()

    #5用SQL风格计算
    #将python版的udf,注册成SQL版的函数
    spark.udf.register( 'name_len_udf1' , name_len_udf)
    spark.udf.register( 'to_upper1' , to_upper)
    spark.udf.register( 'add_one1' , add_one)
    print('用SQL风格计算')
    df.createOrReplaceTempView('people')
    spark.sql('''
    select  *,
            name_len_udf1 (name) as name_len,
            to_upper1(name) as upper,
            add_one1(age) as new_age
      from people
    ''').show()

版权声明:本文为qq_17685725原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。