集群环境如何读取JSON文件

1 需求

在TDH集群环境获取jar包目录下dict.json文件,调用udf函数getDictName( ), 传入两个入参,分别是一级编码和二级编码(srcCode及srcSbuCode),最终返回值为"字典名称"(dictName)。
比如函数入参传入‘1001’和‘0’,返回值为“董事长”,入参传入‘1001’和‘3’,返回值为“财务负责人”。

2 思路分析

2.1 现有方案

对于解析字典的需求,现有做法是在大数据集群hdfs指定路径上传xml文件,通过调用udf函数,在udf函数逻辑中解析xml文件,获取字典名称。

该做法可以正常实现要求,但是也存在两个劣势:1) 需要在hdfs指定目录上传xml文件,否则无法读取到文件;2) 函数入参中除传入两个入参外,同时要求传入xml文件路径。如果路径错误或者没有传入,就存在执行失败或者无法解析出正确字典名称的问题。

2.2 优化方案

针对现有的解决方案,如何避免以上两个问题?是否存在优化方案,可以做到不上传xml文件到hdfs目录,同时函数只传入两个入参(srcCode及srcSbuCode),在满足以上前提下,可以正常获取返回值dictName。

需求已经明确了,接下来就是考虑如何实现需求。通过分析发现,可以通过以下步骤实现:
1)编写json文件,文件名称为“dict.json”(该文件需要满足json文件格式),上传至java工程项目中resources目录下;
2)在udf函数getDictName( )代码逻辑中编写解析json文件逻辑;
3)项目打包后上传jar包至集群指定的自定义函数目录(非hdfs路径);
4)在spark on yarn 模式下,通过sql调用udf函数getDictName( srcCode,srcSbuCode),传入两个入参后保证在集群环境下可以获取返回值dictName。

3 实现方案

3.1 编写Json文件

附:json文件格式

{
"1001":{
      "0":"董事长",
      "1":"控制人",
      "2":"主要负责人",
      "3":"财务负责人",
      "4":"监事长"
    },
"1002":{
      "0":"证件地址",
      "1":"联系地址",
      "2":"家庭地址",
      "3":"单位地址",
      "4":"户籍地址",
      "5":"出生地址"
    },
    ......
}

3.2 编写udf函数

3.2.1 编写函数工具类

package com.ccsg.udf.utils;

/**
 * @author allan
 * @version 1.0
 * @date 2022/3/5 21:43
 * @desc 读取文件工具类
 */

import java.io.*;

public class FileUtil {

    /**
     * 读取文件文本内容
     *
     * @param inputStream
     * @return 返回字符串
     */
    public static String readFile(InputStream inputStream) throws IOException {
        Reader reader = new InputStreamReader(inputStream, "utf-8");
        int ch = 0;
        StringBuffer sb = new StringBuffer();
        while ((ch = reader.read()) != -1) {
            sb.append((char) ch);
        }
        reader.close();
        return sb.toString();

    }

}

3.2.2 编写函数核心逻辑

package com.ccsg.udf.function.udf;

import com.alibaba.fastjson.JSON;
import com.ccsg.udf.utils.FileUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.sql.api.java.UDF2;

import java.io.InputStream;
import java.util.Map;

/**
 * @author allan
 * @version 1.0
 * @date 2022/3/5 21:43
 * @desc 字典转换
 * @parm srcCode 及 srcSubCode
 * @return dictName
 */
public class GetDictName implements UDF2<String, String, String> {
    private static final String DEFAULT_DICT_NAME = " ";

