一、准备数据
因为数据目前在本地,所以先将数据上传至Hive,再进行清洗。
上传步骤:
1、将数据上传至虚拟机中:使用rz -E
选择文件
2、进入hive,选择数据库,并进行建表
注意:建表的时候,要保证表的分隔符要和数据本身的分隔符相同,否则导入数据的时候会出现所有数据都插入到同一列。
create table data(
TRIP_ID string,CALL_TYPE string,
ORIGIN_CALL string,ORIGIN_STAND string,
TAXI_ID string,TIME_STAMP string,
DAY_TYPE string,MISSING_DATA string,POLYLINE array<string>)
row format delimited fields terminated by '\t';
3、将刚刚上传的数据导入创建的表中
load data local inpath '/data/data.txt' into table data;
4、导入成功之后,可以查询验证一下数据是否完整。
select * from data;
二、编写Spark代码
package 数据清洗
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties
//TODO 此代码是最基本的提取Hive数仓的数据清洗之后并存入Mysql步骤
object Q1_test {
def main(args: Array[String]): Unit = {
//TODO 配置SparkSql参数
val sparkConf=new SparkConf().setAppName("name").setMaster("local[*]")
val sc=new SparkContext(sparkConf)
val spark=SparkSession.builder().config("hive.metastore.uris","thrift://192.168.38.144:9083")
.config(sparkConf).enableHiveSupport().getOrCreate()
// spark.sql("use ssm")
//TODO 提取数据
val sourcedata=spark.sql("select time_stamp from test.data")
//test.data 表示test为数据库的名字,data为表名
sourcedata.createTempView("timestamp")
//使用HQL语句清洗数据
val result=spark.sql("select from_unixtime(regexp_extract(time_stamp,'([0-9]+)',0))as times from timestamp order by times desc limit 10")
//TODO 配置Mysql并写入
val prop=new Properties()
prop.put("user","root")
prop.put("password","123456")
val url="jdbc:mysql://192.168.38.160:3306/test"
result.write.mode("overwrite").jdbc(url,"result2",prop)
spark.stop()
}
}
代码运行成功之后就可以进入mysql查询数据了!
注意事项:
Spark连接Hive之前,需要执行提前打开hiveserver的端口:
在虚拟机输入命令:hive --service metastore
此命令为堵塞进程,打开之后复制会话窗口执行后续操作。
版权声明:本文为weixin_53299145原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。