Hands-On Case Study:
1. Data Preparation
All the data in this Spark SQL exercise comes from Hive, so first create the tables in Hive and load the data. There are three tables in total: one user-behavior table, one city table, and one product table.
CREATE TABLE `user_visit_action`(
`date` string,
`user_id` bigint,
`session_id` string,
`page_id` bigint,
`action_time` string,
`search_keyword` string,
`click_category_id` bigint,
`click_product_id` bigint,
`order_category_ids` string,
`order_product_ids` string,
`pay_category_ids` string,
`pay_product_ids` string,
`city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath 'input/user_visit_action.txt' into table user_visit_action;
CREATE TABLE `product_info`(
`product_id` bigint,
`product_name` string,
`extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath 'input/product_info.txt' into table product_info;
CREATE TABLE `city_info`(
`city_id` bigint,
`city_name` string,
`area` string)
row format delimited fields terminated by '\t';
load data local inpath 'input/city_info.txt' into table city_info;
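A quick sanity check that the loads succeeded (run in the hive or spark-sql CLI; the exact counts depend on your input files):
select count(*) from user_visit_action;
select count(*) from product_info;
select count(*) from city_info;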
2. Requirement: Top 3 Hot Products in Each Region
2.1 Requirement Overview
"Hot" here is measured by click count. Compute the top three products in each region, and annotate each product with its click distribution across that region's major cities; any cities beyond the top two are rolled up as "其他" (Other).
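For example, a few illustrative result rows (the numbers are made up purely to show the format):
Area    Product Name    Click Count    City Remark
华北    商品A           100000         北京 21.2%,天津 13.2%,其他 65.6%
华北    商品P           80200          北京 63.0%,太原 10.0%,其他 27.0%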

2.2 Requirement Analysis
➢ Query all click records, join them with the city_info table to get the region each city belongs to, and join with the product_info table to get the product name.
➢ Group by region and product id, and count each product's total clicks in each region.
➢ Sort by click count in descending order within each region.
➢ Keep only the top three (see the query sketch after this list).
➢ The city remark requires a custom UDAF.
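Setting the city remark aside for a moment, the first four steps can be sketched as a single nested query. This is only a sketch of the logic; the implementation below splits it into temporary views instead:
select * from (
    select
        *,
        rank() over( partition by area order by clickCnt desc ) as rank
    from (
        select
            c.area,
            p.product_name,
            count(*) as clickCnt
        from user_visit_action a
        join city_info c on a.city_id = c.city_id
        join product_info p on a.click_product_id = p.product_id
        where a.click_product_id > -1
        group by c.area, p.product_name
    ) t1
) t2 where rank <= 3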
2.3 Implementation
➢ Join the three tables to obtain the complete data set (click records only).
➢ Group the data by region and product name.
➢ Sum the clicks per product and take the Top 3.
➢ Implement a custom aggregate function to produce the city remark.
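One note before the code: the program reads Hive tables and uses functions.udaf, which is available from Spark 3.0 onward, so the build needs spark-hive on the classpath and hive-site.xml in the resources so enableHiveSupport can locate the metastore. A sketch of the dependencies, assuming an sbt build (the versions are assumptions; match your environment):
// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "3.0.0",
    "org.apache.spark" %% "spark-sql"  % "3.0.0",
    "org.apache.spark" %% "spark-hive" % "3.0.0"
)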
package com.atguigu.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
object Spark07_SparkSQL_Test1 {

    def main(args: Array[String]): Unit = {
        // Create the runtime environment configuration object
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
        val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("use atguigu")

        // Step 1: join the three tables, keeping click records only
        spark.sql(
            """
              |select
              |    a.*,
              |    p.product_name,
              |    c.area,
              |    c.city_name
              |from user_visit_action a
              |join product_info p on a.click_product_id = p.product_id
              |join city_info c on a.city_id = c.city_id
              |where a.click_product_id > -1
            """.stripMargin).createOrReplaceTempView("t1")

        // Step 2: group by area and product, counting clicks and building the city remark
        spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))

        spark.sql(
            """
              |select
              |    area,
              |    product_name,
              |    count(*) as clickCnt,
              |    cityRemark(city_name) as city_remark
              |from t1 group by area, product_name
            """.stripMargin).createOrReplaceTempView("t2")

        // Step 3: rank products within each area by click count
        spark.sql(
            """
              |select
              |    *,
              |    rank() over( partition by area order by clickCnt desc ) as rank
              |from t2
            """.stripMargin).createOrReplaceTempView("t3")

        // Step 4: keep only the top three per area
        spark.sql(
            """
              |select
              |    *
              |from t3 where rank <= 3
            """.stripMargin).show(false)

        spark.close()
    }
    case class Buffer(var total: Long, var cityMap: mutable.Map[String, Long])

    // Custom aggregate function for the city remark, based on Aggregator:
    //   IN  : city name (String)
    //   BUF : Buffer(total click count, Map[city -> click count])
    //   OUT : remark string
    class CityRemarkUDAF extends Aggregator[String, Buffer, String] {

        override def zero: Buffer = {
            Buffer(0, mutable.Map[String, Long]())
        }

        // Fold one city name into the buffer
        override def reduce(buff: Buffer, city: String): Buffer = {
            buff.total += 1
            val newCount = buff.cityMap.getOrElse(city, 0L) + 1
            buff.cityMap.update(city, newCount)
            buff
        }

        // Merge two buffers: totals and per-city counts are added together
        override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
            buff1.total += buff2.total

            val map1 = buff1.cityMap
            val map2 = buff2.cityMap
            map2.foreach {
                case (city, cnt) => {
                    val newCount = map1.getOrElse(city, 0L) + cnt
                    map1.update(city, newCount)
                }
            }
            buff1.cityMap = map1
            buff1
        }

        // Produce the remark: top two cities with percentages, the rest as "其他" (Other)
        override def finish(buff: Buffer): String = {
            val remarkList = ListBuffer[String]()

            val totalcnt = buff.total
            val cityMap = buff.cityMap

            // Sort cities by click count, descending, and keep the top two
            val cityCntList = cityMap.toList.sortWith(
                (left, right) => {
                    left._2 > right._2
                }
            ).take(2)

            val hasMore = cityMap.size > 2
            var rsum = 0L
            cityCntList.foreach {
                case (city, cnt) => {
                    val r = cnt * 100 / totalcnt
                    remarkList.append(s"${city} ${r}%")
                    rsum += r
                }
            }

            // Everything beyond the top two is rolled up into "其他"
            if (hasMore) {
                remarkList.append(s"其他 ${100 - rsum}%")
            }

            remarkList.mkString(",")
        }

        override def bufferEncoder: Encoder[Buffer] = Encoders.product

        override def outputEncoder: Encoder[String] = Encoders.STRING
    }
}
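Because Aggregator's zero/reduce/merge/finish are plain Scala methods, the remark logic can be sanity-checked locally without starting a Spark session. A minimal, hypothetical check (CityRemarkLocalCheck is not part of the original code):
object CityRemarkLocalCheck {
    import Spark07_SparkSQL_Test1._
    def main(args: Array[String]): Unit = {
        val udaf = new CityRemarkUDAF()
        var buf = udaf.zero
        // 6 clicks: 北京 x3, 上海 x2, 深圳 x1
        Seq("北京", "北京", "北京", "上海", "上海", "深圳")
            .foreach(city => buf = udaf.reduce(buf, city))
        println(udaf.finish(buf)) // prints: 北京 50%,上海 33%,其他 17%
    }
}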