flink1.11.2连接hive数据库

#flink连接hive并读取数据

前提条件

已经有Hadoop和hive的环境
flink版本1.11.2

设置hive-site.xml

#按如下内容配置 hive-site.xml。注意:这里修改的不是 hive 安装目录下的配置文件,而是你项目运行环境(classpath)中的 hive-site.xml;可以直接从 hive 安装目录拷贝一份过来,再按下面的内容修改。

<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.120.130:9083</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>
hive.metastore.uris 为 hive metastore 服务的 thrift 连接地址
hive.metastore.warehouse.dir 为 hive 数据仓库在 HDFS 上的存储路径

读取hive的代码

package com.example.demo.flinkKafka;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
public class test {

    /**
     * Registers a Flink {@code HiveCatalog} backed by a remote Hive metastore,
     * runs a batch SQL query against it, and prints the resulting rows.
     *
     * <p>Requires a valid {@code hive-site.xml} under {@code hiveConfDir}
     * pointing at a reachable metastore (see the configuration shown above).
     *
     * @param args command-line arguments (unused)
     * @throws Exception if catalog registration or query execution fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String name = "myhive";             // catalog name to register with the table environment
        String defaultDatabase = "default"; // database used when none is explicitly selected
        String hiveConfDir = "/opt";        // local directory containing hive-site.xml
        String version = "3.1.2";           // Hive version the connector should target

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog(name, hive);
        tEnv.useCatalog(name);
        // Switch to the HIVE dialect so Hive-specific SQL syntax is accepted.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase("test");

        Table table = tEnv.sqlQuery("select *  from resource_item where date_no ='20201130' and id=9931" );
        // Convert the Table result to a DataSet and print the rows to stdout.
        DataSet<Row> rowDataSet = tEnv.toDataSet(table, Row.class);
        rowDataSet.print();
    }
}

需要的包

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.11.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.11.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.11</artifactId>
      <version>1.11.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.11.2</version>
  </dependency>

版权声明:本文为weixin_43943806原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。