SparkSQL 经典案例(SQL&DSL)风格详解

案例一:连续活跃用户案例

#SQL风格

import org.apache.spark.sql.catalyst.plans.logical.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
//连续活跃用户案例
//求连续登录天数大于或等于两天的用户记录
/**
 * Consecutive-active-users example, SQL style.
 *
 * Reads `Files/data1.txt` as a headered CSV, registers it as the temp view
 * `view_log`, and prints every run of consecutive days per `uid` that is at
 * least two days long, together with the first day, last day and length of
 * the run.
 */
object ContenueActiveUser_SQL {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        // Local session using every available core.
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("")
          .getOrCreate()

        try {
            // Load the log file; the first line is treated as the header.
            val df: DataFrame = session
              .read
              .option("header", "true")
              .csv("Files/data1.txt")

            df.createTempView("view_log")

            // How the query works:
            //   t1 - de-duplicate (uid, dt) so a user counts once per day
            //   t2 - number each user's days in ascending order
            //   t3 - dt minus its row number is constant within a run of
            //        consecutive days, so it identifies the run
            // The outer query groups by (uid, run key) and keeps runs whose
            // length is at least 2, matching the stated requirement and the
            // DSL variant of this example.
            val df2: DataFrame = session.sql(
                """
                  |select
                  |  uid,
                  |  min(dt) as min_dt,
                  |  max(dt) as max_dt,
                  |  count(date_diff) as times
                  |from (
                  |  select
                  |    uid,
                  |    dt,
                  |    date_sub(dt, dt_num) as date_diff
                  |  from (
                  |    select
                  |      uid,
                  |      dt,
                  |      row_number() over(partition by uid order by dt asc) as dt_num
                  |    from (
                  |      select distinct uid, dt
                  |      from view_log
                  |    ) t1
                  |  ) t2
                  |) t3
                  |group by uid, date_diff
                  |having times >= 2
                  |""".stripMargin)

            // Spark is lazy: without an action the query never runs.
            df2.show()
        } finally {
            // Always release the session, even when the query fails.
            session.stop()
        }
    }
}

#DSL风格

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

//连续活跃用户案例
//求连续登录天数大于或等于两天的用户记录
/**
 * Consecutive-active-users example, DataFrame DSL style.
 *
 * Reads `data1.txt` as a headered CSV and prints every run of consecutive
 * days per `uid` that is at least two days long, together with the first
 * day (`min_dt`), last day (`max_dt`) and length (`times`) of the run.
 */
object ContenueActiveUser_DSL {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {

        // Local session using every available core.
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("")
          .getOrCreate()

        try {
            // Load the log file; the first line is treated as the header.
            // NOTE(review): the SQL variant reads "Files/data1.txt" - confirm
            // which path is the intended one.
            val df: DataFrame = session
              .read
              .option("header", "true")
              .csv("data1.txt")

            // The implicits live on the session instance, which is named
            // `session` here (not `spark`).
            import session.implicits._
            import org.apache.spark.sql.functions._

            // Row numbers are assigned per user in ascending date order.
            val byUserOrderedByDay = Window.partitionBy("uid").orderBy("dt")

            df.select($"uid", $"dt")
              // A user counts once per day, whatever other columns the file has.
              .distinct()
              .select(
                  $"uid",
                  $"dt",
                  row_number().over(byUserOrderedByDay) as "rn"
              )
              .select(
                  $"uid",
                  $"dt",
                  // dt minus its row number is constant within a run of
                  // consecutive days, so it serves as the run key. Written
                  // via expr so the second argument can be a column.
                  expr("date_sub(dt, rn)") as "date_diff"
              )
              .groupBy($"uid", $"date_diff")
              // Use agg when several aggregates are needed at once.
              .agg(
                  min("dt") as "min_dt",
                  max("dt") as "max_dt",
                  count("*") as "times"
              )
              .where($"times" >= 2)
              // The run key is only a grouping helper; hide it from the output.
              .drop("date_diff")
              .show()
        } finally {
            // Always release the session, even when the job fails.
            session.stop()
        }
    }
}

