| 序号 | 作者 | 版本 | 时间 | 备注 |
| 1 | HamaWhite | 1.0.0 | 2022-12-05 | 新增Flink UDF |
| 2 | HamaWhite | 1.0.1 | 2022-12-06 | 新增Hive UDF |
一、基础信息
1.1 组件版本
- JDK: 1.8
- Flink: 1.16.0
- Hive: 3.1.2
- Hadoop: 3.2.2
二、准备工作
2.1 新建Scalar Function
package com.hw.flink.udf;
import org.apache.flink.table.functions.ScalarFunction;
public class MySuffixFunction extends ScalarFunction {
public String eval(String input) {
return input.concat("-HamaWhite");
}
}
Maven依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.16.0</version>
<scope>provided</scope>
</dependency>
</dependencies>2.2 构建Jar包
把上述代码打包为 flink1.16-udf-demo.jar。
2.3 上传HDFS
$ hadoop fs -put flink1.16-udf-demo.jar .
$ hadoop fs -ls /user/deploy
-rw-r--r-- 1 deploy supergroup 1228 2022-12-05 20:03 /user/deploy/flink1.16-udf-demo.jar三、验证函数
参考《flink-docs-release-1.16#create-function》
3.1 新建及测试函数
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// use the hive catalog
String catalogName = "hive";
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, "flink_demo", "conf/hive");
tableEnv.registerCatalog(catalogName, hiveCatalog);
tableEnv.useCatalog(catalogName);
tableEnv.executeSql("DROP FUNCTION IF EXISTS my_suffix_udf_jar");
// create function my_suffix_udf_jar
tableEnv.executeSql("CREATE FUNCTION IF NOT EXISTS my_suffix_udf_jar " +
"AS 'com.hw.flink.udf.MySuffixFunction' " +
"USING JAR 'hdfs://xxx.xxx.xxx.xxx:9000/user/deploy/flink1.16-udf-demo.jar'"
);
// use function my_suffix_udf_jar
tableEnv.executeSql("select my_suffix_udf_jar('hw')").print();最后一行的运行结果是:
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | hw-HamaWhite |
+----+--------------------------------+
1 row in set3.2 查看Hive Meta数据库
上文用的是Hive Catalog,Flink创建函数的时候会把函数信息注册到Hive Meta数据库中,存储在FUNCS和FUNC_RU表中。查看数据如下:
mysql> select * from FUNCS where FUNC_NAME like '%jar%' \G;
*************************** 1. row ***************************
FUNC_ID: 764
CLASS_NAME: com.hw.flink.udf.MySuffixFunction
CREATE_TIME: 1670242907
DB_ID: 11
FUNC_NAME: my_suffix_udf_jar
FUNC_TYPE: 1
OWNER_NAME: NULL
OWNER_TYPE: GROUP
1 row in set (0.00 sec)mysql> select * from FUNC_RU \G;
*************************** 1. row ***************************
FUNC_ID: 764
RESOURCE_TYPE: 1
RESOURCE_URI: hdfs://xxx.xxx.xxx.xxx:9000/user/deploy/flink1.16-udf-demo.jar
INTEGER_IDX: 0四、Flink中使用Hive UDF
参考《flink-docs-release-1.16#hive_functions》
4.1 自定义Hive UDF
package com.hw.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public class MyPrefixFunction extends UDF {
public String evaluate(String input) {
return "HamaWhite-".concat(input);
}
}Maven依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<scope>provided</scope>
</dependency>
</dependencies>4.2 构建Jar包
把上述代码打包为 hive3-udf-demo.jar。
4.3 上传HDFS
$ hadoop fs -put hive3-udf-demo.jar .
4.4 在Hive SQL 中创建UDF
use flink_demo;
CREATE FUNCTION my_prefix_udf_jar AS 'com.hw.hive.udf.MyPrefixFunction'
USING JAR 'hdfs://xxx.xxx.xxx.xxx:9000/user/deploy/hive3-udf-demo.jar';4.5 Flink中测试Hive UDF
在3.1的测试代码中增加测试代码:
注: 经验证,官网中提到的 tableEnv.loadModule("hive", new HiveModule("3.1.2")) 不用写,Flink也能正常访问到Hive UDF。
# use hive udf
tableEnv.executeSql("select my_prefix_udf_jar('hw')").print();最后一行的运行结果是:
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | HamaWhite-hw |
+----+--------------------------------+
1 row in set经测试,在Flink中是可以同时用Hive和Flink的UDF的。例如:
# use flink udf
tableEnv.executeSql("select my_suffix_udf_jar('hw')").print();
# use hive udf
tableEnv.executeSql("select my_prefix_udf_jar('hw')").print();
# mixed use flink and hive udf
tableEnv.executeSql("select my_suffix_udf_jar(my_prefix_udf_jar('hw'))").print();五、参考文献
版权声明:本文为xin_jmail原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。