1. Install hdfs
2. Install pyspark (the version must match the Spark version on your big-data platform; I built the platform on CDH 6, whose Spark is 2.4)
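Both install from PyPI; a minimal sketch (the exact 2.4.x patch version is an assumption, match it to your cluster):
pip install hdfs
pip install pyspark==2.4.8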
Operating Hive
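The script below reads a CSV file from HDFS (with a header row and inferred schema) and saves it as a Hive table; the input path and target table name are passed as command-line arguments.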
# -*- coding: utf-8 -*-
# @Time : 2022/8/18 11:13
# @Author : tianyunzqs
# @Description : load a CSV file from HDFS into a Hive table
import argparse

from pyspark.sql import SparkSession


def insert_data_to_hive(hdfs_path, hive_table):
    # The SparkSession creates its own SparkContext, so no separate
    # SparkContext is needed (building one first would pin the master
    # before .master() below takes effect).
    spark = SparkSession.builder.master("spark://spark-master:7077") \
        .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
        .appName("Lantu-model") \
        .enableHiveSupport() \
        .getOrCreate()
    # Read the CSV with a header row, letting Spark infer the column types
    df = spark.read.format('csv').option("sep", ",") \
        .options(header='true', inferSchema='true') \
        .load(hdfs_path)
    df.write.format("hive").mode("overwrite").saveAsTable(hive_table)


if __name__ == '__main__':
    arg_parser = argparse.ArgumentParser(description="build component")
    arg_parser.add_argument('--hdfs_path', type=str, required=True, help="HDFS input path")
    arg_parser.add_argument('--hive_table', type=str, required=True, help="target Hive table")
    args = arg_parser.parse_args()
    insert_data_to_hive(args.hdfs_path, args.hive_table)
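After submitting the script with spark-submit, the load can be sanity-checked by reading the table back; a minimal sketch, where default.my_table stands in for whatever was passed as --hive_table:
spark.table("default.my_table").printSchema()
spark.sql("SELECT COUNT(*) FROM default.my_table").show()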
Operating Kafka
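The consumer below uses Spark Streaming's receiver-less direct stream (KafkaUtils.createDirectStream, the Kafka 0.8 API that ships with Spark 2.4) to pull JSON messages from a topic and append them to Hive tables. When submitting it, the spark-streaming-kafka-0-8 assembly jar generally has to be put on the classpath (via --jars or --packages); the exact coordinates depend on your Spark build.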
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType  # only needed for the optional explicit schema below
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

BROKER_LIST = ['dxh-d-cdh2-dnode-0010:9092']

sc = SparkContext(appName="Car-consumer")
sc.setLogLevel("WARN")
spark = SparkSession.builder.master("spark://dxh-d-cdh2-dnode-0010:7077") \
    .config("hive.metastore.uris", "thrift://dxh-d-cdh2-dnode-0010:9083") \
    .appName("test") \
    .enableHiveSupport() \
    .getOrCreate()

# def hiveInsert(name, pageSize):
#     spark.sql("insert into default.test_streaming select value, time_stamp from updates")

final_result = []


def recommend_single_user(data):
    # Parse a single message (not wired into the stream below);
    # json.loads would be safer than eval for JSON payloads.
    print("begin handle message")
    data = eval(data)
    name = data.get('name')
    final_result.append(name)
    pageSize = data.get('pageSize')


def deal_data(rec):
    # rec is an RDD of (key, value) pairs; keep only non-empty values
    data_rdd = rec.values().filter(lambda r: len(r) > 0)
    if data_rdd.isEmpty():
        print("got user message from spark, the batch size is 0")
        return
    # Smoke test: append two dummy rows to confirm Hive connectivity
    data_frame = spark.createDataFrame([("a", 1), ("b", 2)], schema=["name", "age"])
    data_frame.write.format("hive").mode("append").saveAsTable('default.lantudata45')
    print("keep message")
    # schema = StructType([
    #     StructField("name", StringType(), True),
    #     StructField("pageSize", IntegerType(), True)])
    data_frame = spark.read.json(data_rdd)
    data_frame.write.format("hive").mode("append").saveAsTable('default.lantudata')


def save_by_spark_direct_stream():
    '''
    Spark Streaming reads Kafka via createDirectStream: no Kafka group id
    is registered, no offsets are stored, and no ZooKeeper is involved.
    '''
    data_frame = spark.createDataFrame([("a", 1), ("b", 2)], schema=["name", "age"])
    data_frame.write.format("hive").mode("append").saveAsTable('default.lantudata45')
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
    kvs = KafkaUtils.createDirectStream(
        ssc=ssc,
        topics=["lantu-original-data9"],
        fromOffsets={},
        kafkaParams={
            "auto.offset.reset": "smallest",
            "group.id": "lantu_consumer",
            # The container cloud has SSL disabled; PySpark's streaming-kafka
            # API does not support Kafka authentication anyway.
            "metadata.broker.list": ",".join(BROKER_LIST)})
    kvs.foreachRDD(lambda rec: deal_data(rec))
    # The Start!
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()


save_by_spark_direct_stream()
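To feed the consumer a test message, a small standalone producer can be run against the same broker; a sketch assuming the kafka-python package, with a payload shaped like what recommend_single_user expects:
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers="dxh-d-cdh2-dnode-0010:9092",
    # serialize dicts as UTF-8 JSON so spark.read.json can parse them downstream
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))
producer.send("lantu-original-data9", {"name": "tianyunzqs", "pageSize": 10})
producer.flush()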