Working with Hive and Kafka from PySpark

1. Install HDFS

2. Install pyspark. Its version must match the Spark version on the big-data platform; my platform is CDH 6, which ships Spark 2.4, so install pyspark 2.4.x. A quick version check is sketched below.
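
A minimal sketch of that check, assuming pyspark was installed with pip:

# Confirm the locally installed pyspark matches the cluster's Spark 2.4.
import pyspark

print(pyspark.__version__)  # expect something like 2.4.x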

Working with Hive: the script below loads a CSV file from HDFS and overwrites it into a Hive table.

# -*- coding: utf-8 -*-
# @Time        : 2022/8/18 11:13
# @Author      : tianyunzqs
# @Description : load a CSV file from HDFS into a Hive table

import argparse
from pyspark.sql import SparkSession


def insert_data_to_hive(hdfs_path, hive_table):
    # Build a SparkSession with Hive support; it creates the SparkContext itself,
    # so there is no need to construct one separately.
    # Replace spark-master / hive-metastore-host with your own cluster hosts.
    spark = SparkSession.builder.master("spark://spark-master:7077") \
        .config("hive.metastore.uris", "thrift://hive-metastore-host:9083") \
        .appName("Lantu-model") \
        .enableHiveSupport() \
        .getOrCreate()

    # Read the CSV from HDFS (first row is the header, column types are inferred),
    # then overwrite the target Hive table.
    df = spark.read.format('csv').option("sep", ",") \
        .options(header='true', inferSchema='true') \
        .load(hdfs_path)
    df.write.format("hive").mode("overwrite").saveAsTable(hive_table)


if __name__ == '__main__':
    arg_parser = argparse.ArgumentParser(description="build component")
    arg_parser.add_argument('--hdfs_path', type=str, required=True, help="path of the source CSV on HDFS")
    arg_parser.add_argument('--hive_table', type=str, required=True, help="target Hive table")
    args = arg_parser.parse_args()
    insert_data_to_hive(args.hdfs_path, args.hive_table)
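
The script is submitted with spark-submit, passing --hdfs_path and --hive_table. As a quick sanity check after the job finishes, the table can be read back; a minimal sketch, assuming the same SparkSession settings as above and a hypothetical table name:

# Hypothetical check: count the rows that landed in the Hive table.
spark.sql("SELECT COUNT(*) AS cnt FROM default.my_result_table").show()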

Working with Kafka: the script below consumes a Kafka topic with Spark Streaming (a direct stream) and appends each micro-batch to a Hive table.

import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType  # for an optional explicit schema
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

BROKER_LIST = ['dxh-d-cdh2-dnode-0010:9092']


# Driver-side setup: a SparkSession with Hive support; its SparkContext is reused
# for the StreamingContext below.
spark = SparkSession.builder.master("spark://dxh-d-cdh2-dnode-0010:7077") \
    .config("hive.metastore.uris", "thrift://dxh-d-cdh2-dnode-0010:9083") \
    .appName("Car-consumer") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")


# An earlier variant inserted rows via Spark SQL instead of the DataFrame writer:
# def hiveInsert(name, pageSize):
#     spark.sql("insert into default.test_streaming select value, time_stamp from updates")
final_result = []


def recommend_single_user(data):
    # Handle a single message (a helper that is not wired into the stream below).
    print("begin handle message")
    data = json.loads(data)  # safer than eval() for JSON payloads
    name = data.get('name')
    pageSize = data.get('pageSize')
    final_result.append(name)

def deal_data(rec):
    # rec is the RDD of one micro-batch; keep only non-empty message values.
    data_rdd = rec.values().filter(lambda r: len(r) > 0)
    if data_rdd.isEmpty():
        print("no user messages in this batch")
        return
    # Debug write: two fixed rows, just to confirm Hive connectivity from within a batch.
    data_frame = spark.createDataFrame([("a", 1), ("b", 2)], schema=["name", "age"])
    data_frame.write.format("hive").mode("append").saveAsTable('default.lantudata45')
    print("keep message")
    # An explicit schema could be passed to spark.read.json instead of relying on inference:
    # schema = StructType([
    #     StructField("name", StringType(), True),
    #     StructField("pageSize", IntegerType(), True)])
    # Parse the JSON messages and append them to the Hive table.
    data_frame = spark.read.json(data_rdd)
    data_frame.write.format("hive").mode("append").saveAsTable('default.lantudata')

def save_by_spark_direct_stream():
    """
    Read Kafka with Spark Streaming via createDirectStream: offsets are not stored
    externally and no ZooKeeper is involved.
    """
    # Debug write: confirms Hive connectivity from the driver before streaming starts.
    data_frame = spark.createDataFrame([("a", 1), ("b", 2)], schema=["name", "age"])
    data_frame.write.format("hive").mode("append").saveAsTable('default.lantudata45')

    # Micro-batch interval of 1 second.
    ssc = StreamingContext(sc, 1)

    # SSL is not enabled on the container cloud, and PySpark's streaming Kafka API
    # does not support Kafka authentication, so plain brokers are used here.
    kvs = KafkaUtils.createDirectStream(ssc=ssc,
                                        topics=["lantu-original-data9"],
                                        fromOffsets={},
                                        kafkaParams={
                                            "auto.offset.reset": "smallest",
                                            "group.id": "lantu_consumer",
                                            "metadata.broker.list": ",".join(BROKER_LIST)})

    kvs.foreachRDD(lambda rec: deal_data(rec))

    # Start the streaming job and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()


save_by_spark_direct_stream()
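
Note that pyspark.streaming.kafka in Spark 2.4 relies on the external spark-streaming-kafka-0-8 connector, so that jar (or the matching --packages coordinate) has to be on the classpath when submitting the job. To feed the topic for testing, here is a minimal producer sketch, assuming the kafka-python package is installed and using the same broker and topic as above; the message fields are just placeholders matching what deal_data expects:

# Hypothetical test producer for the lantu-original-data9 topic.
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='dxh-d-cdh2-dnode-0010:9092')
message = {"name": "test_user", "pageSize": 10}  # placeholder payload
producer.send('lantu-original-data9', json.dumps(message).encode('utf-8'))
producer.flush()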

