Case 1: Continuous Active Users
#SQL style
import org.apache.spark.sql.{DataFrame, SparkSession}
//Continuous active users
//Find the records of users who logged in on two or more consecutive days
object ContenueActiveUser_SQL {
def main(args: Array[String]): Unit = {
//create the SparkSession
val session: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("")
.getOrCreate()
//read the source data (queried with pure SQL below)
val df: DataFrame = session
.read
.option("header", "true")
.csv("Files/data1.txt")
// df.show()
df.createTempView("view_log")
/**
 * 1. SQL-style implementation
 */
val df2: DataFrame = session.sql(
"""
|select
|  uid,
|  min(dt) as min_dt,
|  max(dt) as max_dt,
|  count(*) as times
|from
|(
|  select
|    uid,
|    dt,
|    date_sub(dt, dt_num) as date_diff
|  from
|  (
|    select
|      uid,
|      dt,
|      row_number() over(partition by uid order by dt asc) as dt_num
|    from
|    (
|      select distinct uid, dt from view_log
|    ) t1
|  ) t2
|) t3
|group by uid, date_diff
|having count(*) >= 2
|""".stripMargin)
df2.show()
session.stop()
}
}
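The core trick is date_sub(dt, row_number): within one run of consecutive days, the date minus its per-user row number is a constant, so grouping by that constant groups each run. The snippet below is a minimal sketch of just that step, meant to be pasted into the main method above (so that session is in scope); the four sample rows are made up, since the real content of data1.txt is not shown here.
import session.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
//hypothetical sample of data1.txt (columns uid, dt, as used by the query)
val sample = Seq(
("guid01", "2018-03-01"),
("guid01", "2018-03-02"),
("guid01", "2018-03-05"), //one-day gap, starts a new run
("guid01", "2018-03-06")
).toDF("uid", "dt")
sample
.withColumn("rn", row_number().over(Window.partitionBy("uid").orderBy("dt")))
.withColumn("date_diff", expr("date_sub(dt, rn)"))
.show()
//2018-03-01 and 2018-03-02 both get date_diff = 2018-02-28,
//2018-03-05 and 2018-03-06 both get date_diff = 2018-03-02,
//so each run of consecutive days ends up in its own group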
#DSL style
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
//Continuous active users
//Find the records of users who logged in on two or more consecutive days
object ContenueActiveUser_DSL {
def main(args: Array[String]): Unit = {
//create the SparkSession
val session: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("")
.getOrCreate()
//read the source data
val df: DataFrame = session
.read
.option("header", "true")
.csv("Files/data1.txt")
//note: the SparkSession val is named session (not spark), so import its implicits
import session.implicits._
import org.apache.spark.sql.functions._
/**
 * 2. DSL-style implementation
 */
df.distinct()
.select('uid, 'dt,
row_number().over(Window.partitionBy("uid").orderBy("dt")) as 'rn
)
.select(
'uid,
'dt,
//dates in one consecutive run share the same date_diff value
expr("date_sub(dt, rn)") as "date_diff"
)
.groupBy('uid, 'date_diff)
//use agg when several aggregate functions are needed at once
.agg(
min("dt") as "min_dt",
max("dt") as "max_dt",
count("*") as "times"
)
.where('times >= 2)
.drop("date_diff")
.show()
}
}
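Run end to end on the same hand-written rows, the DSL pipeline returns one row per run of consecutive days, which makes the expected output easy to eyeball. This is a sketch meant to be dropped into the main method above (it reuses the implicits, functions and Window imports already in scope there); the user id and dates are invented.
val toy = Seq(
("guid01", "2018-03-01"), ("guid01", "2018-03-02"),
("guid01", "2018-03-05"), ("guid01", "2018-03-06")
).toDF("uid", "dt")
toy.distinct()
.select('uid, 'dt, row_number().over(Window.partitionBy("uid").orderBy("dt")) as 'rn)
.select('uid, 'dt, expr("date_sub(dt, rn)") as "date_diff")
.groupBy('uid, 'date_diff)
.agg(min("dt") as "min_dt", max("dt") as "max_dt", count("*") as "times")
.where('times >= 2)
.drop("date_diff")
.show()
//expected: two rows for guid01, one per run of consecutive days, each with times = 2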
Case 2: Shop Monthly Cumulative Sales
#SQL style
import org.apache.spark.sql.{DataFrame, SparkSession}
//Shop monthly cumulative sales
object ShopMonthAdd_SQL {
def main(args: Array[String]): Unit = {
//1. create the SparkSession
val session: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("")
.getOrCreate()
/**
 * For each shop, compute the monthly sales amount
 * and the cumulative sales up to and including the current month
 */
//2. read the source data
val df: DataFrame = session
.read
.option("header", "true")
.csv("Files/data2.txt")
//3. create a temporary view
df.createTempView("view_shop")
//4. SQL-style query (columns: sid, dt, money)
session.sql(
"""
|select
|  sid,
|  mth,
|  mth_money,
|  sum(mth_money) over(partition by sid order by mth) as total_money
|from
|(
|  select
|    sid,
|    mth,
|    sum(money) as mth_money
|  from
|  (
|    select
|      sid,
|      date_format(dt, 'yyyy-MM') as mth,
|      cast(money as double) as money
|    from view_shop
|  ) t1
|  group by sid, mth
|) t2
|""".stripMargin).show()
session.stop()
}
}
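The running total comes entirely from the ordered window in the outer query: order by mth with no explicit frame makes sum() accumulate from the first month up to the current one within each sid. Below is a minimal sketch of that behaviour, meant for the main method above; the shop id and amounts are invented, since the real data2.txt is not shown.
import session.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val toyShops = Seq(
("shop1", "2019-01", 100.0),
("shop1", "2019-02", 200.0),
("shop1", "2019-03", 50.0)
).toDF("sid", "mth", "mth_money")
toyShops
.withColumn("total_money", sum("mth_money").over(Window.partitionBy("sid").orderBy("mth")))
.show()
//total_money runs 100.0 -> 300.0 -> 350.0 for shop1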
#DSL style
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{DataFrame, SparkSession}
//Shop monthly cumulative sales
object ShopMonthAdd_DSL {
def main(args: Array[String]): Unit = {
//1. create the SparkSession
val session: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("")
.getOrCreate()
/**
 * For each shop, compute the monthly sales amount
 * and the cumulative sales up to and including the current month
 */
//2. read the source data
val df: DataFrame = session
.read
.option("header", "true")
.csv("Files/data2.txt")
/**
 * DSL-style implementation
 */
import session.implicits._
import org.apache.spark.sql.functions._
df.select($"sid",
'money.cast(DataTypes.DoubleType) as "money",
expr("date_format(dt, 'yyyy-MM') as mth")
).groupBy("sid", "mth")
.sum("money")
.withColumnRenamed("sum(money)", "mth_money")
.select(
$"sid",
$"mth",
$"mth_money",
sum("mth_money").over(Window.partitionBy("sid")
.orderBy("mth")) as "total_money"
).show()
}
}
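A detail both versions rely on: when a window has an orderBy but no explicit frame, Spark uses a growing frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) by default, which is exactly the cumulative sum wanted here. Spelling the frame out is optional; the sketch below shows an explicit equivalent (after the groupBy there is one row per sid and mth, so a row-based frame behaves the same), with a val name of my own.
import org.apache.spark.sql.expressions.Window
val cumulativeWindow = Window.partitionBy("sid").orderBy("mth")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
//sum("mth_money").over(cumulativeWindow) produces the same total_money column as above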
Case 3: Traffic Statistics
#SQL style
import org.apache.spark.sql.{DataFrame, SparkSession}
//Traffic statistics
object FlowTotal_SQL {
def main(args: Array[String]): Unit = {
//1. create the SparkSession
val session: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("")
.getOrCreate()
//2. read the source data
val df: DataFrame = session
.read
.option("header", "true")
.csv("Files/data3.txt")
//3. create a temporary view
df.createTempView("view_flow")
/**
 * Grouping rule
 * 1. group by the uid field
 * 2. for the same uid, whenever the gap between a record's start time and the
 *    previous record's end time exceeds 10 minutes, start a new group;
 *    then sum the flow within each group
 *
 * Sample data
 * uid,start_dt,end_dt,flow
 * 1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
 */
//4. SQL-style query
session.sql(
"""
|select
|  uid,
|  min(start_dt) as start_dt,
|  max(end_dt) as end_dt,
|  sum(flow) as flow
|from
|(
|  select
|    uid,
|    start_dt,
|    end_dt,
|    sum(lag_num) over(partition by uid order by start_dt) as flag,
|    flow
|  from
|  (
|    select
|      uid,
|      start_dt,
|      end_dt,
|      if((to_unix_timestamp(start_dt) - to_unix_timestamp(lag_time)) / 60 > 10, 1, 0) as lag_num,
|      flow
|    from
|    (
|      select
|        uid,
|        start_dt,
|        end_dt,
|        cast(flow as double) as flow,
|        lag(end_dt, 1, start_dt) over(partition by uid order by start_dt) as lag_time
|      from view_flow
|    ) t1
|  ) t2
|) t3
|group by uid, flag
|""".stripMargin).show()
session.stop()
}
}
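The nested query reads more easily on a couple of hand-written rows. In the sketch below (to be run inside the main method above; the first row is the sample from the comment, the other two are invented), lag pulls in the previous end time, the 10-minute check turns each gap into a 0/1 flag, and the running sum of flags becomes the group id that the final aggregation groups on.
import session.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val toyFlow = Seq(
("1", "2020-02-18 14:20:30", "2020-02-18 14:46:30", 20),
("1", "2020-02-18 14:47:20", "2020-02-18 15:20:30", 30), //gap under 10 minutes, same group
("1", "2020-02-18 15:37:23", "2020-02-18 16:05:26", 40) //gap over 10 minutes, new group
).toDF("uid", "start_dt", "end_dt", "flow")
val w = Window.partitionBy("uid").orderBy("start_dt")
toyFlow
.withColumn("lag_time", lag("end_dt", 1).over(w))
.withColumn("lag_num", when((unix_timestamp('start_dt) - unix_timestamp('lag_time)) / 60 > 10, 1).otherwise(0))
.withColumn("flag", sum("lag_num").over(w))
.show(false)
//flag is 0, 0, 1, so the first two rows form one group and the last row its own group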
#DSL style
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{DataFrame, SparkSession}
//Traffic statistics
object FlowTotal_DSL {
def main(args: Array[String]): Unit = {
//1. create the SparkSession
val session: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("")
.getOrCreate()
//2. read the source data
val df: DataFrame = session
.read
.option("header", "true")
.csv("Files/data3.txt")
/**
 * DSL-style implementation
 */
import session.implicits._
import org.apache.spark.sql.functions._
//the same sessionization as the SQL version: lag the previous end time per uid,
//flag gaps over 10 minutes, and turn the running sum of flags into a group id
df.select('uid, 'start_dt, 'end_dt,
'flow.cast(DataTypes.DoubleType) as "flow",
lag("end_dt", 1).over(Window.partitionBy("uid").orderBy("start_dt")) as "lag_time"
).select('uid, 'start_dt, 'end_dt, 'flow,
when((unix_timestamp('start_dt) - unix_timestamp('lag_time)) / 60 > 10, 1)
.otherwise(0) as "lag_num"
).select('uid, 'start_dt, 'end_dt, 'flow,
sum("lag_num").over(Window.partitionBy("uid").orderBy("start_dt")) as "flag"
).groupBy('uid, 'flag)
.agg(
min("start_dt") as "start_dt",
max("end_dt") as "end_dt",
sum("flow") as "flow"
)
.drop("flag")
.show()
}
}
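One small refactor worth mentioning (a sketch; the val name is my own): the DSL pipeline builds the same partitionBy("uid").orderBy("start_dt") window twice, and defining it once removes the duplication while making it clear that lag and the running sum must use the same ordering.
import org.apache.spark.sql.expressions.Window
val byUser = Window.partitionBy("uid").orderBy("start_dt")
//then use lag("end_dt", 1).over(byUser) and sum("lag_num").over(byUser) in the selects above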
Copyright notice: this is an original article by Dream_aju, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.