Hive自定义函数

写在前面的话

Hive 内置了许多的函数可供使用,这些内置函数可以满足大部分的场景需求,但对于特殊场景的处理,需要通过自定义函数去实现;本文将讨论hive自定义函数相关的内容,如有错误欢迎指正;一般来讲,我们会将hive的自定义函数分为三类:UDF\UDAF\UDTF,下面我们分别讨论这三种函数的 逻辑和实现

UDF

User-Defined Functions 用户定义函数
一进一出,只对单行数据产生作用,UDF函数是三种自定义函数中最简单的一种,也是使用最广泛的一种。

1.UDF 函数的实现

首先需要添加依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>1.2.1</version>
</dependency>
1.1代码实现
1.1.1函数类继承UDF类
package org.hive_test.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyUDF extends UDF {

    //evaluate 方法可以重载
    public String evaluate(String str){
        return str.toLowerCase();//将大写字母转换成小写
    }
    public String evaluate(String a,String b) {
        return a.toLowerCase().concat(b.toLowerCase());
        //两个字符串转化成小写并进行拼接
    }

}
1.1.2 继承GenericUDF类

hive提供的GenericUDF是一个更复杂的抽象类,可以支持标准的UDF无法实现的一些操作,对应于hive内置函数中一个例子:CASE...WHEN语句,函数会根据传入参数的不同而触发不同的操作。当然这种操作使用UDF也可以实现,但是当传入的参数类型众多时需要使用很多的evluate方法重载,示例方法实现了返回两个参数中第一个非null值

