1. Software Environment
| System & Software | Version |
|---|---|
| CentOS | 7 |
| CDH | 5.16.2 |
| Flink | 1.13.6 |
| Hive | 1.1.0 |
| Hadoop | 2.6.0 |
If the integration is missing or misconfigured, you may see an error like the following:

```
[bigdata_admin@dn5 flink-1.13.6-standalone]$ bin/sql-client.sh -i conf/init.sql
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
No default environment specified.
Searching for '/data/flink-1.13.6-standalone/conf/sql-client-defaults.yaml'...not found.
Failed to initialize from sql script: file:/data/flink-1.13.6-standalone/conf/init.sql. Please refer to the LOG for detailed error messages.
Shutting down the session...
done.
```
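As the log above hints ("no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set"), one common remedy, recommended by the Flink documentation as an alternative to the shaded Hadoop jar, is to export the cluster's Hadoop classpath before launching any Flink script. A minimal sketch, guarded so it is harmless on hosts without the hadoop CLI:

```shell
# Expose the cluster's Hadoop classes to Flink. Guarded so the line is a
# silent no-op on machines where the hadoop command is not installed.
if command -v hadoop >/dev/null 2>&1; then
  export HADOOP_CLASSPATH="$(hadoop classpath)"
fi
```

Put this in the shell profile of the user that runs Flink (or at the top of your launch script) so every `bin/*.sh` invocation inherits it.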
2. Adding Dependency Jars to Flink
① Flink and Hadoop compatibility jar
- flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
② Flink and Hive connector jar
- flink-sql-connector-hive-1.2.2_2.11-1.13.6.jar
③ Hadoop dependency jars (from CDH)
- commons-configuration-1.6.jar
- commons-logging-1.1.3.jar
- hadoop-auth-2.6.0-cdh5.16.2.jar
- hadoop-common-2.6.0-cdh5.16.2.jar
- hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar
- hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar
- hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar
- htrace-core4-4.0.1-incubating.jar
If you are using CDH, the following commands quickly add the jars under ${FLINK_HOME}/lib as symlinks (run them from inside ${FLINK_HOME}/lib, since the link names are relative):

```shell
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-configuration-1.6.jar commons-configuration-1.6.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-logging-1.1.3.jar commons-logging-1.1.3.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/hadoop-auth-2.6.0-cdh5.16.2.jar hadoop-auth-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/hadoop-common-2.6.0-cdh5.16.2.jar hadoop-common-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/htrace-core4-4.0.1-incubating.jar htrace-core4-4.0.1-incubating.jar
```
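The per-jar commands above can also be wrapped in a small helper that finds each jar under a source tree, links it, and warns about misses, which makes a partially failed setup easier to spot. This is a hypothetical sketch, not a CDH or Flink tool; `link_jars` and its arguments are my own names:

```shell
#!/usr/bin/env bash
# Hypothetical helper: locate each named jar anywhere under src_root and
# symlink it into dest_dir, warning about jars that cannot be found.
link_jars() {
  local src_root="$1" dest_dir="$2" jar path
  shift 2
  for jar in "$@"; do
    path="$(find "$src_root" -name "$jar" 2>/dev/null | head -n 1)"
    if [ -n "$path" ]; then
      ln -sfn "$path" "$dest_dir/$jar"   # -f replaces a stale link on re-runs
    else
      echo "WARN: $jar not found under $src_root" >&2
    fi
  done
}
```

Example use against the CDH parcel tree: `link_jars /opt/cloudera/parcels/CDH/lib/hadoop "${FLINK_HOME}/lib" hadoop-common-2.6.0-cdh5.16.2.jar hadoop-auth-2.6.0-cdh5.16.2.jar`.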
3. Modifying the Flink Configuration File
Under ${FLINK_HOME}/conf, add the following line to flink-conf.yaml:

```yaml
classloader.check-leaked-classloader: false
```
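If you script the installation, the line can be appended idempotently so that re-running the setup does not duplicate the setting. A minimal sketch; `ensure_flink_conf` is a hypothetical helper of my own, not a Flink utility:

```shell
# Hypothetical helper: append a "key: value" line to a config file only if
# the key is not already present, so repeated runs leave a single entry.
ensure_flink_conf() {
  local file="$1" line="$2"
  local key="${line%%:*}"
  grep -q "^${key}:" "$file" || echo "$line" >> "$file"
}
```

Example use: `ensure_flink_conf "${FLINK_HOME}/conf/flink-conf.yaml" 'classloader.check-leaked-classloader: false'`.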
4. Preparing the Flink SQL Client Initialization Script
The script mainly creates a new catalog and switches to the Hive dialect (note that every property in the WITH clause except the last needs a trailing comma):

```sql
CREATE CATALOG TestCatalog
WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/etc/hive/conf',
  'hive-version' = '1.1.0'
);
USE CATALOG TestCatalog;
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'table';
SET 'table.sql-dialect' = 'hive';
```
5. Starting the Hive Metastore Service
Flink SQL reaches Hive's metadata (kept in Hive's MySQL database) through the metastore service, so the metastore must be running first:
- On a CDH cluster, the service is started automatically once Hive is installed; no restart is needed.
- For community-edition Hive, start it with:

```shell
nohup bin/hive --service metastore &
```
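Before moving on, it can save debugging time to confirm the metastore is actually reachable. A hedged sketch: `metastore_up` is a hypothetical helper, and host/port 9083 is only the default (hive.metastore.uris in hive-site.xml may override it):

```shell
# Hypothetical check: the metastore thrift service listens on port 9083 by
# default. Uses bash's /dev/tcp; the redirect fails fast if nothing listens.
metastore_up() {
  local host="${1:-localhost}" port="${2:-9083}"
  (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}
```

Example use: `metastore_up dn5 9083 && echo "metastore reachable"`.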
6. Starting the Flink Cluster
On the cluster's master node, start the Flink cluster services with:

```shell
bin/start-cluster.sh
```
7. Starting the Flink SQL Client with the Hive-Dialect Initialization Script

```shell
bin/sql-client.sh -i conf/sql-init.sql
```

Notes:
1. The -i option specifies an initialization script.
2. The sql-client-defaults.yaml configuration file was removed in Flink 1.13, so configuring the catalog through that file no longer works; instead, put the catalog and dialect settings shown above directly into a custom initialization file (conf/sql-init.sql).
8. Testing the Flink SQL and Hive Integration
8.1 Listing catalogs

```
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|     TestCatalog |
| default_catalog |
+-----------------+
```
8.2 Using a catalog

```
Flink SQL> use catalog TestCatalog;
[INFO] Execute statement succeed.
```
8.3 Listing databases

```
Flink SQL> show databases;
+--------------------+
|      database name |
+--------------------+
|                ads |
|                dwd |
|                dws |
|                ods |
|               test |
+--------------------+
```
8.4 Listing tables

```
Flink SQL> show tables;
+----------------------------------------------+
|                                   table name |
+----------------------------------------------+
|                                   table_test |
+----------------------------------------------+
```
8.5 Querying table contents
Note: a table's fully qualified name is catalog name + Hive database name + Hive table name.

```
# Query the table rows
Flink SQL> select * from TestCatalog.test.table_test;
# Run an aggregate query
Flink SQL> select count(*) from TestCatalog.test.table_test;
```
9. SQL Dialects in Flink SQL
Flink supports the default and Hive SQL dialects, with default as the initial setting.
To use Hive's syntax, first switch to the hive dialect:

```
# 1. Explicitly switch to the Hive dialect
Flink SQL> set table.sql-dialect=hive;
[INFO] Session property has been set.
Flink SQL> CREATE TABLE hive_dialect_test(`id` BIGINT, `name` STRING, `login_time` STRING, `location` STRING);
[INFO] Execute statement succeed.
# 2. Insert test data
Flink SQL> insert into hive_dialect_test select 1111111111,'Andy','2022-05-01 08:37:23','NewYork';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1b8b8ef262ac7f9d755b6f8f86f1afe4
```
3. Query the data
From a Hive client (the Hive shell or HUE), query the table created and populated through Flink SQL:

```
# switch to the test database
hive> use test;
OK
Time taken: 0.997 seconds
hive> select * from hive_dialect_test;
OK
1111111111 Andy 2022-05-01 08:37:23 NewYork
Time taken: 0.246 seconds, Fetched: 1 row(s)
```