Hive之自定义函数(UDF、UDAF、UDTF)

内容仅截取函数部分,其他Hive学习笔记见个人博客 https://wangbowen.cn/2020/08/27/Hive%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/

十三、函数

13.1 发现和描述函数

-- 可以看到hive当前会话的所有函数名称,包括内置和用户自定义的
show functions;
-- 查看函数的简介(加上extended显示更详细的文档)
describe function [extended] <函数名>;

13.2 调用函数

函数名(参数列表)

13.3 函数的类别

  • UDF:用户自定义函数

  • UDAF:用户自定义聚合函数

  • UDTF:用户自定义表生成函数。接收零个或多个输入,然后长生多列或多汗输出。如 array函数 和 explode 函数。不过,hive只允许表生成函数以特定的方式使用。比如:无法从表中产生其他的列。

    select name, exploed(subordinates) from employees;  -- 这个是错误的!!!!
    

    不过,hive 提供了一个 LATERAL VIEW 功能来实现这种查询:

    select name, sub
    from employees
    LATERAL VIEW exploed(subordinates) subView AS sub;
    

    通过LATERAL VIEW 可以方便地将exploed这个UDTF得到的行转列的结果集合在一起提供服务。使用LATERAL VIEW 需要指定视图别名和生成的新列的别名,本例来说分别是 subView 和 sub。

13.4 用户自定义函数

导入pom依赖

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>

13.4.1 UDF

  1. 创建类,继承UDF,添加注解,实现evaluate函数

    package cn.wangbowen.hive;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDF;
    
    /**
     * UDFDemo class
     * 用户自定义函数
     */
    
    //@Description 这是描述。用于 describe function [extended] <函数名>;
    @Description(name = "myAdd",
                value = "myAdd(int a, int b ==> return a + b)",
                extended = "Example:\n" +
                        " > myAdd(1, 1) ==> 2" +
                        " > myAdd(1, 1, 1) ==> 3")
    // 需要继承 UDF
    public class UDFDemo extends UDF {
    
        // 实现 evaluate()函数
        // 注意,参数和返回值类型只能是hive可以序列化的数据类型。如数值,那么可以用int或Integer
        public int evaluate(int a ,int b) {
            return a + b ;
        }
    
        // 可以重载,调用的时候会自动匹配
        public int evaluate(int a ,int b , int c) {
            return a + b + c;
        }
    }
    
    
  2. 打成jar包,把jar包添加到hive的类路径下重进入hive,

    cp Hive-1.0-SNAPSHOT.jar /home/spark/app/hive-1.1.0-cdh5.15.1/lib/
    

    或者使用命令添加全路径

    ADD JAR /home/spark/app/hive-1.1.0-cdh5.15.1/lib/Hive-1.0-SNAPSHOT.jar;
    
  3. 创建临时函数

    CREATE TEMPORARY FUNCTION myAdd AS 'cn.wangbowen.hive.UDFDemo';
    
  4. 在查询中使用自定义函数

    select myAdd(1,2);
    OK
    _c0
    3
    
  5. 如果用完了,还可以删除掉这个函数

    DROP TEMPORARY FUNCTION IF EXISTS myAdd;
    

13.4.2 GenericUDF

与Hive一起使用的通用用户定义函数(GenericUDF)。新的GenericUDF类需要从该GenericUDF类继承。GenericUDF在以下方面优于普通UDF:

  1. 它可以接受复杂类型的参数,并返回复杂类型。
  2. 它可以接受可变长度的参数。
  3. 它可以接受无限数量的函数签名-例如,编写接受数组的GenericUDF很容易,数组>等(任意级别的嵌套)。
  4. 它可以使用DeferedObject进行短路评估。

在写有关通用的UDF之前最好了解以下ObjectInspector有关的内容,可以参考另一篇文章《Hive之ObjectInspector详解》

