1、自定义函数介绍:
1.1UDF 函数(User-Defined-Function)
- 一对一的关系,输入一个值经过函数以后输出一个值;
- 在 Hive 中继承 UDF 类,方法名称为 evaluate,返回值不能为 void,其实就是实现一个方法;
select a,b, my_udf(a,b) as c from table1
1.2UDAF 聚合函数(User-Defined Aggregation Function)
- 多对一的关系,输入多个值输出一个值,通常与 groupBy 联合使用;
select a, b, my_udaf(c) as avg_c from table1 group by a,b
1.3UDTF 函数(User-Defined Table-Generating Functions)
- 一对多的关系,输入一个值输出多个值(一行变为多行);
- 用户自定义生成函数,有点像 flatMap;
-- select explode(array) col_1 from table1 select my_udtf(array) col_1 from table1
2、SparkSQL中实现UDF函数
2.1定义UDF:
- 先导包:
from pyspark.sql.functions import *
- 语法1:udf1 = udf( lambda匿名函数,返回数据类型)
udf1 = udf(lambda x:x+1, IntegerType())
- 语法2:udf2 = udf( lambda有名函数,返回数据类型)
def fun(x):return x + 1 udf1 = udf(lambda x:fun(x), IntegerType())
- 语法3:udf3 = udf( python普通函数,返回数据类型)
def fun(x):return x+1 udf1 = udf(fun,IntegerType())
- 返回数据类型可以是:IntegerType()、FloatType()、ArrayType(),如果是元组复杂类型,还需自定义 schema
- 语法4:使用 @udf 装饰器装饰 python 函数:
@udf def udf1(x): if x < 10: return x + 1
2.2使用UDF:
- DSL风格
df.select(udf名称(字段).alias(别名)).show()
SQL风格
- 第1步:注册临时视图
df.createOrReplaceTempView('view1')
- 第2步:定义UDF函数,并注册
spark.udf.register('注册后的SQL版UDF函数名', 注册前的Python版UDF函数名)
- 第3步:在SQL语句中使用UDF函数
spark.sql('使用注册后的SQL版UDF函数名写SQL语句').show()
3、综合案例
- 请提前安装pyarrow,即
- pip install pyarrow或者
- pip install pyspark[sql]
- pip install pyarrow-4.0.1-cp38-cp38-manylinux2014_x86_64.whl
- 否则可能会有问题。
3.1 案例一:
import string
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, ArrayType, StructType, StringType
if __name__ == '__main__':
# 1-创建上下文对象
spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate()
# 2-创建数据集
pdf = pd.DataFrame(data={'integers': [1, 2, 3],
'floats': [-1.0, 0.6, 2.6],
'integer_arrays': [[1, 2], [3, 4], [5, 6, 8, 9]]})
df = spark.createDataFrame(pdf)
# 3-定义方式1- udf(lambda 匿名函数,返回数据类型) 整数->平方
udf1= udf(lambda x:x**2 , IntegerType() )
# 4-使用udf1
df.select(
'*',
udf1('integers').alias('square1')
).show()
def square(x):return x**2
# 5-定义方式2-udf(lambda 有名函数,返回数据类型) 整数->平方
udf2=udf( lambda x:square(x) , IntegerType() )
# 6-使用udf2
df.select(
'*',
udf2('integers').alias('square2')
).show()
# 7 定义方式3-udf3
udf3=udf(square,IntegerType())
# 8-使用udf3
df.select(
'*',
udf3('integers').alias('square3')
).show()
# 9 ,验证1-如果实际的结果,不是定义的返回值类型IntegerType(),则显示为null
df.select(
'*',
udf1('integers').alias('square_integer'),
udf1('floats').alias('square_float')
).show()
# 10 ,验证2-如果实际的结果,不是定义的-FloatType(),则显示为null
udf1_1 = udf(lambda x: x ** 2, FloatType())
df.select(
'*',
udf1_1('integers').alias('square_integer'),
udf1_1('floats').alias('square_float')
).show()
# 11-定义udf4,返回值类型是数组类型
udf4=udf( lambda arr: [x**2 for x in arr] , ArrayType( IntegerType() ))
df.select(
'integers',
'floats',
'integer_arrays',
udf4('integer_arrays').alias('arr2')
).show()
# 12-udf的返回值类型是Tuple或混合输出类型
# 如下:有一个函数,输入一个数字,返回数字以及该数字对应字母表中的字母。
# 13-定义udf5,返回类型用自定义的schema
schema=StructType().add('num',IntegerType()).add('letter',StringType())
udf5=udf( lambda x:(x,string.ascii_letters[x]), schema )
# 14-使用udf5
df.select(
'*',
udf5('integers').alias('tup')
).show()
3.2 案例二:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, ArrayType, StructType, StringType
if __name__ == '__main__':
# 1-创建上下文对象
spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions', '6').getOrCreate()
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
#模拟数据集实现用户名字长度,用户名字转为大写,以及age年龄字段增加1岁
#2 -定义函数实现-用户名字长度
name_len_udf=udf(lambda name:len(name),IntegerType())
#3 -@装饰器方式定义函数实现-用户名字转为大写
@udf
def to_upper(name):
return name.upper()
#4 -@装饰器方式定义函数实现-age年龄字段增加1岁
@udf(returnType=IntegerType())
def add_one(age):return age+1
#5用DSL风格计算
print('用DSL风格计算')
df.select(
'*',
name_len_udf('name').alias('name_len'),
to_upper('name').alias('upper'),
add_one('age').alias('new_age')
).show()
#5用SQL风格计算
#将python版的udf,注册成SQL版的函数
spark.udf.register( 'name_len_udf1' , name_len_udf)
spark.udf.register( 'to_upper1' , to_upper)
spark.udf.register( 'add_one1' , add_one)
print('用SQL风格计算')
df.createOrReplaceTempView('people')
spark.sql('''
select *,
name_len_udf1 (name) as name_len,
to_upper1(name) as upper,
add_one1(age) as new_age
from people
''').show()
版权声明:本文为qq_17685725原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。