Integrating Flink 1.13 with Hive 1.1.0

1. Software Environment

System & software versions:
CentOS 7
CDH 5.16.2
Flink 1.13.6
Hive 1.1.0
Hadoop 2.6.0

If the integration is missing or misconfigured, you may see an error like the following:

[bigdata_admin@dn5 flink-1.13.6-standalone]$ bin/sql-client.sh -i conf/init.sql 
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
No default environment specified.
Searching for '/data/flink-1.13.6-standalone/conf/sql-client-defaults.yaml'...not found.
Failed to initialize from sql script: file:/data/flink-1.13.6-standalone/conf/init.sql. Please refer to the LOG for detailed error messages.

Shutting down the session...
done.

2. Adding Dependency Jars to Flink

① Flink/Hadoop compatibility jar

  • flink-shaded-hadoop-2-uber-2.6.5-10.0.jar

② Flink/Hive connector jar

  • flink-sql-connector-hive-1.2.2_2.11-1.13.6.jar

③ Hadoop and CDH dependency jars

  • commons-configuration-1.6.jar
  • commons-logging-1.1.3.jar
  • hadoop-auth-2.6.0-cdh5.16.2.jar
  • hadoop-common-2.6.0-cdh5.16.2.jar
  • hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar
  • hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar
  • hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar
  • htrace-core4-4.0.1-incubating.jar

If you are running CDH, the following commands quickly symlink the jars into ${FLINK_HOME}/lib (run them from that directory):

ln -s /opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-configuration-1.6.jar commons-configuration-1.6.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-logging-1.1.3.jar commons-logging-1.1.3.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/hadoop-auth-2.6.0-cdh5.16.2.jar hadoop-auth-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/hadoop-common-2.6.0-cdh5.16.2.jar hadoop-common-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar
ln -s /opt/cloudera/parcels/CDH/lib/htrace-core4-4.0.1-incubating.jar htrace-core4-4.0.1-incubating.jar
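The eight `ln -s` commands above can be condensed into a loop. This is a sketch that assumes the CDH 5.16.2 parcel layout shown above and is run from inside ${FLINK_HOME}/lib; `-f` lets it replace stale links on re-runs:

```shell
#!/usr/bin/env bash
# Sketch: symlink the required CDH jars into the current directory
# (run from ${FLINK_HOME}/lib). Paths assume the CDH 5.16.2 parcel layout.
set -eu

CDH=/opt/cloudera/parcels/CDH/lib

for jar in \
  "$CDH/hadoop/lib/commons-configuration-1.6.jar" \
  "$CDH/hadoop/lib/commons-logging-1.1.3.jar" \
  "$CDH/hadoop/hadoop-auth-2.6.0-cdh5.16.2.jar" \
  "$CDH/hadoop/hadoop-common-2.6.0-cdh5.16.2.jar" \
  "$CDH/hadoop/client/hadoop-mapreduce-client-common-2.6.0-cdh5.16.2.jar" \
  "$CDH/hadoop/client/hadoop-mapreduce-client-core-2.6.0-cdh5.16.2.jar" \
  "$CDH/hadoop/client/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.16.2.jar" \
  "$CDH/htrace-core4-4.0.1-incubating.jar"
do
  # -sf replaces an existing (possibly stale) link if the script is re-run
  ln -sf "$jar" "$(basename "$jar")"
done
```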

3. Modifying the Flink Configuration

In ${FLINK_HOME}/conf, edit flink-conf.yaml and add the following line:

classloader.check-leaked-classloader: false
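A small helper makes this edit idempotent, so repeating the step does not duplicate the line. This is a sketch; the config path in the usage comment is an example:

```shell
#!/usr/bin/env bash
# Sketch: append the classloader setting to a flink-conf.yaml
# only if it is not already present (safe to re-run).
add_flink_setting() {
  local conf="$1"
  local key='classloader.check-leaked-classloader'
  if ! grep -q "^${key}:" "$conf" 2>/dev/null; then
    echo "${key}: false" >> "$conf"
  fi
}

# Usage (path is an example, adjust to your installation):
# add_flink_setting /data/flink-1.13.6-standalone/conf/flink-conf.yaml
```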

4. Preparing the Flink SQL Client Initialization Script

The script mainly creates a new CATALOG and switches to the Hive dialect:

CREATE CATALOG TestCatalog
WITH (
  'type' = 'hive',
  'default-database'='default',
  'hive-conf-dir'='/etc/hive/conf',
  'hive-version'='1.1.0'
);
USE CATALOG TestCatalog;
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'table';
SET 'table.sql-dialect'='hive';

5. Starting the Hive Metastore Service

Flink SQL accesses Hive's metadata (managed in Hive's MySQL database) through the metastore service, so the metastore must be running first:

  • On a CDH cluster, this service is started automatically once Hive is installed; no restart is needed.
  • On community Hive, start it with:
    nohup bin/hive --service metastore &
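Before moving on, you can check that the metastore is actually reachable. This sketch uses bash's `/dev/tcp` pseudo-device; 9083 is the metastore's default thrift port, which is an assumption if your cluster overrides it:

```shell
#!/usr/bin/env bash
# Sketch: test whether the Hive metastore thrift port accepts connections.
# 9083 is the metastore default; adjust host/port for your cluster.
metastore_up() {
  local host="${1:-localhost}" port="${2:-9083}"
  # /dev/tcp is a bash feature: the redirection succeeds only if the
  # port accepts a TCP connection
  (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}

if metastore_up localhost 9083; then
  echo "metastore is listening"
else
  echo "metastore is NOT reachable"
fi
```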

6. Starting the Flink Cluster

On the cluster's master node, start the Flink cluster with:

bin/start-cluster.sh

7. Starting the Flink SQL Client with the Initialization Script

bin/sql-client.sh -i conf/sql-init.sql

Notes:
1. The -i option specifies an initialization script.
2. The sql-client-defaults.yaml configuration file was removed in Flink 1.13, so the old approach of configuring the CATALOG through that file no longer works; instead, put the catalog and dialect statements above into a custom initialization file (conf/sql-init.sql).

8. Testing the Flink SQL and Hive Integration

8.1 Listing catalogs

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|     TestCatalog |
| default_catalog |
+-----------------+

8.2 Using a catalog

Flink SQL> use catalog TestCatalog;
[INFO] Execute statement succeed.

8.3 Listing databases

Flink SQL> show databases;
+--------------------+
|      database name |
+--------------------+
|                ads |
|                dwd |
|                dws |
|                ods |
|               test |
+--------------------+

8.4 Listing tables

Flink SQL> show tables;
+----------------------------------------------+
|                                   table name |
+----------------------------------------------+
|                                   table_test |
+----------------------------------------------+

8.5 Querying table contents

Note: a table's fully qualified name is catalog name + Hive database name + Hive table name.

# Query table rows
Flink SQL> select * from TestCatalog.test.table_test;


# Run an aggregate query
Flink SQL> select count(*) from TestCatalog.test.table_test;

9. SQL Dialects in Flink SQL

Flink supports the default and Hive SQL dialects; default is active unless changed.
To use Hive's syntax, first switch to the Hive dialect:

# 1. Explicitly switch to the Hive dialect
Flink SQL> set table.sql-dialect=hive;
[INFO] Session property has been set.

Flink SQL>CREATE TABLE hive_dialect_test(`id` BIGINT,`name` STRING,`login_time` STRING, `location` STRING);
[INFO] Execute statement succeed.

# 2. Insert test data

Flink SQL> insert into hive_dialect_test select 1111111111,'Andy','2022-05-01 08:37:23','NewYork';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1b8b8ef262ac7f9d755b6f8f86f1afe4

# 3. Query the data

In a Hive client (e.g. the Hive shell or HUE), query the table created in Flink SQL and the data inserted into it.

# Switch to the test database
hive>use test;
OK
Time taken: 0.997 seconds

hive> select * from hive_dialect_test;
OK
1111111111      Andy    2022-05-01 08:37:23     NewYork
Time taken: 0.246 seconds, Fetched: 1 row(s)

Copyright notice: this is an original article by liuwei0376, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.