代码

  1. 编写JAVA类,然后打包

    package cn.wangbowen.hive;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    
    /**
     * GenericUDFNvl class
     * GenericUDF 是更为复杂的抽象概念,但其支持更好的null值处理,同时可以处理一些标准UDF无法支持的编程操作
     */
    
    // 描述
    @Description(name = "nvl",
            value = "nvl(value, default_value) - 如果value非空返回value,否则返回default_value",
            extended = "Example:\n" +
                    " > SELECT nvl(null, 'bla') FROM src LIMIT 1;\n")
    // 继承 GenericUDF
    public class GenericUDFNvl extends GenericUDF {
        // returnOIResolver 是一个内置的类,其通过获取非null值的变量的类型并使用这个数据类型来确定返回值类型
        private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
    
        private ObjectInspector[] argumentOIs;
    
    
        /**
         * initialize方法会被输入的每个参数调用,并最终传入到一个ObjectInspector对象中。
         * 这个方法的目标是确定函数的返回类型。(重点)
         * 如果传入的方法的类型是不合法的,这时用户同样可以向控制台抛出一个Exception。
         */
        @Override
        public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
            // 将传入的参数复制到私有变量
            argumentOIs = objectInspectors;
            // 检查输入参数合法性:如果参数个数不是2,那么抛出异常
            if (objectInspectors.length != 2) {
                throw new UDFArgumentException("The operator 'NVL' accepts 2 arguments.");
            }
            returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
            if (!(returnOIResolver.update(objectInspectors[0]) && returnOIResolver.update(objectInspectors[1]))) {
                throw new UDFArgumentTypeException(2, "The 1st and 2nd args of function NLV should have the same type," +
                        "but they are different:\"" + objectInspectors[0].getTypeName() + "\" and \"" +
                        objectInspectors[1].getTypeName() + "\"");
            }
            return returnOIResolver.get();
        }
    
        /**
         * evaluate方法的输入是一个DeferredObject对象数组,而initialize方法中创建的returnOIResolver对象就用于从DeferredObject对象中
         * 获取到值。在这种情况下,这个函数将会返回第一个非null的值
         */
        @Override
        public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
            Object retVal = returnOIResolver.convertIfNecessary(deferredObjects[0].get(), argumentOIs[0]);
            // 如果第一个值为空,返回第二个
            if (retVal == null) {
                retVal = returnOIResolver.convertIfNecessary(deferredObjects[1].get(), argumentOIs[1]);
            }
            return retVal;
        }
    
        /**
         * getDisplayString 用于Hadoop task 内部,在使用到这个函数时来展示调式信息
         */
        @Override
        public String getDisplayString(String[] strings) {
            StringBuilder sb = new StringBuilder();
            sb.append("if ");
            sb.append(strings[0]);
            sb.append(" is null ");
            sb.append("returns");
            sb.append(strings[1]);
            return sb.toString();
        }
    }
    
  2. 添加JAR

    hive (default)> ADD JAR /home/spark/tmp/Hive-1.0-SNAPSHOT.jar;
    
  3. 创建函数

    hive (default)> CREATE TEMPORARY FUNCTION nvl AS 'cn.wangbowen.hive.GenericUDFNvl';
    
  4. 调用函数

    hive (default)> SELECT nvl(1, 2) AS COL1,
                  > nvl(NULL, 5) AS COL2,
                  > nvl(NULL, "STUFF") AS COL3;
    OK
    col1	col2	col3
    1		5		STUFF
    

13.4.3 不变函数

目前为止,我们都是将代码达成JAR包,然后用ADD JAR 和 CREATE TEMPORARY FUNCTION 来使用这些函数。

用户同样可以将自己写的函数永久加到hive中,不过需要对hive的Java文件进行简单修改,然后重新编译hive。

对于hive源码,需要对 ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java 这个类进行一行的代码修改,然后按照hive源码分支的编译方式,重新编译hive:

// 添加一行代码(这个类要在类路径下,可以放到hive/lib/下)
system.registerGenericUDF("nvl", GenericUDFNvl.class);

尽管建议是重新部署整个新的编译后的版本,不过实际上只需要替换掉 hive-exec-*.jar这个JAR文件即可,其中 * 是版本号。

但是我发现!永久函数去掉 TEMPORARY 就可以了,不用重新编译

CREATE [TEMPORARY] FUNCTION myAdd AS 'cn.wangbowen.hive.UDFDemo';

13.4.4 UDAF

我觉得这个文章写的挺好的:https://dzone.com/articles/writing-custom-hive-udf-andudaf

UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF。

聚合过程是在map或reduce任务(task)中执行的,其是一个有内存限制的Java进程。在写UDAF的时候,一定要注意内存使用的问题。可以通过配置参数,调整执行过程的内存需求量,但是并非总是有效:

