spark sql: SparkSession操作hive表

目标: 实现类似于navicat的功能=> 写hql语句,在idea下使用spark sql 一键运行,而不用到shell窗口下运行命令

步骤: 写sql文件 (resources目录) --> 读取内容 --> 以 ';' 分隔出每条命令 --> spark sql 逐条运行命令

1, 写hql文件

gen_startup.sql 内容如下:

-- Run everything against the target Hive database.
use db ;
-- nonstrict is required for a dynamic-partition INSERT where no partition
-- column is given a static value (here both ym and day are dynamic).
set hive.exec.dynamic.partition.mode=nonstrict ;

-- Register the functions (drop first so the script is safe to re-run).
-- NOTE(review): '包名' is a placeholder for the real Java package name.
drop function if exists forkstartuplogs ;
drop function if exists  formatbyday ;
create   function forkstartuplogs as '包名.ForkStartuplogsUDTF' ;
create   function formatbyday 	  as '包名.FormatByDayUDF' ;

-- Explode the raw logs with the UDTF, then format the timestamp into the
-- ym/day dynamic partition values. Hive requires the UDTF to be the only
-- expression in its SELECT, hence the inner subquery.
insert into appstartuplogs partition(ym , day)
select
  t.appChannel ,
  t.appId ,
  t.appPlatform ,
  t.appVersion ,
  t.brand ,
  formatbyday(t.createdatms , 0 , 'yyyyMM') ,
  formatbyday(t.createdatms , 0 , 'dd')
from
 (
    select forkstartuplogs(servertimestr ,clienttimems ,clientip ,json) from  raw_logs where ym='201511' and day='8'
  )t;

2,配置maven依赖

	 <!-- Spark SQL core (the _2.11 suffix is the Scala version; it must match spark-hive below) -->
	 <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- Required for SparkSession.enableHiveSupport(); keep the version in lockstep with spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- JDBC driver for the Hive metastore backed by MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>

3,创建SparkSession并执行SQL

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

/**
 * Reads a SQL script from the classpath and returns its text with
 * single-line "--" comments stripped, ready to be split on ';' into
 * individual statements.
 *
 * @param resource classpath-relative name of the script (e.g. "gen_startup.sql")
 * @return the script content decoded as UTF-8, without "--" line comments
 * @throws Exception if the resource is missing or cannot be read
 */
public static String readSQLAsString(String resource) throws Exception {
	InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
	if (input == null) {
		// Fail fast with a clear message instead of an NPE on the first read.
		throw new FileNotFoundException("classpath resource not found: " + resource);
	}
	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	// try-with-resources closes the stream even when read() throws
	// (the original leaked the InputStream).
	try (InputStream in = input) {
		byte[] buf = new byte[1024];
		int len;
		while ((len = in.read(buf)) != -1) {
			baos.write(buf, 0, len);
		}
	}
	// Decode explicitly as UTF-8: defaultCharset() varies by JVM/OS and can
	// mangle non-ASCII characters in the script.
	String sql = new String(baos.toByteArray(), StandardCharsets.UTF_8);
	// Strip "--" comments up to end-of-line. (?m) makes $ match at every line
	// terminator, so this handles both \n and \r\n files — the original
	// "--.*\r\n" matched only CRLF and left comments intact on LF files,
	// which then corrupted the ';'-based statement split.
	return sql.replaceAll("(?m)--.*$", "");
}
	
/**
 * Entry point: executes every statement of gen_startup.sql against Hive
 * through a local SparkSession — a "one click" Navicat-style runner.
 *
 * @param args unused
 * @throws Exception if the script cannot be read or a statement fails
 */
public static void main(String[] args) throws Exception {
	// Spark configuration: app name + local mode with 4 worker threads.
	SparkConf conf = new SparkConf();
	conf.setAppName("gen_startupLog");
	conf.setMaster("local[4]");

	// Hive-enabled Spark SQL session.
	SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
	try {
		// Use the reader defined above (the original called
		// ResourceUtil.readSQLAsString, which is not defined in this file).
		String sqls = readSQLAsString("gen_startup.sql");
		// Naive split on ';' — assumes no semicolon occurs inside a string
		// literal in the script.
		String[] statements = sqls.split(";");
		for (String sql : statements) {
			if (!sql.trim().isEmpty()) {
				sess.sql(sql).show(10, false);
			}
		}
	} finally {
		// Always release the session (the original never stopped it).
		sess.stop();
	}
}

版权声明:本文为eyeofeagle原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。