案例二:店铺每月累计案例

#SQL风格

import org.apache.spark.sql.{DataFrame, SparkSession}
//店铺每月累计案例

/**
 * Shop monthly running-total example, SQL style.
 *
 * For every shop (`sid`) the query sums `money` per month and then prints,
 * for each month, the total accumulated up to and including that month.
 */
object ShopMonthAdd_SQL {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        // 1. Local session using every available core.
        val spark = SparkSession.builder().master("local[*]").appName("").getOrCreate()

        // 2. Load the headered CSV and expose it to SQL as `view_shop`.
        val shopRows = spark.read.option("header", "true").csv("Files/data2.txt")
        shopRows.createTempView("view_shop")

        // 3. The query, innermost first:
        //    t1 - derive the month (yyyy-MM) from dt and cast money to double
        //    t2 - total money per shop and month
        //    outer - running sum of the monthly totals per shop, ordered by month
        val runningTotalQuery =
            """
            |select
            |sid,
            |mth,
            |sum(mth_money) over(partition by sid order by mth) as total_money
            |from
            |(
            |select
            |sid,
            |mth,
            |sum(money) as mth_money
            |from
            |(
            |select
            |sid,
            |date_format(dt,"yyyy-MM") as mth,
            |cast(money as double) as money
            |from view_shop
            |) t1 group by sid,mth) t2
            |
            |""".stripMargin

        // 4. Run it, print the result, and release the session.
        spark.sql(runningTotalQuery).show()
        spark.stop()
    }
}

#DSL风格

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{DataFrame, SparkSession}

//店铺每月累计案例

/**
 * Shop monthly running-total example, DataFrame DSL style.
 *
 * For every shop (`sid`) the job sums `money` per month and then prints,
 * for each month, the total accumulated up to and including that month
 * (`total_money`).
 */
object ShopMonthAdd_DSL {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        // 1. Local session using every available core.
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("")
          .getOrCreate()

        try {
            // 2. Load the shop file; the first line is treated as the header.
            val df: DataFrame = session
              .read
              .option("header", "true")
              .csv("Files/data2.txt")

            import session.implicits._
            import org.apache.spark.sql.functions._

            // Running totals are computed per shop in month order.
            val byShopOrderedByMonth = Window.partitionBy("sid").orderBy("mth")

            df.select(
                  $"sid",
                  // CSV columns are read as strings, so cast before summing.
                  $"money".cast(DataTypes.DoubleType) as "money",
                  expr("date_format(dt, 'yyyy-MM') as mth")
              )
              .groupBy("sid", "mth")
              // Alias the aggregate directly rather than renaming the
              // auto-generated "sum(money)" column afterwards.
              .agg(sum("money") as "mth_money")
              .select(
                  $"sid",
                  $"mth",
                  sum("mth_money").over(byShopOrderedByMonth) as "total_money"
              )
              .show()
        } finally {
            // Release the session, matching the SQL variant of this example.
            session.stop()
        }
    }
}

案例三:流量统计

#SQL风格

import org.apache.spark.sql.{DataFrame, SparkSession}
//流量统计
/**
 * Flow-statistics example, SQL style.
 *
 * Merges each user's flow records into sessions and prints one row per
 * session. A record opens a new session when its `start_dt` is more than
 * 10 minutes after the previous record's `end_dt` for the same `uid`.
 * Each output row carries the session's earliest `start_dt`, latest
 * `end_dt` and summed `flow`.
 *
 * Input layout (header plus sample row):
 * {{{
 * uid,start_dt,end_dt,flow
 * 1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
 * }}}
 */
object FlowTotal_SQL {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        // 1. Local session using every available core.
        val spark = SparkSession.builder().master("local[*]").appName("").getOrCreate()

        // 2. Load the headered CSV and expose it to SQL as `view_flow`.
        val flowRows = spark.read.option("header", "true").csv("Files/data3.txt")
        flowRows.createTempView("view_flow")