<property>
	<name>mapred.child.java.opts</name>
    <value>-Xmx200m</value>
</property>

13.4.5 GenericUDAF

官方文档:https://cwiki.apache.org/confluence/display/Hive/GenericUDAFCaseStudy

这篇博客也不错:https://blog.csdn.net/kent7306/article/details/50110067

Mode 和 ObjectInspector

在写通用UDAF之前最好知道两个东西:

  • Mode

    首先要清楚一点!hive最终是以MR的方式运行的。所以,有map、combiner、reduce这些过程。下面那些调用的方法后面会提到。

    public static enum Mode {
        /**
         * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
         * 将会调用 init() --> iterate() --> terminatePartial()
         */
        PARTIAL1,
        /**
         * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:从部分数据聚合到部分数据聚合(预聚合)
         * 将会调用 init() --> merge() --> terminatePartial()
         */
        PARTIAL2,
        /**
         * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
         * 将会调用 init() --> merge() --> terminate()
         */
        FINAL,
        /**
         * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
          * 将会调用 init() --> iterate() --> terminate()
         */
        COMPLETE
    };
    
  • ObjectInspector

    这个理解起来比较费劲。我们都知道map-reduce是可能有不同的输入输出类型的,而hql最终会转换成多个MR job,因此这些Job之间数据的输入输出类型就根据 ObjectInspector来控制。

    建议看我的另一篇文章《Hive之ObjectInspector详解》

    博友解释

    作用主要是解耦数据使用与数据格式,使得数据流在输入输出端切换不同的输入输出格式,不同的Operator上使用不同的格式。

    官方解释

    Hive使用ObjectInspector来分析行对象的内部结构以及各个列的结构。ObjectInspector提供了一种统一的方式来访问可以以多种格式存储在内存中的复杂对象,包括:

    • Java类的实例(Thrift或本机Java)
    • 标准Java对象(我们使用java.util.List表示Struct和数组,并使用java.util.Map表示Map)
    • 延迟初始化的对象(例如,存储在单个Java字符串对象中的字符串字段的Struct,每个字段的起始偏移量)一对ObjectInspector和Java Object。

    ObjectInspector不仅告诉我们对象的结构,还提供了访问对象内部字段的方法。

UDAF 框架

总结来说,编写通用UDAF主要流程如下:

  1. 编写解析器类(Resolver)

    // 1.编写解析器类(继承AbstractGenericUDAFResolver)
    public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver {
        // 设置LOG,以便将信息传入hive日志
        static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());
    
        // 2.重写getEvaluator方法
        @Override
        public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
            // Type-checking goes here! 类型检查
    		// 创建评估器类并返回
            return new GenericUDAFHistogramNumericEvaluator();
        }
    
        // 3.编写一个静态内部类,即评估器类(继承GenericUDAFEvaluator)
        public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
            // UDAF logic goes here! UDAF逻辑,细节再下面。
        }
    }
    
  2. 创建评估器类(Evaluator):我们同时来了解一下这些方法

    public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
    
        // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
        private PrimitiveObjectInspector inputOI;
        private PrimitiveObjectInspector nbinsOI;
    
        // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles)
        private StandardListObjectInspector loi;
    
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // return type goes here 确定返回类型
        }
    
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // return value goes here 局部返回值
        }
    
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            // final return value goes here 最终返回值
        }
    
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        }
    
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        }
    
        // Aggregation buffer definition and manipulation methods
        // 聚合缓冲区定义和操作方法,也就是说聚合过程的临时变量以及操作的方法定义的地方
        static class StdAgg implements AggregationBuffer {
        };
    
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        }
    
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
        }   
    }
    

我们还要了解一下 GenericUDAFEvaluator 的方法(按被调用的时间顺序):

方法名描述
inithive会调用此方法来初始化实例一个UDAF evaluator 类。
确定各个阶段输入输出参数的数据格式ObjectInspectors。因为不同的Mode对应不同的阶段,不同的阶段调用不同的方法,但是所有阶段的输入输出类型都在init方法中被确定!
getNewAggregationBuffer返回一个用于存储中间聚合结果的对象
iterator将一行新的数据载入到聚合buffer中
terminatePartial以一种可持久化的方法返回当前聚合的内容(即MR中map阶段结束返回的数据)。这里所说的可持久化是值返回值只可以用Java基本数据类型和array,以及基本封装类型(如,Double),Hadoop中Writable类、list和map类型。不能使用用户自定义类型(即使实现了java.io.Serializable)
merge将terminatePartial返回的中间部分聚合结果合并到当前聚合中
terminate返回最终聚合结果给hive