`

package org.hive_test.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class MyGenericUDF extends GenericUDF {
    private ObjectInspector[] argumentIOs;
    //ReturnObjectInspectorResolver 类对象通过获取非null的值变量的类型来确定返回值的类型
    private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;

    /**
     * initialize 方法会被输入的每个参数调用,并最终传入ObjectInspector对象中,
     * 每次调用会生成一个GenericUDF实例,在该实例中initialize 方法将调用一次且仅调用一次,
     *
     * @param arguments
     * @return
     * @throws UDFArgumentException
     */
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentIOs = arguments;
        if (argumentIOs.length != 2) {
            throw new UDFArgumentLengthException("The operator 'MyGenericUDF' accepts 2  arguments!");
        }
        returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
        if (!(returnOIResolver.update(arguments[0])&&returnOIResolver.update(arguments[1]))){
            throw  new UDFArgumentTypeException(2,"The 1st and 2nd args of function should have the same type," +
                    "but they are different:\""+arguments[0].getTypeName()+"\" and \" "+
                    arguments[1].getTypeName()+"\"");
        }
        return returnOIResolver.get();
    }
    /**
     * evaluate方法接收一个 DeferredObject 对象数组,
     * 而initialize方法中创建的returnOIResolver 对象就用于从DeferredObject对象中获取值
     */
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object retVal=returnOIResolver.convertIfNecessary(arguments[0].get(),argumentIOs[0]);
        if (retVal==null){
            retVal=returnOIResolver.convertIfNecessary(arguments[1].get(),argumentIOs[1]);
        }
        return retVal;
    }

    /**
     * 用于Hadoop task内部,在使用到该函数来展示调试信息
     * @param children
     * @return
     */
    public String getDisplayString(String[] children) {
        StringBuilder sb=new StringBuilder();
        sb.append("if ");
        sb.append(children[0]);
        sb.append(" is null ");
        sb.append("return");
        sb.append( children[1]);
        return sb.toString();
    }
}


1.2.打jar包,上传
  • 上传到 $HIVE_HOME/lib/目录以下
  • 若没有上传到上述目录以下,则需要使用命令 :
hive> add jar +jar包所在的目录/jar包名字
1.3.创建函数
  • 临时函数(只对当前窗口有效)
--注:以下执行的操作,为hive sql 执行
CREATE TEMPORARY FUNCTION function_name AS class_name;   
--function_name:函数名;
--class_name:UDF类的全类名

--例:我们对上面UDF的实现进行创建函数
add jar /..本地路径/HadoopTest-1.0-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION str_tool AS package org.hive_test.udf.MyUDF; 
--这样我们就注册创建了一个临时函数
  • 永久函数(可以将jar包上传到hdfs中)
CREATE  FUNCTION function_name AS class_name USING JAR path;
--function_name:函数名;
--class_name:函数类的全类名
--path:jar包所在的路径
#例:还是对上面UDF的实现进行创建函数
#上传hdfs
hdfs dfs -mkdir /hive_resource
hdfs dfs -put /..本地路径/HadoopTest-1.0-SNAPSHOT.jar /hive_resource
#hive sql 执行
CREATE FUNCTION str_tool AS "org.hive_test.udf.MyUDF" USING JAR "hdfs://ip地址:9000/hive_resource/HadoopTest-1.0-SNAPSHOT.jar" ; 
#这样我们就注册创建了一个可以永久使用的函数

2.UDF 函数的使用

udf 函数的使用与hive一般的内置函数类似;

示例:有这样一张student表

idnameage
1熊大20
2熊二19
3张三20
4李四18
select str_tool(name,"Class01") as name,age  from student where id=4;
--结果如下:
----------------------
--|name			|age|
--|李四class01	|18	|
----------------------

UDAF

User-Defined-Aggregation Function 用户自定义聚合函数
可对多行数据产生作用,产出单一结果,即多进一出。例如内置函数中的sum(),avg(),count()等,都属于多进一出聚合函数。
UDAF的实现是比较复杂的,聚合函数会分为多个阶段进行处理,聚合过程是在mapTask或者reduceTask中执行的,是一个有内存限制的Java进程,所以在聚合操作过程中,对于存储较大的结构化数据可能产生内存溢出异常,在编写UDAF时,一定要注意内存占用问题。
hive 官网提供了一个自定义聚合函数的例子,GenericUDAFHistogramNumeric 类,全类名:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric.java

1.UDAF执行逻辑

UDAF 主实现类 需要继承 AbstractGenericUDAFResolver 抽象类,重写getEvaluator 方法,该方法会返回GenericUDAFEvaluator 类的子类对象,GenericUDAFEvaluator 是一个抽象类包含了实现 UDAF函数处理逻辑的方法;
在实现自定义UDAF的过程中必须通过继承GenericUDAFEvaluator 去实现自定义的执行逻辑;

GenericUDAFEvaluator 类包含一个静态内部枚举类:

 public static enum Mode {
    /**
     * 从原始数据到部分聚合数据: 
     * iterate() 和 terminatePartial()将会被调用
     */
    PARTIAL1,
    /**
     * 的map端的Combiner阶段,负责在map端合并map的数据:
     * merge() 和 terminatePartial() 将会被调用.
     */
    PARTIAL2,
    /**
     * mapreduce的reduce阶段:从部分数据的聚合到完全聚合: 
     * merge() 和 terminate() 将会调用.
     */
    FINAL,
    /**
     * 原始数据直接到完全聚合: iterate() 和 terminate() 将会被调用.
     */
    COMPLETE
  };

hive udaf 的执行过程本质上是MR的过程,Mode枚举类列出了udaf函数执行过程中对应MR的不同阶段:

PARTIAL1
mapper
部分聚合, iterate 和 terminatePartial方法调用
PARTIAL2
combine
部分聚合到部分聚合, 调用merge和terminatePartial
FINAL
reducer
部分聚合到完全聚合, 调用merge和terminate

事实上一个完整的UDAF程序 是需要包含上述三个阶段,但是,在一些简单场景的使用中,并不需要从原始数据到部分数据的聚合,因此,只需要经历第四个mode就足够了 :

COMPLETE
mapper
原始数据直接到完全聚合: iterate和 terminate将会被调用

在继承 GenericUDAFEvaluator 类时必须复写以下方法:

/**
* init 方法 在类实例化时调用,该方法会实例化一个 UDAF evaluator 类 
* 参数 Mode :聚合过程的mode
*参数 ObjectInspector[]: 在PARTIAL1和COMPLETE模式下,参数为原始数据;PARTIAL2和FINAL模式下,参数只是部分聚合(在这种情况下,数组将始终具有单个元素)
*/
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException{...}

/**
*返回一个用于存储中间聚合结果的对象
*/
public AggregationBuffer getNewAggregationBuffer() throws HiveException{...}

/**
* Reset聚合过程. 如果需要重新开始聚合的逻辑可以使用该方法
*/
public void reset(AggregationBuffer agg) throws HiveException {...}

/**
* 迭代源数据,每次迭代会将一组新的数据载入到聚合的buffer中
*/
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {...}

/**
*返回部分聚合的结果,返回的方式是一种可持久化的方式,返回值中只可以使用:
*1.java 基本数据类型和Array ,以及基本封装类型(如Double、Integer等)
*2.Hadoop 中的Writable 类 
*3.List和Map类
*不能使用用户自定义的类(即使实现了java.io.Seralizable)
*/
public Object terminatePartial(AggregationBuffer agg) throws HiveException {...}

/**
* 将terminatePartial 返回的中间聚合结果合并到当前聚合中
* 如果没有输入数据,可能会传递null
*/
public void merge(AggregationBuffer agg, Object partial) throws HiveException {...}

/**
* 返回最终的聚合结果
*/
public Object terminate(AggregationBuffer agg) throws HiveException {...}

2.UDAF实现

这里实现一个 jsonUdaf函数,函数接收两个string 参数分别作为key和value,结果返回一个json字符串:

package org.hive_test.udaf;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AggJson extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        ObjectInspector[] parameterObjectInspectors = info.getParameterObjectInspectors();
        //检查参数个数
        if (parameterObjectInspectors.length != 2) {
            throw new UDFArgumentLengthException("Exactly two argument is expected.");
        }
        //检查参数类型是否为String
        if (parameterObjectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE &&
                parameterObjectInspectors[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("传入的参数类型必须是基本数据类型!");
        }
        if( (((PrimitiveObjectInspector) parameterObjectInspectors[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) &&
                (((PrimitiveObjectInspector) parameterObjectInspectors[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING)){
            throw new UDFArgumentException("传入的参数类型必须是String类型!");
        }
        return new Aggregator();
    }

      public  static class JsonMapBuffer extends AbstractAggregationBuffer {
        Map<String, Object> mapJson=new HashMap<String, Object>();
        public void merge(JSONObject other) {
            if (other != null && other.size() > 0) {
                Set<String> keySet = other.keySet();
                for(String key:keySet) {
                    this.mapJson.put(key,other.getString(key));
                }
            }
        }
    }

   public static class Aggregator extends GenericUDAFEvaluator {

       private JavaStringObjectInspector outputIo;

       @Override
       public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
           assert (parameters.length == 2);
            super.init(m, parameters);
           outputIo=PrimitiveObjectInspectorFactory.javaStringObjectInspector;
            return outputIo;
         }

        @Override
        public AbstractAggregationBuffer getNewAggregationBuffer() throws HiveException {
            JsonMapBuffer jsonMapBuffer = new JsonMapBuffer();
            reset(jsonMapBuffer);
            return jsonMapBuffer;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((JsonMapBuffer)agg).mapJson=new HashMap<String, Object>(16);
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            JsonMapBuffer buf = (JsonMapBuffer)agg;
            if (parameters[0] == NullWritable.get() || parameters[1] == NullWritable.get() || parameters[0] == null || parameters[1] == null){
                return;
            }
            String key = outputIo.getPrimitiveJavaObject(parameters[0]);
            Object value = outputIo.getPrimitiveJavaObject(parameters[1]);
            buf.mapJson.put(key, value);
        }

        @Override
        public String terminatePartial(AggregationBuffer agg) throws HiveException {
            JsonMapBuffer buf = (JsonMapBuffer)agg;
            try {
                JSONObject jsonObj = new JSONObject(buf.mapJson);
                return outputIo.getPrimitiveJavaObject(jsonObj.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            JsonMapBuffer buf = (JsonMapBuffer)agg;
            JSONObject jsonObject = JSONObject.parseObject(((Text) partial).toString());
            buf.merge(jsonObject);
        }

        @Override
        public String terminate(AggregationBuffer agg) throws HiveException {
            JsonMapBuffer buf = (JsonMapBuffer)agg;
            try {
                JSONObject jsonObj = new JSONObject(buf.mapJson);
                return outputIo.getPrimitiveJavaObject(jsonObj.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    }
}

打包上传 hdfs 创建函数

hive> create function json_udaf as "org.hive_test.udaf.AggJson" using jar "hdfs:///user/hive/warehouse/JsonUDAFTest-4.0-SNAPSHOT.jar";
Added [/tmp/6c7ab97b-170d-4670-a34d-8a9c59fccb1a_resources/JsonUDAFTest-4.0-SNAPSHOT.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/JsonUDAFTest-4.0-SNAPSHOT.jar]
OK
Time taken: 0.8 seconds

运行函数得到结果:

hive> select dws_360100.json_udaf(tfcdline_id,cast(avg_eff_index as string)) from udftest;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = root_20211115101110_f2c06175-2a48-4727-994c-2fc23c85d798
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1618373084154_384206, Tracking URL = http://emr-header-1.cluster-12904:20888/proxy/application_1618373084154_384206/
Kill Command = /usr/lib/hadoop-current/bin/hadoop job  -kill job_1618373084154_384206
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2021-11-15 10:11:20,472 Stage-1 map = 0%,  reduce = 0%
2021-11-15 10:11:26,741 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.07 sec
2021-11-15 10:11:38,153 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.37 sec
MapReduce Total cumulative CPU time: 3 seconds 370 msec
Ended Job = job_1618373084154_384206
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 3.37 sec   HDFS Read: 10430 HDFS Write: 239 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 370 msec
OK
{"0035fcc9411277ad3da3f85cc91461db":"68.5452","045719e30c5db330ddd8599723ac5dbd":"59.39376","000b133f9c75cbe2c9c4ab4bd4e754a9":"68.23347"}
Time taken: 28.611 seconds, Fetched: 1 row(s)

UDTF

User-Defined Table-Generating Functions 用户自定义表生成函数 一行输入多行输出,这里的一行输入,并非单指数据表中的行,或者可以称之为某个输入,例如hive 中内置的explode方法,该方法接收一个数组,并将数组中的每个元素作为一行输出;

1.UDTF执行逻辑

看官方文档描述:
在这里插入图片描述
可以通过继承GenericUDTF 抽象类来创建一个 Udtf,并且需要实现initialize、process 方法, 可能还需要实现close方法。
udtf 实例会首先调用initialize 方法,来指定UDTF函数接受的参数类型和个数,这个方法需要返回一个 与UDTF 生成的行对象相对应的对象检查器。
initialize 之后 process 方法会被调用,用来接收输入的行,在process 内部,通过调用forward 方法生成结果行,并将行 转发给其他运算符,最后当所有行都执行完成 ,调用 close方法。

2.UDTF实现

这里我们实现一个StrNumSeq 函数,它的作用如下,函数接收两个参数,第一个参数值作为value,第二个参数作为 当前value出现的次数(从0 开始计算):
即输入:
value,n

返回 为 :
value,0
value,1
value,2

value,n-1
=========实现如下:

package org.hive_test.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

public class StrNumSeq extends GenericUDTF {
    @Override
    public void process(Object[] args) throws HiveException {

        String flag = args[0].toString();
        long value = Long.parseLong(args[1].toString());
        for (long i = 0L; i < value; i++) {
            try {
                String[] resRow = {flag, String.valueOf(i)};
                forward(resRow);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    @Override
    public void close() throws HiveException {

    }

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        List<? extends StructField> allStructFieldRefs = argOIs.getAllStructFieldRefs();

        int size = allStructFieldRefs.size();
        if (size != 2) {
            throw new UDFArgumentLengthException("Exactly two argument is expected.");
        }
        ArrayList<String> fieldNames = new ArrayList<String>(size);
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(size);
        fieldNames.add("value");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("index");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
}

表数据准备:

hive> load data local inpath "/home/admin/data/udtftest.txt" into table udtftest;
Loading data to table dws_360100.udtftest
OK
Time taken: 1.224 seconds
hive> select * from udtftest;
OK
lisi    4
wangwu  6
menfang 8
shijian 9
Time taken: 1.656 seconds, Fetched: 4 row(s)
hive>

打包和注册函数的过程与上面相同的这里省略,执行结果如下:
在这里插入图片描述


版权声明:本文为qq_41985003原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。