spark hive 写入mysql_sparkSql将不同数据库数据写入hive

展示从Oracle与sqlServer数据写入到Hive中

在idea的resources文件夹放入配置文件:core-site.xml、hdfs-site.xml,hive-site.xml

hive-site.xml内容

hive.metastore.uris

thrift://dnode5:9083

hive.execution.engine

mr

代码

import org.apache.spark.SparkConf

import org.apache.spark.sql.{DataFrame, SparkSession}

object WriteToHive {

def main(args: Array[String]): Unit = {

//创建sparkConf

val sparkConf = new SparkConf().setAppName("Oracle_SqlSever_Hive").setMaster("local[*]")

//创建SparkSession

val spark = SparkSession.builder.config(sparkConf)

.enableHiveSupport//开启hive支持

.getOrCreate

//通过jdbc获取数据转化为DataFrame

val o_table = ReadOracle(spark)

val sqs_table = ReadSqlserver(spark)

//创建临时表

o_table.createOrReplaceTempView("v_account")

sqs_table.createOrReplaceTempView("v_record_copy")

val sql = "select " +

"vr.CardData carddata," +

"va.SNO pcode," +

"vr.PName pname," +

"vr.OccTime occtime," +

"vr.CodeIndex codeindex," +

"vr.PortNum portnum," +

"vr.EquptID equptid," +

"vr.EquptName equptname," +

"vr.LctnName lctnname," +

"date_format(occtime,'yyyy') occyear," +

"date_format(occtime,'MM') occmonth," +

"date_format(occtime,'dd') occday from v_record_copy vr " +

"left join v_account va on vr.CardData = va.CARDID " +

"order by vr.OccTime desc"

//通过sparkSql操作hive执行hive语句

spark.sql(sql).createOrReplaceTempView("v_table")

spark.sql("use sjkm")

spark.sql("select * from v_table").write.mode("overwrite").saveAsTable("view_record")

spark.stop()

}

//通过jdbc连接oracle

def ReadOracle(sparkSession: SparkSession): DataFrame = {

sparkSession.read.format("jdbc")

.option("url", "jdbc:oracle:thin:@ip:5521/服务名")

.option("dbtable", "库名.表名")

.option("user", "xxx")

.option("password", "xxxx")

.option("driver", "oracle.jdbc.OracleDriver")

.load()

}

//通过jdbc连接sqlserver

def ReadSqlserver(sparkSession: SparkSession): DataFrame = {

sparkSession.read.format("jdbc")

.option("url", "jdbc:sqlserver://ip:5521;databaseName=数据库名")

.option("dbtable", "模式名.表名")

.option("user", "xxxx")

.option("password", "xxxx")

.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

.load()

}

}


版权声明:本文为weixin_36089954原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。