    public String call(String srcCode, String srcSubCode) throws Exception {
        // 获取文件所在包路径,调用工具类FileUtil中方法readFile()读取json文件
        InputStream inputStream = UDFGetDictName.class.getClassLoader().getResourceAsStream("dict.json");
        String jsonStr = FileUtil.readFile(inputStream);

        // 定义返回结果
        String dictName = null;

        // 数据结构为嵌套Map结构: Map<srcCode,Map<srcSubCode,dictName>>
        if (StringUtils.isNotBlank(jsonStr) && StringUtils.isNotBlank(srcCode) && StringUtils.isNotBlank(srcSubCode)) {
            // 解析json字符串,获取最终返回结果dictName
            Map<String, Object> jsonMap = (Map) JSON.parse(jsonStr);
            Map<String, String> destMap = (Map<String, String>) jsonMap.get(StringUtils.trim(srcCode));

            if (null != jsonMap && null != destMap
                    && jsonMap.containsKey(StringUtils.trim(srcCode)) 
                    && destMap.containsKey(StringUtils.trim(srcSubCode))) {
                dictName = destMap.get(StringUtils.trim(srcSubCode));
                return dictName;
            } else {
                return DEFAULT_DICT_NAME;
            }
        }

        return DEFAULT_DICT_NAME;
    }


}

说明:
1)建议类中获取文件所在包路径方式:类名.class.getClassLoader().getResourceAsStream(“dict.json”);该方式支持本地和集群环境下正常找到json文件所在目录;
2)调用Json转map方式进行逻辑处理: Map<String, Object> jsonMap = (Map) JSON.parse(jsonStr)
3)已验证以下两种方式支持在本地环境找到文件所在目录,不支持集群环境,所以不作为优先考虑方式
方式1:String fileRelativePath = Thread.currentThread().getContextClassLoader().getResource(“dict.json”).getFile();
方式2:UDFGetDictName.class.getResource("/") + “dict.json”;
4)充分考虑数据场景,考虑代码健壮性,对可能传入错误参数 或者传入空值 进行优化处理,避免运行时报空指针异常;
5)以上udf函数编写使用spark中自定义函数框架,通过实现UFD2接口,重写call()方式处理,两个入参及返回值都为String类型。

3.2.3 本地环境main()方法自测

    public static void main(String[] args) throws Exception {
        GetDictName gdn = new GetDictName();
        // 1 正确场景:" 1001", "0"
        String result1 = gdn.call("1001", "0");
        
        // 2 入参为空场景
        String result2 = gdn.call(" ", " ");
        String result3 = gdn.call("", " ");
        String result4 = gdn.call(null, " ");
        String result5 = gdn.call("1001", " ");
        
        // 3 错误数据 "1100", "18"  / "1100", "1" / "1001", "7"
        String result6 = gdn.call("1100", "18");
        String result7 = gdn.call("1100", "1");
        String result8 = gdn.call("1001", "7");

        System.out.println(result1);
        System.out.println(result2);
        System.out.println(result3);
        System.out.println(result4);
        System.out.println(result5);
        System.out.println(result6);
        System.out.println(result7);
        System.out.println(result8);

    }

返回结果:

只有result1存在返回值“董事长”,其他都返回默认值(空格)

3.3 打包注意事项

3.3.1 目录结构图

在这里插入图片描述
说明:在resources目录下上传指定Json文件,如idea中该目录不存在则新建即可。

3.3.2 pom.xml新增内容

            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.xml</include>
                    <include>**/*.properties</include>
                    <include>**/*.json</include>
                </includes>
                <filtering>true</filtering>
            </resource>

3.3.3 执行maven install效果

在这里插入图片描述
说明:可以看到打包后json文件在根目录下

3.4 TDH集群环境验证

1)上传jar包到集群指定自定义函数目录

2)sql中调用函数

select t.id_kind, GetDictName('1001',t.id_kind) as dict_name from person t where t.id_kind in ('0','1','2');

返回值:

id_kind 	dict_name 
0       	董事长
1			控制人
2			主要负责人	

说明:项目中使用类名作为函数名称

4 回顾总结

1)充分考虑需求,结合Json文件格式和业务逻辑,合理设计实优化方案;
2)考虑使用udf函数,保证集群环境下找到Json文件目录并读取内容,重点验证获取路径方式,即优先考虑方式:类名.class.getClassLoader().getResourceAsStream(“dict.json”);
3)实现业务逻辑时充分考虑各种数据场景,增强代码健壮性;
4)本需求涉及IO流、反射、udf函数专业知识,需要对java及大数据基础有深入了解。


版权声明:本文为weixin_44868723原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。