可以发现,这是不是很像MR的过程?在init方法中,在判断评估器所处的模式之后,可以设置返回结果类型的对象检查器。iterate方法和terminatePartial方法会在map端使用到,而terminate方法和merge方法会在reduce端使用到,用于生成最终结果。在所有情况看下,合并过程都会产生大的列表。

示例
  1. 编码,打成jar包

    package cn.wangbowen.hive;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
        * GenericUDAFCollect class
        * 通用UDAF
        * 需求:模拟MySQL中一个非常有用的函数名为GROUP_CONCAT,其可以将一组中的所有元素按照用户指定的分隔符组装成一个字符串。
        *      比如: hadoop 1
        *            hadoop 2
        *            hive   3
        *            hive   4
        *      mysql > SELECT name, GROUP_CONCAT(number SEPARATOR ',') FROM src GROUP BY name;
        *      最终会输出:
        *            hadoop 1,2
        *            hive   3,4
        *
        * 分析:首先,我们需要一个聚合函数,将所有的输入(即列的所有数据)作为一个列表加入到集合中。我们使用ArrayList实例。
        *      这个聚合过程的结果就是产生一个包含有所有值的数组。【可以参考函数collect_set,这个是排重后的结果】
        *      然后,我们可以用concat_ws函数,以指定分隔符将上面得到的数组,拼接成一个字符串。
        *      最后,输出结果
        *
        * @author BoWenWang
        * @date 2020/8/28 22:14
        */
    // 2.UDAF描述
    @Description(name = "collect", value = "_FUNC_(x) - Returns a list of objects.")
    // 1.创建解析器,继承AbstractGenericUDAFResolver
    public class GenericUDAFCollect extends AbstractGenericUDAFResolver {
        // 3.日志
        static final Log LOG = LogFactory.getLog(GenericUDAFCollect.class);
    
        // 4.重写getEvaluator方法
        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
            // 4.1 输入参数个数检查
            if (parameters.length != 1) {
                throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
            }
            /**
                * 这里需要了解一下一个类:
                * public interface ObjectInspector extends Cloneable {
                *     // 获取类型名字
                *     String getTypeName();
                *     // 获取类型
                *     ObjectInspector.Category getCategory();
                *
                *     public static enum Category {
                *         PRIMITIVE,   // 原始(应该是基本类型)
                *         LIST,
                *         MAP,
                *         STRUCT,
                *         UNION;
                *
                *         private Category() {
                *         }
                *     }
                * }
                */
            // 4.2 输入参数类型检查:如果输入参数不是基本数据类型,抛出异常
            if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " +
                                                   parameters[0].getTypeName() + " was passed as parameter 1.");
            }
    
            // 4.3 创建评估器,返回
            return new GenericUDAFMkListEvaluator();
        }
    
        // 5. 创建评估器,继承GenericUDAFEvaluator,并实现方法
        public static class GenericUDAFMkListEvaluator extends GenericUDAFEvaluator {
    
            // 6. 【根据需要】创建对应的对象检查器
            // 基本数据类型 对象检查器:【各个阶段的输入类型,最终阶段是List中元素的类型】
            private PrimitiveObjectInspector inputOI;
            // 列表类型 对象检查器:【最终阶段输出类型】
            private StandardListObjectInspector loi;
            // 列表类型 对象检查器:【用于最终阶段,输入类型List】
            private StandardListObjectInspector internalMergeOI;
    
            /**
                * init方法中根据不同的mode指定输出数据的格式ObjectInspector
                */
            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
                super.init(m, parameters);
                // 如果是PARTIAL1(map阶段,原始数据到部分聚合数据):String -> List<String>
                if (m == Mode.PARTIAL1) {
                    // 将输入类型赋值给inputOI
                    inputOI = (PrimitiveObjectInspector) parameters[0];
                    // 确定输出类型:通过工厂创建(好像return的时候都是用Factory,其他的时候用Utils)
                    // 一个List对象检查器,List中元素的类型是inputOI类型
                    return ObjectInspectorFactory.getStandardListObjectInspector(
                        // 这个工具函数的作用,应该是获取参数ObjectInspector的相应标准ObjectInspector
                        (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(inputOI)
                    );
                } else {
                    // 如果参数类型不可以转为List(说明是combiner阶段?但是为什么输入类型是基本数据类型?)
                    if (!(parameters[0] instanceof StandardListObjectInspector)) {
                        // 更新输入类型inputOI
                        inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(parameters[0]);
                        // 输出类型是List<Type(inputOI)>
                        return (StandardListObjectInspector) ObjectInspectorFactory.getStandardListObjectInspector(inputOI);
                    } else {
                        // 如果参数类型可以转为List(说明是reduce阶段,部分聚合到完全聚合)更新输入输出类型
                        internalMergeOI = (StandardListObjectInspector) parameters[0];
                        inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
                        loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                        return loi;
                    }
                }
            }
    
            /**
                * 聚合缓冲区定义和操作方法,也就是说聚合过程的临时变量以及操作的方法定义的地方
                */
            static class MkArrayAggregationBuffer implements AggregationBuffer {
                // 临时数据列表(为什么这里要用Object呢?是不是因为使用了ObjectInspector?数据和类型分离,数据都是Object,
                // 需要使用的时候再通过ObjectInspector对应的类型得到具体数据)
                List<Object> container;
            }
    
            /**
                * 添加元素到聚合缓冲区集合中的方法,因为要使用到外部变量this.inputOI,因此没有写到类中
                */
            public void putIntoList(Object p, MkArrayAggregationBuffer myagg) {
                Object pCopy = ObjectInspectorUtils.copyToStandardObject(p, this.inputOI);
                myagg.container.add(pCopy);
            }
    
            /**
                * 初始化聚合缓冲区方法
                */
            @Override
            public void reset(AggregationBuffer agg) throws HiveException {
                ((MkArrayAggregationBuffer) agg).container = new ArrayList<>();
            }
    
            /**
                * 返回一个用于存储中间聚合结果的对象
                */
            @Override
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
                // 初始化
                reset(ret);
                return ret;
            }
    
            /**
                * 迭代器,用于遍历输入的每一行数据,并做对应的处理。这里我们把每一个字符串添加到List集合中
                *
                * @param agg        聚合缓冲区
                * @param parameters 参数列表,即一行数据
                */
            @Override
            public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
                // 这里断言,我们的输入因为只有1个列,所以参数也只有一个
                assert (parameters.length == 1);
                Object p = parameters[0];
    
                // 如果数据不为空,那么就添加到聚合缓冲区的列表中
                if (p != null) {
                    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
                    putIntoList(p, myagg);
                }
            }
    
            /**
                * 部分终止(map端结束)。返回当前聚合的内容(因为MR过程中会有多个map,因此每个map会有部分聚合,有点像combiner的意思)
                *
                * @param agg 聚合缓冲区
                * @return 值返回值只可以用Java基本数据类型和array,以及基本封装类型(如,Double),Hadoop中Writable类、list和map类型。
                * 不能使用用户自定义类型(即使实现了java.io.Serializable)
                */
            @Override
            public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
                // 创建一个ArrayList把聚合缓冲区中的数据拷贝出来(为什么要拷贝出来呢?)
                ArrayList<Object> ret = new ArrayList<>(myagg.container.size());
                ret.addAll(myagg.container);
                return ret;
            }
    
            /**
                * 将terminatePartial返回的中间部分聚合结果合并到当前聚合中
                *
                * @param agg 总的聚合缓冲区
                * @param partial 中间部分的聚合结果
                */
            @Override
            public void merge(AggregationBuffer agg, Object partial) throws HiveException {
                MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
                // 部分结果。因为上面terminatePartial部分返回的是List,且一开始terminatePartial就被定义成最终阶段输入类型
                // 而getList方法主要目的是判断partial对象,如果是List类型直接返回,如果不是那么变成List再返回。
                ArrayList<Object> partialResult = (ArrayList<Object>) internalMergeOI.getList(partial);
                // 遍历中间部分结果,并添加到总聚合缓冲区中
                for (Object o : partialResult) {
                    putIntoList(o, myagg);
                }
            }
    
            /**
                * 返回最终聚合结果
                *
                * @param agg
                * @return
                */
            @Override
            public Object terminate(AggregationBuffer agg) throws HiveException {
                MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
                ArrayList<Object> ret = new ArrayList<>(myagg.container.size());
                // 为什么这里又要再次重新拷贝一份数据呢?直接返回不行吗?
                ret.addAll(myagg.container);
                return ret;
            }
        }
    }
    
  2. 添加Jar包(放到 $HIVE_HOME/lib 下,不然有可能找不到类的错误

    ADD JAR /home/spark/app/hive-1.1.0-cdh5.15.1/lib/Hive-1.0-SNAPSHOT.jar;
    
  3. 创建临时函数

    CREATE TEMPORARY FUNCTION collect AS 'cn.wangbowen.hive.GenericUDAFCollect';
    
  4. 创建表

    hive (default)> CREATE TABLE collecttest(str STRING, countVal INT)
                  > ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
                  > LINES TERMINATED BY '\n';
    
  5. 创建数据

    [spark@spark01 /home/spark/tmp]$cat file.txt 
    hadoop,1
    hadoop,2
    hive,3
    hive,4
    
  6. 加载数据

    hive (default)> LOAD DATA LOCAL INPATH '/home/spark/tmp/file.txt' INTO TABLE collecttest;
    
    hive (default)> select * from collecttest;
    collecttest.str	collecttest.countval
    hadoop	1
    hadoop	2
    hive	3
    hive	4
    
  7. 调用测试

    hive (default)> SELECT collect(str) FROM collecttest;
    ["hadoop","hadoop","hive","hive"]
    
    hive (default)> SELECT str, concat_ws(',', collect(cast(countVal AS STRING))) FROM collecttest GROUP BY str;
    str	_c1
    hadoop	1,2
    hive	3,4
    

13.4.6 UDTF

可以返回多列或多行数据。

可以产生多行数据的 UDTF
  1. 编写代码,打JAR包

    package cn.wangbowen.hive;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
    import org.apache.hadoop.io.IntWritable;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * UDTFDemo1 class
     * 需求:模拟for循环,接收2个整数类型参数,然后返回整数范围之间的N行数据。
     * 为什么选这个例子,是为了说明ObjectInspector的常量类型。
     *
     * @author BoWenWang
     * @date 2020/8/30 20:14
     */
    
    // 1.继承 GenericUDTF
    @Description(name = "forx", value = "_FUNC_(x, y) - returns rows for x to y")
    public class UDTFDemo1 extends GenericUDTF {
    
        // 2.定义需要的临时变量
        IntWritable start;
        IntWritable end;
    
        // 3.定义用于存放要返回的结果行
        Object[] forwardObj = null;
    
        /**
         * 4.初始化方法
         */
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
            List<? extends StructField> fieldRefs = argOIs.getAllStructFieldRefs();
            // 因为本函数的输入参数都是常数,所以在初始化方法种就可以确定各个变量的值了。
            // 要获取值,必须要用 WritableConstantIntObjectInspector
            start = ((WritableConstantIntObjectInspector) fieldRefs.get(0).getFieldObjectInspector()).getWritableConstantValue();
            end = ((WritableConstantIntObjectInspector) fieldRefs.get(1).getFieldObjectInspector()).getWritableConstantValue();
    
            // 因为我们每行数据就一列
            forwardObj = new Object[1];
    
            // 创建列名List
            ArrayList<String> fieldNames = new ArrayList<>();
            // 创建对应列的ObjectInspector List
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
    
            // 添加列信息
            fieldNames.add("col0");
            fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.INT));
    
            // 设置返回类型
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        /**
         * 5.实际进行处理的过程
         */
        @Override
        public void process(Object[] args) throws HiveException {
            for (int i = start.get(); i <= end.get(); i++) {
                // 将每一行的数据放入forwardObj中
                forwardObj[0] = i;
                // 通过forward方法多次调用,每次调用就可以获取一行数据
                forward(forwardObj);
            }
        }
    
        @Override
        public void close() throws HiveException {
    
        }
    }
    
  2. 一样的调用过程

    ADD JAR /home/spark/app/hive-1.1.0-cdh5.15.1/lib/Hive-1.0-SNAPSHOT.jar;
    
    CREATE TEMPORARY FUNCTION forx AS 'cn.wangbowen.hive.UDTFDemo1';
    
    hive (default)> SELECT forx(2, 5);
    OK
    col0
    2
    3
    4
    5
    