        // 3. The query, innermost first:
        //    t1 - attach the previous record's end_dt as lag_time (the first
        //         record of a user falls back to its own start_dt)
        //    t2 - lag_num is 1 when the gap to the previous record exceeds
        //         10 minutes, else 0
        //    t3 - running sum of lag_num per user gives a session id (flag)
        //    outer - aggregate each (uid, flag) session
        val sessionFlowQuery =
            """
            |select
            |uid,
            |min(start_dt) as start_dt,
            |max(end_dt) as end_dt,
            |sum(flow) as flow
            |from
            |(
            |select
            |uid,
            |start_dt,
            |end_dt,
            |sum(lag_num) over(partition by uid order by start_dt) as flag,
            |flow
            |from
            |(
            |select
            |uid,
            |start_dt,
            |end_dt,
            |if((to_unix_timestamp(start_dt)-to_unix_timestamp(lag_time))/60>10,1,0) as lag_num,
            |flow
            |from
            |(
            |select
            |uid,
            |start_dt,
            |end_dt,
            |flow,
            |lag(end_dt,1,start_dt) over(partition by uid order by start_dt) as lag_time
            |from view_flow
            |)t1 )t2 )t3 group by uid,flag
            |""".stripMargin

        // 4. Run it, print the result, and release the session.
        spark.sql(sessionFlowQuery).show()

        spark.stop()
    }
}

#DSL风格

 

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.{DataFrame, SparkSession}

//流量统计
/**
 * Flow-statistics example, DataFrame DSL style.
 *
 * Merges each user's flow records into sessions and prints one row per
 * session. A record opens a new session when its `start_dt` is more than
 * 10 minutes after the previous record's `end_dt` for the same `uid`.
 * Each output row carries the session's earliest `start_dt`, latest
 * `end_dt` and summed `flow`. This mirrors the SQL variant of the example.
 *
 * Expected input columns: `uid`, `start_dt`, `end_dt`, `flow`.
 */
object FlowTotal_DSL {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        // 1. Local session using every available core.
        val session: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("")
          .getOrCreate()

        try {
            // 2. Load the flow file; the first line is treated as the header.
            val df: DataFrame = session
              .read
              .option("header", "true")
              .csv("Files/data3.txt")

            import session.implicits._
            import org.apache.spark.sql.functions._

            // Every window step looks at one user's records in start order.
            val byUserOrderedByStart = Window.partitionBy("uid").orderBy("start_dt")

            df.select(
                  $"uid",
                  $"start_dt",
                  $"end_dt",
                  // CSV columns are read as strings, so cast before summing.
                  $"flow".cast(DataTypes.DoubleType) as "flow",
                  // Previous record's end time; a user's first record falls
                  // back to its own start_dt, giving a gap of zero.
                  expr("lag(end_dt, 1, start_dt)").over(byUserOrderedByStart) as "lag_time"
              )
              .select(
                  $"uid",
                  $"start_dt",
                  $"end_dt",
                  $"flow",
                  // 1 marks the first record of a new session (gap > 10 min).
                  when(
                      (unix_timestamp($"start_dt") - unix_timestamp($"lag_time")) / 60 > 10,
                      1
                  ).otherwise(0) as "lag_num"
              )
              .select(
                  $"uid",
                  $"start_dt",
                  $"end_dt",
                  $"flow",
                  // The running count of session starts is the session id.
                  sum("lag_num").over(byUserOrderedByStart) as "flag"
              )
              .groupBy("uid", "flag")
              .agg(
                  min("start_dt") as "start_dt",
                  max("end_dt") as "end_dt",
                  sum("flow") as "flow"
              )
              // The session id is only a grouping helper; hide it from the output.
              .drop("flag")
              .show()
        } finally {
            // Release the session, matching the SQL variant of this example.
            session.stop()
        }
    }
}


版权声明:本文为Dream_aju原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。