目标: 实现类似于 Navicat 的功能 => 编写 HQL 语句后, 在 IDEA 中通过 Spark SQL 一键运行, 而不必到 shell 窗口下执行命令
步骤: 编写 sql 文件 (放在 resources 目录) --> 读取文件内容 --> 以 ';' 分隔出每条命令 --> 用 Spark SQL 逐条运行
1, 写hql文件
gen_startup.sql 内容如下:
-- Select the working database and allow every partition column to be dynamic.
use db ;
set hive.exec.dynamic.partition.mode=nonstrict ;

-- Define the functions: drop any earlier definitions, then create them again.
drop function if exists forkstartuplogs ;
drop function if exists formatbyday ;
create function forkstartuplogs as '包名.ForkStartuplogsUDTF' ;
create function formatbyday as '包名.FormatByDayUDF' ;

-- Use the functions: expand the raw logs of partition ym='201511', day='8'
-- and append the result to appstartuplogs.
-- The last two select expressions supply the dynamic partition columns (ym, day).
insert into appstartuplogs partition (ym, day)
select
    startup.appChannel,
    startup.appId,
    startup.appPlatform,
    startup.appVersion,
    startup.brand,
    formatbyday(startup.createdatms, 0, 'yyyyMM'),
    formatbyday(startup.createdatms, 0, 'dd')
from (
    select forkstartuplogs(servertimestr, clienttimems, clientip, json)
    from raw_logs
    where ym = '201511'
      and day = '8'
) startup ;
2,配置maven依赖
<!-- Spark SQL (Scala 2.11 build), provides SparkSession -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Spark Hive integration; presumably required by enableHiveSupport() - confirm -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- MySQL JDBC driver; presumably for a MySQL-backed Hive metastore - confirm -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
3,读取sql文件, 创建SparkSession并逐条运行
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
 * Reads a SQL script from the classpath and returns its text with
 * {@code --} line comments removed.
 *
 * <p>The resource is looked up through the current thread's context class
 * loader and decoded with the platform default charset.
 * NOTE(review): if the script is always saved as UTF-8, decoding with an
 * explicit UTF-8 charset would be more predictable - confirm before changing.
 *
 * <p>Comment stripping is purely textual: everything from {@code --} up to
 * (not including) the end of the line is removed, whatever the line ending
 * (CRLF, LF, or none on the last line). The line terminator itself is kept so
 * that a trailing comment cannot glue two lines together. A {@code --} inside
 * a string literal is treated as a comment as well.
 *
 * @param resource classpath-relative name of the SQL file
 * @return the script text without line comments
 * @throws IllegalArgumentException if the resource cannot be found
 * @throws Exception if reading the resource fails
 */
public static String readSQLAsString(String resource) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // try-with-resources closes the stream even when reading fails.
    try (InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource)) {
        if (input == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear message
            // instead of a NullPointerException further down.
            throw new IllegalArgumentException("SQL resource not found on classpath: " + resource);
        }
        byte[] buf = new byte[1024];
        int len;
        while ((len = input.read(buf)) != -1) {
            baos.write(buf, 0, len);
        }
    }
    String sql = new String(baos.toByteArray(), Charset.defaultCharset());
    // Strip comments up to the end of the line, independent of the line-ending style.
    return sql.replaceAll("--[^\r\n]*", "");
}
/**
 * Runs every statement of {@code gen_startup.sql} through a local,
 * Hive-enabled Spark SQL session.
 *
 * <p>The script is split on {@code ';'}; blank fragments are skipped and each
 * remaining statement is executed in order, printing up to 10 untruncated
 * result rows. The split is purely textual, so a semicolon inside a string
 * literal would break a statement apart.
 *
 * @param args unused
 * @throws Exception if the script cannot be read or a statement fails
 */
public static void main(String[] args) throws Exception {
    // Spark configuration: local mode with 4 worker threads.
    SparkConf conf = new SparkConf()
            .setAppName("gen_startupLog")
            .setMaster("local[4]");
    // Spark SQL session with Hive support.
    SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
    try {
        String sqls = ResourceUtil.readSQLAsString("gen_startup.sql");
        for (String sql : sqls.split(";")) {
            if (!sql.trim().isEmpty()) {
                sess.sql(sql).show(10, false);
            }
        }
    } finally {
        // Stop the session even when a statement fails.
        sess.stop();
    }
}
版权声明:本文为eyeofeagle原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。