Spark执行Hive
提示:Spark执行Hive的表只能是外表或是表不包含ACID事物的表
前言
Hive一般作为大数据的数据仓库,因其语句和SQL大部分通用。所以很多数据为存储在Hive表中。
提示:以下是本篇文章正文内容,下面案例可供参考
一、pom.xml导入依赖执行的包
代码如下
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.13</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
二、使用步骤
1.编写代码
实现代码如下(示例):
SparkConf sc = new SparkConf();
sc.setAppName("Hive-Text"); //当前执行名称
//Spark执行Parquet格式Hive时候Decimal格式问题
sc.set("spark.sql.parquet.writeLegacyFormat","true");
SparkSession ss = SparkSession
.builder()
.config(sc)
.enableHiveSupport() //执行Hive操作
.getOrCreate();
//读取文件格式支持text/csv/json/parquet/orc
//其中text文件读取一行作为一个值,需要对象有多个字段,采用其他格式进行读取
Dataset<Row> frs = ss.read().text("input/data/config/test.sql");
List<String> sqls = new ArrayList<>();
//通过当前循环得到数据,row.length和文件格式有关
frs.toLocalIterator().forEachRemaining(new Consumer<Row>() {
@Override
public void accept(Row row) {
int len = row.length();
for(int i = 0; i < len; i++){
//文件内容处理
sqls.add(row.getString(i));
}
}
});
for (String sql : sqls){
//执行SQL
Dataset<Row> dataset = ss.sql(sql);
}
2.Spark执行脚本
脚本编写需要注意格式,最好从Linux下载可执行sh脚本进行修改
脚本实例如下
spark-submit \
--master yarn \
--executor-memory 3072m \
--num-executors 3\
--class com.study.SparkHiveFile \
/user/home/test/jar/spark.jar ${hdfs_sql_file} ${data_date}
异常处理
1:当执行Hive查询中包含自定义函数,函数中包含Hive查询,包数据库连接异常
处理方法是将自定义函数在Spark项目上复制一份。因当前项目中的类会覆盖在Hive 中创建自定义函数实现类。且数据库连接正常
2:当执行报异常:Should not be called directly时
需要重写下面方法:因Spark执行的是下面方法,即在Hive自定义函数时实现下面方法。
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
throw new IllegalStateException("Should not be called directly");
}
3:当执行报:This table may be a Hive-managed ACID
需要注意Spark执行Hive的表不能是内表(即表由ACID事物属性的表)。可以采用外表进行替代。
版权声明:本文为swg321321原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。