可以产生具有多个字段(多列)的 UDTF
  1. 编写代码,打JAR包

    package cn.wangbowen.hive;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    import java.util.ArrayList;
    
    
    /**
     * UDTFDemo2 class
     * 需求:比如我们现在有一个自定义对象Book:
     * public class Book {
     *     public String isbn;
     *     public String title;
     *     public String[] authors;
     *
     *     public void formString(String parts) {
     *         // 反序列化(即,字符串进行解析转为Java对象)
     *     }
     *
     *     public String toString() {
     *         // 序列化(即,将属性以指定分隔符进行拼接成字符串)。如:
     *         return "5555555|Programming Hive|Edward,Dean,Jason";
     *     }
     * }
     *
     * 但是,我们的Hive处理不了这种对象,我们可以先把这个对象通过序列化成字符串,然后导入hive中,就像这样:
     * hive (default)> create table books(book_info STRING);
     * hive (default)> load data local inpath '/home/spark/tmp/books.txt' into table books;
     * hive (default)> select * from books;
     * books.book_info
     * 5555555|Programming Hive|Edward,Dean,Jason
     * 6666666|Programming Hadoop|a1,a2,a3
     *
     * 而我们想要的结果是:三列数据(isbn 整数, title 字符串, authors 字符串数组)。如:
     * isbn	title	authors
     * 5555555	Programming Hive	["Edward","Dean","Jason"]
     * 6666666	Programming Hadoop	["a1","a2","a3"]
     *
     * @author BoWenWang
     * @date 2020/8/30 21:22
     */
    @Description(name = "parse_book")
    public class UDTFDemo2 extends GenericUDTF {
    
        // 输入参数的类型
        private PrimitiveObjectInspector stringOI;
        // 返回一行的数据对象
        Object[] forwardObj = null;
    
        /**
         * 我觉得在初始化方法中,对于输入参数要么就是检查类型,要么就是获取常量值。
         * 还有就是获取输入参数的类型,为了后面处理方法的时候进行转换
         */
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    
            // 赋值输入类型
            stringOI = (PrimitiveObjectInspector) argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector();
    
            // 列名字
            ArrayList<String> fieldNames = new ArrayList<>();
            // 列类型
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
            // 添加列信息
            // 1.ISNB 整数
            fieldNames.add("isbn");
            fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.INT
            ));
            // 2.title 字符串
            fieldNames.add("title");
            fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.STRING
            ));
            // 3.authors 字符串数组
            fieldNames.add("authors");
            fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                            PrimitiveObjectInspector.PrimitiveCategory.STRING
                    )
            ));
    
            // 申请3个返回数据空间
            forwardObj = new Object[3];
            // 设置返回类型
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        @Override
        public void process(Object[] args) throws HiveException {
            // 这里我们只有一个参数,所以是args[0]。根据对象检查器会有一个方法来获取实例。
            // 可以根据PrimitiveObjectInspector,调用获取对象方法,直接获取。
            String str = stringOI.getPrimitiveJavaObject(args[0]).toString();
            // 进行解析
            String[] fields = str.split("\\|");
            forwardObj[0] = Integer.valueOf(fields[0]);
            forwardObj[1] = fields[1];
            forwardObj[2] = fields[2].split(",");
            // 返回该行数据
            forward(forwardObj);
        }
    
        @Override
        public void close() throws HiveException {
    
        }
    }
    
  2. 一样的调用过程

    ADD JAR /home/spark/app/hive-1.1.0-cdh5.15.1/lib/Hive-1.0-SNAPSHOT.jar;
    
    CREATE TEMPORARY FUNCTION book AS 'cn.wangbowen.hive.UDTFDemo2';
    
    hive (default)> select book(book_info) AS (isbn, title, authors) from books;
    OK
    isbn	title	authors
    5555555	Programming Hive	["Edward","Dean","Jason"]
    6666666	Programming Hadoop	["a1","a2","a3"]
    

版权声明:本文为weixin_42167895原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。