Hive自定义函数(UDF/UDAF/UDTF)
写在前面的话
Hive 内置了许多的函数可供使用,这些内置函数可以满足大部分的场景需求,但对于特殊场景的处理,需要通过自定义函数去实现;本文将讨论hive自定义函数相关的内容,如有错误欢迎指正;一般来讲,我们会将hive的自定义函数分为三类:UDF\UDAF\UDTF,下面我们分别讨论这三种函数的 逻辑和实现
UDF
User-Defined Functions 用户定义函数
一进一出,只对单行数据产生作用,UDF函数是三种自定义函数中最简单的一种,也是使用最广泛的一种。
1.UDF 函数的实现
首先需要添加依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
1.1代码实现
1.1.1函数类继承UDF类
package org.hive_test.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public class MyUDF extends UDF {
//evaluate 方法可以重载
public String evaluate(String str){
return str.toLowerCase();//将大写字母转换成小写
}
public String evaluate(String a,String b) {
return a.toLowerCase().concat(b.toLowerCase());
//两个字符串转化成小写并进行拼接
}
}
1.1.2 继承GenericUDF类
hive提供的GenericUDF是一个更复杂的抽象类,可以支持标准的UDF无法实现的一些操作,对应于hive内置函数中一个例子:CASE...WHEN语句,函数会根据传入参数的不同而触发不同的操作。当然这种操作使用UDF也可以实现,但是当传入的参数类型众多时需要使用很多的evluate方法重载,示例方法实现了返回两个参数中第一个非null值
`
package org.hive_test.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
public class MyGenericUDF extends GenericUDF {
private ObjectInspector[] argumentIOs;
//ReturnObjectInspectorResolver 类对象通过获取非null的值变量的类型来确定返回值的类型
private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
/**
* initialize 方法会被输入的每个参数调用,并最终传入ObjectInspector对象中,
* 每次调用会生成一个GenericUDF实例,在该实例中initialize 方法将调用一次且仅调用一次,
*
* @param arguments
* @return
* @throws UDFArgumentException
*/
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
argumentIOs = arguments;
if (argumentIOs.length != 2) {
throw new UDFArgumentLengthException("The operator 'MyGenericUDF' accepts 2 arguments!");
}
returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
if (!(returnOIResolver.update(arguments[0])&&returnOIResolver.update(arguments[1]))){
throw new UDFArgumentTypeException(2,"The 1st and 2nd args of function should have the same type," +
"but they are different:\""+arguments[0].getTypeName()+"\" and \" "+
arguments[1].getTypeName()+"\"");
}
return returnOIResolver.get();
}
/**
* evaluate方法接收一个 DeferredObject 对象数组,
* 而initialize方法中创建的returnOIResolver 对象就用于从DeferredObject对象中获取值
*/
public Object evaluate(DeferredObject[] arguments) throws HiveException {
Object retVal=returnOIResolver.convertIfNecessary(arguments[0].get(),argumentIOs[0]);
if (retVal==null){
retVal=returnOIResolver.convertIfNecessary(arguments[1].get(),argumentIOs[1]);
}
return retVal;
}
/**
* 用于Hadoop task内部,在使用到该函数来展示调试信息
* @param children
* @return
*/
public String getDisplayString(String[] children) {
StringBuilder sb=new StringBuilder();
sb.append("if ");
sb.append(children[0]);
sb.append(" is null ");
sb.append("return");
sb.append( children[1]);
return sb.toString();
}
}
1.2.打jar包,上传
- 上传到 $HIVE_HOME/lib/目录以下
- 若没有上传到上述目录以下,则需要使用命令 :
hive> add jar +jar包所在的目录/jar包名字
1.3.创建函数
- 临时函数(只对当前窗口有效)
--注:以下执行的操作,为hive sql 执行
CREATE TEMPORARY FUNCTION function_name AS class_name;
--function_name:函数名;
--class_name:UDF类的全类名
--例:我们对上面UDF的实现进行创建函数
add jar /..本地路径/HadoopTest-1.0-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION str_tool AS package org.hive_test.udf.MyUDF;
--这样我们就注册创建了一个临时函数
- 永久函数(可以将jar包上传到hdfs中)
CREATE FUNCTION function_name AS class_name USING JAR path;
--function_name:函数名;
--class_name:函数类的全类名
--path:jar包所在的路径
#例:还是对上面UDF的实现进行创建函数
#上传hdfs
hdfs dfs -mkdir /hive_resource
hdfs dfs -put /..本地路径/HadoopTest-1.0-SNAPSHOT.jar /hive_resource
#hive sql 执行
CREATE FUNCTION str_tool AS "org.hive_test.udf.MyUDF" USING JAR "hdfs://ip地址:9000/hive_resource/HadoopTest-1.0-SNAPSHOT.jar" ;
#这样我们就注册创建了一个可以永久使用的函数
2.UDF 函数的使用
udf 函数的使用与hive一般的内置函数类似;
示例:有这样一张student表
| id | name | age |
|---|---|---|
| 1 | 熊大 | 20 |
| 2 | 熊二 | 19 |
| 3 | 张三 | 20 |
| 4 | 李四 | 18 |
select str_tool(name,"Class01") as name,age from student where id=4;
--结果如下:
----------------------
--|name |age|
--|李四class01 |18 |
----------------------
UDAF
User-Defined-Aggregation Function 用户自定义聚合函数
可对多行数据产生作用,产出单一结果,即多进一出。例如内置函数中的sum(),avg(),count()等,都属于多进一出聚合函数。
UDAF的实现是比较复杂的,聚合函数会分为多个阶段进行处理,聚合过程是在mapTask或者reduceTask中执行的,是一个有内存限制的Java进程,所以在聚合操作过程中,对于存储较大的结构化数据可能产生内存溢出异常,在编写UDAF时,一定要注意内存占用问题。
hive 官网提供了一个自定义聚合函数的例子,GenericUDAFHistogramNumeric 类,全类名:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric.java
1.UDAF执行逻辑
UDAF 主实现类 需要继承 AbstractGenericUDAFResolver 抽象类,重写getEvaluator 方法,该方法会返回GenericUDAFEvaluator 类的子类对象,GenericUDAFEvaluator 是一个抽象类包含了实现 UDAF函数处理逻辑的方法;
在实现自定义UDAF的过程中必须通过继承GenericUDAFEvaluator 去实现自定义的执行逻辑;
GenericUDAFEvaluator 类包含一个静态内部枚举类:
public static enum Mode {
/**
* 从原始数据到部分聚合数据:
* iterate() 和 terminatePartial()将会被调用
*/
PARTIAL1,
/**
* 的map端的Combiner阶段,负责在map端合并map的数据:
* merge() 和 terminatePartial() 将会被调用.
*/
PARTIAL2,
/**
* mapreduce的reduce阶段:从部分数据的聚合到完全聚合:
* merge() 和 terminate() 将会调用.
*/
FINAL,
/**
* 原始数据直接到完全聚合: iterate() 和 terminate() 将会被调用.
*/
COMPLETE
};
hive udaf 的执行过程本质上是MR的过程,Mode枚举类列出了udaf函数执行过程中对应MR的不同阶段:
事实上一个完整的UDAF程序 是需要包含上述三个阶段,但是,在一些简单场景的使用中,并不需要从原始数据到部分数据的聚合,因此,只需要经历第四个mode就足够了 :
在继承 GenericUDAFEvaluator 类时必须复写以下方法:
/**
* init 方法 在类实例化时调用,该方法会实例化一个 UDAF evaluator 类
* 参数 Mode :聚合过程的mode
*参数 ObjectInspector[]: 在PARTIAL1和COMPLETE模式下,参数为原始数据;PARTIAL2和FINAL模式下,参数只是部分聚合(在这种情况下,数组将始终具有单个元素)
*/
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException{...}
/**
*返回一个用于存储中间聚合结果的对象
*/
public AggregationBuffer getNewAggregationBuffer() throws HiveException{...}
/**
* Reset聚合过程. 如果需要重新开始聚合的逻辑可以使用该方法
*/
public void reset(AggregationBuffer agg) throws HiveException {...}
/**
* 迭代源数据,每次迭代会将一组新的数据载入到聚合的buffer中
*/
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {...}
/**
*返回部分聚合的结果,返回的方式是一种可持久化的方式,返回值中只可以使用:
*1.java 基本数据类型和Array ,以及基本封装类型(如Double、Integer等)
*2.Hadoop 中的Writable 类
*3.List和Map类
*不能使用用户自定义的类(即使实现了java.io.Seralizable)
*/
public Object terminatePartial(AggregationBuffer agg) throws HiveException {...}
/**
* 将terminatePartial 返回的中间聚合结果合并到当前聚合中
* 如果没有输入数据,可能会传递null
*/
public void merge(AggregationBuffer agg, Object partial) throws HiveException {...}
/**
* 返回最终的聚合结果
*/
public Object terminate(AggregationBuffer agg) throws HiveException {...}
2.UDAF实现
这里实现一个 jsonUdaf函数,函数接收两个string 参数分别作为key和value,结果返回一个json字符串:
package org.hive_test.udaf;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class AggJson extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
ObjectInspector[] parameterObjectInspectors = info.getParameterObjectInspectors();
//检查参数个数
if (parameterObjectInspectors.length != 2) {
throw new UDFArgumentLengthException("Exactly two argument is expected.");
}
//检查参数类型是否为String
if (parameterObjectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE &&
parameterObjectInspectors[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("传入的参数类型必须是基本数据类型!");
}
if( (((PrimitiveObjectInspector) parameterObjectInspectors[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) &&
(((PrimitiveObjectInspector) parameterObjectInspectors[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING)){
throw new UDFArgumentException("传入的参数类型必须是String类型!");
}
return new Aggregator();
}
public static class JsonMapBuffer extends AbstractAggregationBuffer {
Map<String, Object> mapJson=new HashMap<String, Object>();
public void merge(JSONObject other) {
if (other != null && other.size() > 0) {
Set<String> keySet = other.keySet();
for(String key:keySet) {
this.mapJson.put(key,other.getString(key));
}
}
}
}
public static class Aggregator extends GenericUDAFEvaluator {
private JavaStringObjectInspector outputIo;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
assert (parameters.length == 2);
super.init(m, parameters);
outputIo=PrimitiveObjectInspectorFactory.javaStringObjectInspector;
return outputIo;
}
@Override
public AbstractAggregationBuffer getNewAggregationBuffer() throws HiveException {
JsonMapBuffer jsonMapBuffer = new JsonMapBuffer();
reset(jsonMapBuffer);
return jsonMapBuffer;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
((JsonMapBuffer)agg).mapJson=new HashMap<String, Object>(16);
}
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
JsonMapBuffer buf = (JsonMapBuffer)agg;
if (parameters[0] == NullWritable.get() || parameters[1] == NullWritable.get() || parameters[0] == null || parameters[1] == null){
return;
}
String key = outputIo.getPrimitiveJavaObject(parameters[0]);
Object value = outputIo.getPrimitiveJavaObject(parameters[1]);
buf.mapJson.put(key, value);
}
@Override
public String terminatePartial(AggregationBuffer agg) throws HiveException {
JsonMapBuffer buf = (JsonMapBuffer)agg;
try {
JSONObject jsonObj = new JSONObject(buf.mapJson);
return outputIo.getPrimitiveJavaObject(jsonObj.toJSONString());
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
JsonMapBuffer buf = (JsonMapBuffer)agg;
JSONObject jsonObject = JSONObject.parseObject(((Text) partial).toString());
buf.merge(jsonObject);
}
@Override
public String terminate(AggregationBuffer agg) throws HiveException {
JsonMapBuffer buf = (JsonMapBuffer)agg;
try {
JSONObject jsonObj = new JSONObject(buf.mapJson);
return outputIo.getPrimitiveJavaObject(jsonObj.toJSONString());
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
}
打包上传 hdfs 创建函数
hive> create function json_udaf as "org.hive_test.udaf.AggJson" using jar "hdfs:///user/hive/warehouse/JsonUDAFTest-4.0-SNAPSHOT.jar";
Added [/tmp/6c7ab97b-170d-4670-a34d-8a9c59fccb1a_resources/JsonUDAFTest-4.0-SNAPSHOT.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/JsonUDAFTest-4.0-SNAPSHOT.jar]
OK
Time taken: 0.8 seconds
运行函数得到结果:
hive> select dws_360100.json_udaf(tfcdline_id,cast(avg_eff_index as string)) from udftest;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = root_20211115101110_f2c06175-2a48-4727-994c-2fc23c85d798
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1618373084154_384206, Tracking URL = http://emr-header-1.cluster-12904:20888/proxy/application_1618373084154_384206/
Kill Command = /usr/lib/hadoop-current/bin/hadoop job -kill job_1618373084154_384206
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-11-15 10:11:20,472 Stage-1 map = 0%, reduce = 0%
2021-11-15 10:11:26,741 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.07 sec
2021-11-15 10:11:38,153 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 3.37 sec
MapReduce Total cumulative CPU time: 3 seconds 370 msec
Ended Job = job_1618373084154_384206
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 3.37 sec HDFS Read: 10430 HDFS Write: 239 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 370 msec
OK
{"0035fcc9411277ad3da3f85cc91461db":"68.5452","045719e30c5db330ddd8599723ac5dbd":"59.39376","000b133f9c75cbe2c9c4ab4bd4e754a9":"68.23347"}
Time taken: 28.611 seconds, Fetched: 1 row(s)
UDTF
User-Defined Table-Generating Functions 用户自定义表生成函数 一行输入多行输出,这里的一行输入,并非单指数据表中的行,或者可以称之为某个输入,例如hive 中内置的explode方法,该方法接收一个数组,并将数组中的每个元素作为一行输出;
1.UDTF执行逻辑
看官方文档描述:
可以通过继承GenericUDTF 抽象类来创建一个 Udtf,并且需要实现initialize、process 方法, 可能还需要实现close方法。
udtf 实例会首先调用initialize 方法,来指定UDTF函数接受的参数类型和个数,这个方法需要返回一个 与UDTF 生成的行对象相对应的对象检查器。
initialize 之后 process 方法会被调用,用来接收输入的行,在process 内部,通过调用forward 方法生成结果行,并将行 转发给其他运算符,最后当所有行都执行完成 ,调用 close方法。
2.UDTF实现
这里我们实现一个StrNumSeq 函数,它的作用如下,函数接收两个参数,第一个参数值作为value,第二个参数作为 当前value出现的次数(从0 开始计算):
即输入:
value,n
返回 为 :
value,0
value,1
value,2
…
value,n-1
=========实现如下:
package org.hive_test.udtf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
public class StrNumSeq extends GenericUDTF {
@Override
public void process(Object[] args) throws HiveException {
String flag = args[0].toString();
long value = Long.parseLong(args[1].toString());
for (long i = 0L; i < value; i++) {
try {
String[] resRow = {flag, String.valueOf(i)};
forward(resRow);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public void close() throws HiveException {
}
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
List<? extends StructField> allStructFieldRefs = argOIs.getAllStructFieldRefs();
int size = allStructFieldRefs.size();
if (size != 2) {
throw new UDFArgumentLengthException("Exactly two argument is expected.");
}
ArrayList<String> fieldNames = new ArrayList<String>(size);
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(size);
fieldNames.add("value");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("index");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
}
表数据准备:
hive> load data local inpath "/home/admin/data/udtftest.txt" into table udtftest;
Loading data to table dws_360100.udtftest
OK
Time taken: 1.224 seconds
hive> select * from udtftest;
OK
lisi 4
wangwu 6
menfang 8
shijian 9
Time taken: 1.656 seconds, Fetched: 4 row(s)
hive>
打包和注册函数的过程与上面相同的这里省略,执行结果如下: