Databricks中PySpark的一些常用操作

关于PySpark API的详细可以参考: pyspark.sql.DataFrame.createOrReplaceTempView — PySpark 3.2.1 documentation

下面用在Databricks Notebook中的一些操作场景及实现作为例子:

1. Join操作

  • join多个stream并Partition输出
# event.request is delta table
# string to timestamp: to_timestamp()
# array size: size(brand_counts)
# partitionBy multiple columns: partitionBy(["publisher_id", "et"])
# select all columns of a join df: df_opp["*"]
df_opp = spark.sql("select request_id, publisher_id, event_datetime, date_format(event_datetime, 'yyyy-MM-dd-HH') as et, count, query, slot_id, request_target_values, request_target_value_counts, size(request_target_values) as req_target_val_cnt, brand_counts, size(brand_counts) as brand_cnt from event.request where dt >= '2022-03-23-00' and event_datetime >= to_timestamp('2022-03-23 00:00:00') and event_datetime < to_timestamp('2022-03-24 00:00:00')")
print(df_opp.count())
df_defimp = spark.read.option("mergeSchema", "true").parquet("/mnt/datalake/test_tom/deferred_impression")
print(df_defimp.count())
df_imp = spark.read.option("mergeSchema", "true").parquet("/mnt/datalake/test_tom/impression")
print(df_imp.count())
df_click = spark.read.option("mergeSchema", "true").parquet("/mnt/datalake/test_tom/click")
print(df_click.count())
df_join = df_opp.join(df_defimp, df_opp.request_id == df_defimp.request_id, 'left').join(df_imp, df_opp.request_id == df_imp.request_id, 'left').join(df_click, df_opp.request_id == df_click.request_id, 'left').select(df_opp["*"], df_defimp.def_imps, df_imp.imps, df_click.clicks)
print(df_join.count())
df_join.write.partitionBy(["publisher_id", "et"]).parquet("/mnt/datalake/test_tom/opp_join")
  • outer join合并key: coalesce + alias
compare_data = act_cat_data_week.join(pred_cat_data_week, (act_cat_data_week.cat_id == pred_cat_data_week.category_id) & (act_cat_data_week.week == pred_cat_data_week.week), 'outer').select(coalesce(act_cat_data_week.cat_id, pred_cat_data_week.category_id).alias("category_id"), coalesce(act_cat_data_week.week, pred_cat_data_week.week).alias("week"), act_cat_data_week.requests_act, pred_cat_data_week.requests_prediction)
compare_data = compare_data.fillna(0)
compare_data.show()
  • 填充指定列na值
camp_budget_perf=camp_budget_perf.fillna({"daily_spend": 0, "live_hours": 0, "weekly_spend": 0, "monthly_spend": 0, "spend": 0, "impressions": 0, "deferred_impressions": 0, "clicks": 0, "conversions": 0, "sales_volume": 0})

2. 多行进行不同类型的聚合

方式一:groupBy().agg(agg_func(column).alias(column_agg_name))

df = df.groupBy("publisher_id").agg(
  sum("count").alias("sum_count"), 
  sum("count_fill").alias("sum_count_fill"),
  sum("def_imps").alias("sum_def_imps"),
  sum("imps").alias("sum_imps"),
  sum("clicks").alias("sum_clicks"))

方式二:groupBy().agg({column_name:agg_func_name})

df = df.groupBy("publisher_id").agg({"et":"count", "count":"sum", "count_fill":"sum", "def_imps":"sum", "imps":"sum", "clicks":"sum"})

方式三:TempView + SQL

df = spark.read.option("mergeSchema", "true").parquet("/mnt/datalake/test_tom/opp_join")
df.createOrReplaceTempView("full_opp")
df1 = spark.sql("select publisher_id, count(*) as sum_req, sum(count) as sum_count, sum(count_fill) as sum_count_fill, sum(def_imps) as sum_def_imps, sum(imps) as sum_imps, sum(clicks) as sum_clicks from full_opp group by publisher_id")

多列聚合:

pred_cat_data_week = pred_cat_data_week.groupBy(["s_id", "cat_id", "week"]).agg(sum("requests_prediction").alias("requests_prediction"), sum("opportunities_prediction").alias("opportunities_prediction"), sum("viewable_opportunities").alias("impressions_prediction")).withColumnRenamed("cat_id", "category_id")

3. 列操作

- 常数生成列lit

from pyspark.sql.functions import *
target_date = datetime.strptime('2022-05-01', '%Y-%m-%d').date()
et_day = target_date.strftime('%Y-%m-%d')  
process_time = datetime.now().strftime('%Y-%m-%d-%H')
camp_budget_perf = camp_budget_perf.withColumn("dt", lit(et_day)).withColumn("pt", lit(process_time))

- 基于其他列的值生成新的列

PySpark withColumn() Usage with Examples - Spark by {Examples}

from pyspark.sql.functions import col
df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))

# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df2 = df.withColumn("salary",col("salary").cast("Integer"))
df3 = df.withColumn("salary",col("salary")*100)
df5 = df.withColumn("Country", lit("USA"))

- 修改列名 (SQL:AS)

# per test if there is no column augmented_val in df it will ignore this statement
df.withColumnRenamed("augmented_val","aug_val")

- 找出空值的列

df.filter("state is NULL").show()
df.filter(df.state.isNull()).show()
df.filter(col("state").isNull()).show()

- 基于列值和常数比较过滤

from datetime import *
from pyspark.sql.functions import *

week_start = "2022-04-02"
week_end = '2022-05-22'
pred_data_week = pred_data.withColumn("week", date_trunc('day', pred_data['week'])).filter((col("week") >= datetime.strptime(week_start, '%Y-%m-%d').date()) & (col("week") < datetime.strptime(week_end, '%Y-%m-%d').date()))

- 基于条件赋值when

active_campaign_info = active_campaign_info.withColumn("interval_start", when(col("budget_interval") == "DAILY", lit(daily_interval_start)).when(col("budget_interval") == "WEEKLY", lit(weekly_interval_start)).when(col("budget_interval") == "MONTHLY", lit(monthly_interval_start)).otherwise(to_date(col("start_date")))).withColumn("interval_end", when(col("budget_interval") == "DAILY", lit(daily_interval_end)).when(col("budget_interval") == "WEEKLY", lit(weekly_interval_end)).when(col("budget_interval") == "MONTHLY", lit(monthly_interval_end)).otherwise(to_date(col("end_date"))))

- 列类型转换

campaign_life_perf = campaign_life_perf.withColumn("impressions", col("impressions").cast("integer"))


版权声明:本文为tanzhangwen原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。