Lateral view in Hive: from usage to reading the source

Anyone doing big-data development regularly uses explode (the "exploding" function) and lateral view.

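  Explode

  1. The argument must be an array or a map (it is usually combined with split);
  2. It emits each element of the array as a separate row (for a map, one row per key/value pair) and can be used on its own.
  3. Note: when used on its own, only the exploded column can be selected; columns of the source table cannot appear in the same select list.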
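
The row-generating behaviour of explode can be sketched in plain Java. This is only an illustration of the semantics: ExplodeSketch and its explode method are hypothetical names, not Hive classes, and the column is assumed to hold strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for explode() on an array column; not Hive code.
class ExplodeSketch {
    // One input array becomes one single-column output row per element.
    // A null array yields no rows at all.
    static List<String> explode(String[] array) {
        List<String> rows = new ArrayList<>();
        if (array == null) {
            return rows;
        }
        rows.addAll(Arrays.asList(array));
        return rows;
    }

    public static void main(String[] args) {
        // Roughly mirrors: select explode(split('a,b,c', ','))
        System.out.println(explode("a,b,c".split(",")));
    }
}
```

The result holds only the generated column, which matches the restriction that explode on its own cannot be selected next to other columns.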

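  Lateral view

  1. Lateral view is normally used together with a UDTF. To turn one row into many, the usual form is lateral view explode(column).
  2. Syntactically it comes after the table in the from clause and before where;
  3. Conceptually, the lateral view (a virtual table) can be thought of as inner joined to the source table (note: this is only a mental model; the underlying implementation is different, as explained below)
  4. With lateral view explode(column) virtual_table as col, the generated column can be selected together with any number of columns from the source table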
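
As a rough picture of point 4, the sketch below pairs every source row with each value generated from it, which is the result a lateral view produces. The table layout (an id column and a comma-separated tags column) and all names here are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of:
//   select id, tag from t lateral view explode(split(tags, ',')) v as tag
class LateralViewSketch {
    // Each input row is {id, tags}; each output row is {id, tag}.
    static List<String[]> lateralViewExplode(List<String[]> table) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : table) {
            String id = row[0];
            String tags = row[1];
            if (tags == null) {
                continue; // nothing to explode, so this row produces no output
            }
            for (String tag : tags.split(",")) {
                out.add(new String[] {id, tag}); // source column + generated column
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> table = Arrays.asList(
            new String[] {"1", "a,b"},
            new String[] {"2", null});
        for (String[] row : lateralViewExplode(table)) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

The row with id 2 disappears from the output, which is the behaviour that makes the inner join analogy tempting.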

Why can lateral view and the source table be thought of as an inner join?

 First, look at a HiveQL query:

select t3.value as categoryId
  from (  select features,split(t2.categoryIdStr,',') as values
            from AA
         lateral view json_tuple(features, 'categoryId') t1 AS categoryIdArray
         lateral view explode(split(regexp_replace(regexp_extract(t1.categoryIdArray,'^\\[(.+)\\]$',1),'\\}\\,\\{', '\\}\\|\\|\\{'),'\\|\\|')) t2 AS categoryIdStr
           where features like '%categoryId%'
       ) tmp
 lateral view explode(values) t3 AS value
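
The nested string functions in the second lateral view are easier to follow when traced step by step. The sketch below replays regexp_extract, regexp_replace and split with java.util.regex on a made-up input; the sample value and the class name are assumptions, and returning an empty list when the brackets do not match is a simplification for this sketch, not a statement about Hive's behaviour.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical trace of the expression passed to explode() in the query above.
class CategoryIdSplitSketch {
    private static final Pattern BRACKETS = Pattern.compile("^\\[(.+)\\]$");

    static List<String> splitObjects(String categoryIdArray) {
        // regexp_extract(..., '^\\[(.+)\\]$', 1): strip the outer [ ]
        Matcher m = BRACKETS.matcher(categoryIdArray);
        if (!m.find()) {
            return Collections.emptyList(); // simplification for this sketch
        }
        String inner = m.group(1);
        // regexp_replace(..., '\\}\\,\\{', '\\}\\|\\|\\{'): mark object boundaries with ||
        String marked = inner.replaceAll("\\},\\{", "}||{");
        // split(..., '\\|\\|'): cut at the markers, one JSON object per element
        return Arrays.asList(marked.split("\\|\\|"));
    }

    public static void main(String[] args) {
        for (String part : splitObjects("[{\"id\":1},{\"id\":2}]")) {
            System.out.println(part);
        }
    }
}
```

Replacing "},{" with "}||{" before splitting keeps the commas inside each object intact, which a plain split on "," would not.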

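When the data in t1 is null, the query returns nothing. It is therefore natural to think that tmp and t3 were inner joined, and that this is why the query ends up empty.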

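In fact, though, this is not inner join logic, for two reasons.

First: the virtual table t1 hangs directly off table AA inside tmp; even though the select list of tmp contains no column of t1, data from t1 can still be reached outside tmp. This shows that AA and t1 are not inner joined.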

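Second: look at the Hive execution plan.

The operator tree below has no reduce side, so no shuffle takes place anywhere in the execution. A common (shuffle) join in Hive is carried out in the reduce phase of a MapReduce job; the exception is a map join, which would show up in the plan as a Map Join Operator. The plan contains only a Map Operator Tree and neither of these, so lateral view does not perform a join. (The Lateral View Join Operator, despite its name, sits inside the Map Operator Tree.)

Hive execution plan: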
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: aa
            Filter Operator
              predicate: (features like '%categoryId%') (type: boolean)
              Lateral View Forward
                Select Operator
                  Lateral View Join Operator
                    Lateral View Forward
                      Select Operator
                        Lateral View Join Operator
                          Select Operator
                            expressions: split(_col5, ',') (type: array<string>)
                            Lateral View Forward
                              Select Operator
                                Lateral View Join Operator
                                  Select Operator
                                    File Output Operator
                              Select Operator
                                expressions: _col1 (type: array<string>)
                                UDTF Operator
                                  function name: explode
                                  Lateral View Join Operator
                                    outputColumnNames: _col2
                                    Select Operator
                                      File Output Operator
                      Select Operator
                        expressions: split(regexp_replace(regexp_extract(_col4, '^\[(.+)\]$', 1), '\}\,\{', '\}\|\|\{'), '\|\|') (type: array<string>)
                        UDTF Operator
                          function name: explode
                          Lateral View Join Operator
                            Select Operator
                              expressions: split(_col5, ',') (type: array<string>)
                              Lateral View Forward
                                Select Operator
                                  Lateral View Join Operator
                                    Select Operator
                                      File Output Operator
                                Select Operator
                                  expressions: _col1 (type: array<string>)
                                  UDTF Operator
                                    function name: explode
                                    Lateral View Join Operator
                                      Select Operator
                                        File Output Operator
                Select Operator
                  expressions: features (type: string), 'categoryId' (type: string), _col1
                  UDTF Operator
                    function name: json_tuple
                    Lateral View Join Operator
                      Lateral View Forward
                        Select Operator
                          Lateral View Join Operator
                            Select Operator
                              expressions: split(_col5, ',') (type: array<string>)
                              Lateral View Forward
                                Select Operator
                                  Lateral View Join Operator
                                    Select Operator
                                      File Output Operator
                                Select Operator
                                  expressions: _col1 (type: array<string>)
                                  UDTF Operator
                                    function name: explode
                                    Lateral View Join Operator
                                      Select Operator
                                        File Output Operator
                        Select Operator
                          expressions: split(regexp_replace(regexp_extract(_col4, '^\[(.+)\]$', 1), '\}\,\{', '\}\|\|\{'), '\|\|') (type: array<string>)
                          UDTF Operator
                            function name: explode
                            Lateral View Join Operator
                              Select Operator
                                expressions: split(_col5, ',') (type: array<string>)
                                Lateral View Forward
                                  Select Operator
                                    Lateral View Join Operator
                                      Select Operator
                                        File Output Operator
                                  Select Operator
                                    expressions: _col1 (type: array<string>)
                                    UDTF Operator
                                      function name: explode
                                      Lateral View Join Operator
                                        Select Operator
                                          File Output Operator

In short, the whole process completes on the map side, with no reduce involved. (A gripe: Hive's operator tree is painful to read.)

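Reading the source:

Once the usage rules are familiar, it is clear that lateral view goes together with a UDTF; in the Hive execution plan this corresponds to the UDTF Operator step.

A custom UDTF normally extends GenericUDTF. In the Hive source read here, six UDTF classes extend GenericUDTF, in a package of eight files (UDTFCollector is the one to note); GenericUDTFExplode is Hive's explode.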

 

The key method in the GenericUDTF source is:
// Hands each output row to the collector
  protected final void forward(Object o) throws HiveException {
    collector.collect(o);
  }
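Following this call leads to the Collector interface; the only class implementing it is UDTFCollector.

The key method in UDTFCollector is:
// Passes the rows collected from GenericUDTF on to UDTFOperator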
  public void collect(Object input) throws HiveException {
    op.forwardUDTFOutput(input);
    counter++;
  }

UDTFOperator has three main methods:
// Initializes the UDTF operator
  protected void initializeOp(Configuration hconf) throws HiveException {
    genericUDTF = conf.getGenericUDTF();
    collector = new UDTFCollector(this);

    genericUDTF.setCollector(collector);

    udtfInputOI = (StructObjectInspector) inputObjInspectors[0];

    objToSendToUDTF = new Object[udtfInputOI.getAllStructFieldRefs().size()];

    MapredContext context = MapredContext.get();
    if (context != null) {
      context.setup(genericUDTF);
    }
    StructObjectInspector udtfOutputOI = genericUDTF.initialize(udtfInputOI);

    if (conf.isOuterLV()) {
      outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
    }

    if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) {
      autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,
          Utilities.getDefaultNotificationInterval(hconf),
          HiveConf.getTimeVar(
              hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
      autoProgressor.go();
    }

    super.initializeOp(hconf);
  }

// Processes one input row through the UDTF
  public void processOp(Object row, int tag) throws HiveException {
    // The UDTF expects arguments in an object[]
    StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
    List<? extends StructField> fields = soi.getAllStructFieldRefs();

    for (int i = 0; i < fields.size(); i++) {
      objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));
    }

    genericUDTF.process(objToSendToUDTF);
    if (conf.isOuterLV() && collector.getCounter() == 0) {
      collector.collect(outerObj);
    }
    collector.reset();
  }

// Forwards the UDTF output downstream
  public void forwardUDTFOutput(Object o) throws HiveException {
    // Since the output of the UDTF is a struct, we can just forward that
    forward(o, outputObjInspector);
  }
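
The call chain across the three classes above (GenericUDTF.forward, then UDTFCollector.collect, then UDTFOperator.forwardUDTFOutput) can be modelled in a few lines. This is a deliberately simplified sketch with hypothetical class names; it merges the collector into the operator and leaves out ObjectInspectors, configuration and everything else the real classes do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the UDTF -> collector -> operator callback chain.
class UdtfChainSketch {
    // Stand-in for the Collector interface.
    interface Collector {
        void collect(Object row);
    }

    // Stand-in for GenericUDTF: subclasses call forward() once per output row.
    abstract static class Udtf {
        private Collector collector;

        void setCollector(Collector collector) {
            this.collector = collector;
        }

        protected final void forward(Object row) {
            collector.collect(row);
        }

        abstract void process(Object[] args);
    }

    // Stand-in for UDTFOperator with its collector folded in.
    static class Operator {
        final List<Object> forwarded = new ArrayList<>();
        int counter = 0;
        private final Udtf udtf;

        Operator(Udtf udtf) {
            this.udtf = udtf;
            // Every row the UDTF forwards lands here and is counted.
            udtf.setCollector(row -> {
                forwarded.add(row);
                counter++;
            });
        }

        void processRow(Object[] args) {
            udtf.process(args);
        }
    }

    // Toy UDTF that emits each element of a String[] argument.
    static class ExplodeLike extends Udtf {
        @Override
        void process(Object[] args) {
            String[] list = (String[]) args[0];
            if (list == null) {
                return; // no forward() call, so no output row
            }
            for (String element : list) {
                forward(element);
            }
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(new ExplodeLike());
        op.processRow(new Object[] {new String[] {"a", "b"}});
        System.out.println(op.counter + " rows: " + op.forwarded);
    }
}
```

The point of the design is that the UDTF never returns rows; it pushes them through a callback, so a single process() call can emit zero, one or many rows.
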
In UDTFOperator, the parts to focus on are these.
In initializeOp:
    if (conf.isOuterLV()) {
      outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
    }
When isOuterLV() is true, this builds outerObj: a list of nulls with one slot per output column of the UDTF.
In processOp:
    genericUDTF.process(objToSendToUDTF);
    if (conf.isOuterLV() && collector.getCounter() == 0) {
      collector.collect(outerObj);
    }
When isOuterLV() is true and the UDTF produced no row for the current input row (the collector's counter is still 0), this hands outerObj, the all-null row, to the collector instead.
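
The effect of the counter check can be seen in a small stand-alone sketch. It is an assumption-laden model, not Hive code: the UDTF is reduced to an explode over String arrays, and a null entry in the result stands in for the all-null outerObj row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the outerLV branch in processOp.
class OuterLateralViewSketch {
    // Returns the generated-side rows for a sequence of input arrays.
    static List<String> run(List<String[]> inputs, boolean outerLV) {
        List<String> out = new ArrayList<>();
        for (String[] array : inputs) {
            int counter = 0; // rows emitted for this input row
            if (array != null) {
                for (String element : array) {
                    out.add(element);
                    counter++;
                }
            }
            if (outerLV && counter == 0) {
                out.add(null); // placeholder row, like collector.collect(outerObj)
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> inputs = Arrays.asList(
            new String[] {"a", "b"}, null, new String[0]);
        System.out.println(run(inputs, false).size()); // plain lateral view
        System.out.println(run(inputs, true).size());  // lateral view outer
    }
}
```

With three input rows of which two generate nothing, the plain variant keeps only the rows coming from the first input, while the outer variant adds one placeholder row for each of the other two.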

Both snippets call isOuterLV(); its source is:

@Explain(displayName = "UDTF Operator")
public class UDTFDesc extends AbstractOperatorDesc {
  private static final long serialVersionUID = 1L;

  private GenericUDTF genericUDTF;
  private boolean outerLV;

  public UDTFDesc() {
  }

  public UDTFDesc(final GenericUDTF genericUDTF, boolean outerLV) {
    this.genericUDTF = genericUDTF;
    this.outerLV = outerLV;
  }

  public GenericUDTF getGenericUDTF() {
    return genericUDTF;
  }

  public void setGenericUDTF(final GenericUDTF genericUDTF) {
    this.genericUDTF = genericUDTF;
  }

  @Explain(displayName = "function name")
  public String getUDTFName() {
    return genericUDTF.toString();
  }

  public boolean isOuterLV() {
    return outerLV;
  }

  public void setOuterLV(boolean outerLV) {
    this.outerLV = outerLV;
  }

  @Explain(displayName = "outer lateral view")
  public String isOuterLateralView() {
    return outerLV ? "true" : null;
  }
}

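The isOuterLateralView() method shows that Hive has a lateral view outer syntax; outerLV is true when the query is written that way.

Tracing back from here: with lateral view outer, every input row yields at least one output row, because an input row for which the UDTF emits nothing is still output once, with nulls in the generated columns. So lateral view outer with a UDTF can be thought of as a left join between the source table and the virtual table. Note: no join is actually performed; this is only a way to reason about the result.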

GenericUDTFExplode has two main methods:
// Initialization
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

    switch (args[0].getCategory()) {
    case LIST:
      inputOI = args[0];
      fieldNames.add("col");
      fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
      break;
    case MAP:
      inputOI = args[0];
      fieldNames.add("key");
      fieldNames.add("value");
      fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
      fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
      break;
    default:
      throw new UDFArgumentException("explode() takes an array or a map as a parameter");
    }

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
  }
// Processing
  public void process(Object[] o) throws HiveException {
    switch (inputOI.getCategory()) {
    case LIST:
      ListObjectInspector listOI = (ListObjectInspector)inputOI;
      List<?> list = listOI.getList(o[0]);
      if (list == null) {
        return;
      }
      for (Object r : list) {
        forwardListObj[0] = r;
        forward(forwardListObj);
      }
      break;
    case MAP:
      MapObjectInspector mapOI = (MapObjectInspector)inputOI;
      Map<?,?> map = mapOI.getMap(o[0]);
      if (map == null) {
        return;
      }
      for (Entry<?,?> r : map.entrySet()) {
        forwardMapObj[0] = r.getKey();
        forwardMapObj[1] = r.getValue();
        forward(forwardMapObj);
      }
      break;
    default:
      throw new TaskExecutionException("explode() can only operate on an array or a map");
    }
  }

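Note the null checks, for example:
      if (map == null) {
        return;
      }
When the input array or map is null, process() returns without calling forward(), so no row is emitted for that input row.

With nothing emitted on the map side, the input row drops out of the result (unless lateral view outer is used); this is why the outcome looks like an inner join between the virtual table and the source table.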


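Copyright notice: this is an original article by morsunlight, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.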