1. Relational Modeling and Dimensional Modeling
First, sort out the two styles of data processing.
Today's data processing falls into two broad categories: online transaction processing, OLTP (On-Line Transaction Processing), and online analytical processing, OLAP (On-Line Analytical Processing).
① OLTP is the classic workload of relational databases: basic, day-to-day transaction processing, such as bank transactions.
② OLAP is the main workload of data warehouse systems: it supports complex analytical operations, is geared toward decision support, and returns results that are easy to interpret.
The relational model is used mainly in OLTP systems. It is loose and fine-grained: many physical tables with little data redundancy, so cross-table analytical queries require many join operations, which sharply reduces execution efficiency.
The dimensional model is used mainly in OLAP systems. Tables are organized around a central fact table and designed for the business; some data redundancy is accepted, but the data is easy to get at.
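To make the trade-off concrete, here is a minimal sketch (all table and column names are hypothetical, not this project's): answering "sales by trademark" in a normalized relational schema takes a chain of joins, while a star schema answers it with a single join against a denormalized dimension.
-- normalized relational schema: walk sku -> spu -> trademark
select b.tm_name, sum(od.order_price * od.sku_num) sales
from order_detail od
join sku s on od.sku_id = s.id
join spu p on s.spu_id = p.id
join trademark b on p.tm_id = b.id
group by b.tm_name;
-- star schema: the sku dimension already carries tm_name
select d.tm_name, sum(f.order_price * f.sku_num) sales
from fact_order_detail f
join dim_sku d on f.sku_id = d.id
group by d.tm_name;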
2. Dimension Tables and Fact Tables
① Dimension table: each dimension table corresponds to an object or concept in the real world.
Examples: user, product, date, region.
Characteristics: many attributes, i.e. many columns.
A date dimension table, for instance, can record the day, month, year, quarter, whether it is a holiday, and so on.
② Each row of a fact table represents one business event (placing an order, paying, refunding, reviewing, etc.). The term "fact" refers to the measures of the business event (things you can count or sum: occurrences, quantities, amounts), e.g. the order amount in an order event.
Each fact table row contains additive numeric measures and foreign keys to the dimension tables; there are usually two or more foreign keys, which express many-to-many relationships among the dimensions.
Characteristics: few columns, frequent changes, many new rows added every day (a minimal sketch follows).
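As a rough illustration (hypothetical names, not this project's DDL), a payment fact table is mostly measures plus dimension foreign keys:
create table fact_payment (
    payment_amount decimal(16,2),  -- additive measure
    user_id        string,         -- FK -> user dimension
    sku_id         string,         -- FK -> product dimension
    province_id    string,         -- FK -> region dimension
    dt             string          -- FK -> date dimension
);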
3. Offline Data Warehouse
ods - dwd - dws - ads
① ODS layer
Keeps the data in its original form, as a backup.
Text data format with LZO file compression, to cut disk usage.
Partitioned by day, so later queries avoid full scans.
② DWD layer
Cleans the data and applies dimensional modeling; storage is declared as the Parquet format, which suits frequent querying.
Dimensional modeling with a star schema generally takes four steps:
select the business process → declare the grain → identify the dimensions → identify the facts
Step 1: select the business processes of interest. There are many business lines, e.g. ordering, payment, refund, logistics; each business line maps to one fact table.
Step 2: declare the grain, i.e. decide how detailed the stored data is: precisely define what one row of the fact table represents. For example,
in the order detail table, each row is one product line item of an order;
in the order table, each row is one order.
Step 3: identify the dimensions.
Dimensions describe the business facts; they mainly answer "who, where, when".
Step 4: identify the facts.
Here the word "facts" means the business measures, e.g. order amount, number of orders.
③ DWS layer
Aggregates each subject object's behavior for the day; a minimal sketch follows.
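A hedged sketch of a DWS daily aggregation (the target table dws_user_action_daycount is hypothetical; dwd_page_log is the page log table built later in this section):
insert overwrite table dws_user_action_daycount partition(dt='2020-12-08')
select
    user_id,
    count(*) page_count,           -- pages viewed that day
    sum(during_time) during_time   -- total time on pages
from dwd_page_log
where dt='2020-12-08' and user_id is not null
group by user_id;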
④ ADS layer
Stores the resulting analytical metrics.
4. Loading the Offline Data
1. ODS layer
Log data: 1 table
Business data: 23 tables
(1) Offline log data:
① Create an external table (EXTERNAL TABLE)
Only one table, ods_log, is created, partitioned by date, with the LZO input and output formats specified; each log record sits in the table as one row of JSON.
hive (cloudmall)>
drop table if exists ods_log;
CREATE EXTERNAL TABLE ods_log (`line` string)
PARTITIONED BY (`dt` string) -- partition by date
STORED AS -- storage format: read with LzoTextInputFormat;
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/cloudmall/ods/ods_log' -- storage location on HDFS
;
Note: every ODS table must be declared with the LZO format, i.e.
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
② Load the data
The data is already stored in HDFS, so it is loaded from an HDFS path, i.e. load data inpath …
load data inpath '/origin_data/cloudmall/log/topic_log/2020-12-08' overwrite into table ods_log partition(dt='2020-12-08');
③ Script the log load, executing SQL with the hive -e command
(To run a SQL file instead, use hive -f, e.g. hive -f dwd_tables.sql.)
hive=/opt/module/hive/bin/hive is an absolute path, so the script can be run from any directory.
+%F prints the full date, equivalent to +%Y-%m-%d.
-Dmapreduce.job.queuename=hive relies on YARN's multi-queue setup: a hive queue was added, and this submits the job to that queue.
hdfs_to_ods_log.sh
#!/bin/bash
# variables, defined once for easy modification
APP=cloudmall
hive=/opt/module/hive/bin/hive
hadoop=/opt/module/hadoop-3.1.3/bin/hadoop
# if a date argument is given, use it; otherwise default to yesterday
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
echo ================== log date: $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log partition(dt='$do_date');
"
$hive -e "$sql"
$hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=hive /warehouse/$APP/ods/ods_log/dt=$do_date
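Usage follows the same pattern as the business-data script below; the date argument is optional:
./hdfs_to_ods_log.sh 2020-12-08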
(2) Business data:
Picking a sync strategy per table
① Full sync
② Incremental sync
③ New-and-changed sync
④ Special strategy
In production, some small companies import every table in full, just to keep things simple.
Mid-size and large companies, with larger data volumes, stick strictly to the sync strategies.
Incremental tables vs. full tables (loads for both are sketched below)
Incremental table: new data only; the increment is whatever arrived since the last export.
① Records each day's delta, not the running total;
② reports only what changed, and nothing when there is no change;
③ one partition per day.
Full table: the complete, latest-state data for each day.
① Reports whether or not anything changed;
② every load carries all the rows (changed + unchanged);
③ each daily partition is a complete snapshot (only the special, rarely-changing tables, such as province and region, are left unpartitioned).
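In this project both kinds are loaded the same way, overwriting the day's partition; the difference is what the exported files contain (only the delta for incremental tables, the full snapshot for full tables):
load data inpath '/origin_data/cloudmall/db/order_detail/2020-12-08'
overwrite into table ods_order_detail partition(dt='2020-12-08'); -- increment: only that day's new rows
load data inpath '/origin_data/cloudmall/db/sku_info/2020-12-08'
overwrite into table ods_sku_info partition(dt='2020-12-08'); -- full: the day's complete snapshot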


① Order table (incremental and updated) ods_order_info
order no., order amount, order status, user ID, payment transaction no., create time, operate time, province ID, discount amount, original amount, freight fee
② Order detail table (incremental) ods_order_detail
detail no., order no., user ID, sku ID, sku name, sku price, sku quantity, create time, source type, source id
③ Order status table (incremental) ods_order_status_log
id, order ID, order status, modified time
……
Script the business-data load; the data again comes from HDFS.
hdfs_to_ods_db.sh
#!/bin/bash
APP=cloudmall
hive=/opt/module/hive/bin/hive
# if a date argument is given, use it; otherwise default to yesterday
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
sql1="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_order/$do_date' OVERWRITE into table ${APP}.ods_activity_order partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date');
"
# province and region are collected only on the first day, never afterwards; they only need a first-day load, which can be done by hand
sql2="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;
"
case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
"all"){
$hive -e "$sql1"
};;
esac
Run it: on the first run, pass first, which also loads the province and region tables once; after that they essentially never need reloading.
From the script's directory: ./hdfs_to_ods_db.sh first 2020-06-14
On each following day, just pass all:
./hdfs_to_ods_db.sh all 2020-06-15
2. DWD layer
(1) Log data
Hive's built-in get_json_object function parses the single log table ods_log, by the keys the frontend tracking emits, into 5 separate log tables (see the one-line example after this list):
start dwd_start_log
page dwd_page_log
display dwd_display_log
action dwd_action_log
error dwd_error_log
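A quick illustration of get_json_object (the JSON literal is made up; the $.path syntax is Hive's own):
hive (cloudmall)>
select get_json_object('{"common":{"mid":"m101"},"start":{"entry":"icon"}}', '$.start.entry');
-- returns: icon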
For example, the start log table dwd_start_log:
① Create an external table (EXTERNAL TABLE)
hive (cloudmall)>
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`area_code` string COMMENT 'area code',
`brand` string COMMENT 'phone brand',
`channel` string COMMENT 'channel',
`model` string COMMENT 'phone model',
`mid_id` string COMMENT 'device id',
`os` string COMMENT 'operating system',
`user_id` string COMMENT 'member id',
`version_code` string COMMENT 'app version',
`entry` string COMMENT 'entry: icon = app icon, notice = notification, install = launch after install',
`loading_time` bigint COMMENT 'startup loading time',
`open_ad_id` string COMMENT 'splash ad id',
`open_ad_ms` bigint COMMENT 'total ad play time',
`open_ad_skip_ms` bigint COMMENT 'point at which the user skipped the ad',
`ts` bigint COMMENT 'timestamp'
) COMMENT 'start log table'
PARTITIONED BY (dt string) -- partition by date
stored as parquet -- parquet columnar storage
LOCATION '/warehouse/cloudmall/dwd/dwd_start_log' -- storage location on HDFS
TBLPROPERTIES('parquet.compression'='lzo') -- LZO compression
;
② Load the data
Note: Hive's default hive.input.format is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat, which reads the LZO index files in as data and also leaves the LZO files unsplittable in the MR job, so run this before inserting:
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
hive (cloudmall)>
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_start_log partition(dt='2020-12-08')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-12-08'
and get_json_object(line,'$.start') is not null;
③ Script to load the log data into the DWD layer
ods_to_dwd_log.sh
#!/bin/bash
# accept a date passed in from outside
# -n tests whether the string that follows is non-empty: empty gives false, non-empty gives true
if [ -n "$1" ]
then
do_date=$1
else
# default to yesterday
do_date=$(date -d 'yesterday' '+%F')
fi
#echo $do_date
# declare the HQL
hql="
use cloudmall;
-- start log data (start)
insert overwrite table dwd_start_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ods_log
where dt='$do_date' and get_json_object(line,'$.start') IS NOT NULL;
-- page log data (page)
insert overwrite table dwd_page_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts')
from ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;
-- action log data (action)
insert overwrite table dwd_action_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(actionJsonObject,'$.action_id'),
get_json_object(actionJsonObject,'$.item'),
get_json_object(actionJsonObject,'$.item_type'),
get_json_object(actionJsonObject,'$.ts')
from ods_log
lateral view explode_array(get_json_object(line,'$.actions')) tmp as actionJsonObject -- explode_array: assumed to be a custom UDTF, one row per JSON array element
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;
-- display (impression) log data
insert overwrite table dwd_display_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
-- the select order must match the target table's column order, hence ts here comes before the displays fields.
get_json_object(line,'$.ts'),
get_json_object(displayJsonObject,'$.displayType'),
get_json_object(displayJsonObject,'$.item'),
get_json_object(displayJsonObject,'$.item_type'),
get_json_object(displayJsonObject,'$.order')
from ods_log
lateral view explode_array(get_json_object(line,'$.displays')) tmp as displayJsonObject
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;
-- error log data
insert overwrite table dwd_error_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;
"
hive -e "$hql"
(vim tip: press Esc to leave insert mode, then :%s/2020-12-08/$do_date/g batch-replaces 2020-12-08 with $do_date)
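explode_array above is taken to be a project-specific UDTF; it is not a Hive built-in. A hedged equivalent using only built-ins splits the JSON array string on its },{ boundaries and explodes the pieces (this assumes the array elements are flat objects):
hive (cloudmall)>
select tmp.action_json
from ods_log
lateral view explode(
  split(
    regexp_replace(
      regexp_replace(get_json_object(line,'$.actions'), '\\[|\\]', ''), -- strip the outer [ ]
      '\\},\\{', '\\}\\|\\{'),                                          -- turn },{ into }|{
    '\\|')                                                              -- split on |
) tmp as action_json
where dt='2020-12-08'
  and get_json_object(line,'$.actions') is not null;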
(2) Offline business data
8 fact tables + 6 dimension tables
① Fact tables: 8
Transactional: 4, never modified once written: order detail, payment, comment, refund
Periodic snapshot: 2, where only the end state matters, not the process: favorites and cart (a favorite can be added and removed repeatedly, as can cart items)
Accumulating snapshot: 2, where new state overwrites old: coupon-use table, order fact table
② Dimension tables: 6
user, date, region, product, activity, coupon
The user dimension uses a zipper table: it sees inserts and updates, but not frequent ones
Date and region are special tables
All the others are full tables
New-and-changed: coupon-use table, user, order
Insert-only: order status, payment flow, refund, order detail table, activity-order link table, product comment table
Dimension degeneration (folding several source tables into one dimension table)
① dwd_dim_sku_info product dimension table (full) -> sku table, spu table, trademark table, level-1/2/3 category tables
② dwd_dim_base_province region dimension table (special) -> region table, province table
③ dwd_dim_activity_info activity dimension table (full) -> activity rule table, activity table
For example, creating the product dimension table:
hive (cloudmall)>
DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
--ods_sku_info
`id` string COMMENT 'sku id',
`spu_id` string COMMENT 'spu id',
`price` decimal(16,2) COMMENT 'price',
`sku_name` string COMMENT 'sku name',
`sku_desc` string COMMENT 'sku description',
`weight` decimal(16,2) COMMENT 'weight',
`tm_id` string COMMENT 'trademark id',
--ods_base_trademark
`tm_name` string COMMENT 'trademark name',
--ods_sku_info
--ods_base_category1, ods_base_category2, ods_base_category3
`category3_id` string COMMENT 'level-3 category id',
`category2_id` string COMMENT 'level-2 category id',
`category1_id` string COMMENT 'level-1 category id',
`category3_name` string COMMENT 'level-3 category name',
`category2_name` string COMMENT 'level-2 category name',
`category1_name` string COMMENT 'level-1 category name',
--ods_spu_info
`spu_name` string COMMENT 'spu name',
--ods_sku_info
`create_time` string COMMENT 'create time'
) COMMENT 'product dimension table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/cloudmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");
Load data into the table
hive (cloudmall)>
-- with the setting below, reads no longer count the LZO index files as data.
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
-- product dimension table, from ods_sku_info and friends
-- static partition: the insert names both the partition column and its value; the select list does not end with dt
INSERT overwrite table dwd_dim_sku_info partition(dt='2020-12-08')
-- dynamic partition: the insert names only the partition column; the last column of the select supplies the value, so the select would end with dt
--set hive.exec.dynamic.partition.mode=nonstrict;
--INSERT overwrite table dwd_dim_sku_info partition(dt)
SELECT t1.id, spu_id, price, sku_name, sku_desc, weight, t1.tm_id, tm_name, t1.category3_id, t5.category2_id, t4.category1_id, category3_name, category2_name, category1_name, spu_name, create_time
from
(SELECT
*
from ods_sku_info
where dt='2020-12-08') t1
left join
(SELECT
tm_id,tm_name
from ods_base_trademark
where dt='2020-12-08') t2
on t1.tm_id = t2.tm_id
left join
(SELECT
id,spu_name
from ods_spu_info
where dt='2020-12-08') t3
on t1.spu_id = t3.id
left join
(SELECT
id category3_id,name category3_name,category2_id
from ods_base_category3
where dt='2020-12-08') t6
on t1.category3_id = t6.category3_id
left join
(SELECT
id category2_id,name category2_name,category1_id
from ods_base_category2
where dt='2020-12-08') t5
on t6.category2_id = t5.category2_id
left join
(SELECT
id category1_id,name category1_name
from ods_base_category1
where dt='2020-12-08') t4
on t5.category1_id = t4.category1_id;
Script to load data from the ODS layer into the DWD layer
ods_to_dwd_db.sh
#!/bin/bash
APP=cloudmall
hive=/opt/module/hive/bin/hive
# if a date argument is given, use it; otherwise default to yesterday
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
sql1="
set mapreduce.job.queuename=hive;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_dim_sku_info partition(dt='$do_date')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select * from ${APP}.ods_sku_info where dt='$do_date'
)sku
join
(
select * from ${APP}.ods_base_trademark where dt='$do_date'
)ob on sku.tm_id=ob.tm_id
join
(
select * from ${APP}.ods_spu_info where dt='$do_date'
)spu on spu.id = sku.spu_id
join
(
select * from ${APP}.ods_base_category3 where dt='$do_date'
)c3 on sku.category3_id=c3.id
join
(
select * from ${APP}.ods_base_category2 where dt='$do_date'
)c2 on c3.category2_id=c2.id
join
(
select * from ${APP}.ods_base_category1 where dt='$do_date'
)c1 on c2.category1_id=c1.id;
insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_dim_activity_info partition(dt='$do_date')
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_order_detail partition(dt='$do_date')
select
id,
order_id,
user_id,
sku_id,
sku_name,
order_price,
sku_num,
create_time,
province_id,
source_type,
source_id,
original_amount_d,
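-- rounding fix: the per-item shares below are rounded to 2 decimals, so their
-- sum can drift from the order-level totals; the first row of each order (rn=1)
-- absorbs the remainder so the shares add up exactly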
if(rn=1,final_total_amount-(sum_div_final_amount-final_amount_d),final_amount_d),
if(rn=1,feight_fee-(sum_div_feight_fee-feight_fee_d),feight_fee_d),
if(rn=1,benefit_reduce_amount-(sum_div_benefit_reduce_amount-benefit_reduce_amount_d),benefit_reduce_amount_d)
from
(
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.source_type,
od.source_id,
round(od.order_price*od.sku_num,2) original_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2) final_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2) feight_fee_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2) benefit_reduce_amount_d,
row_number() over(partition by od.order_id order by od.id desc) rn,
oi.final_total_amount,
oi.feight_fee,
oi.benefit_reduce_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2)) over(partition by od.order_id) sum_div_final_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2)) over(partition by od.order_id) sum_div_feight_fee,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2)) over(partition by od.order_id) sum_div_benefit_reduce_amount
from
(
select * from ${APP}.ods_order_detail where dt='$do_date'
) od
join
(
select * from ${APP}.ods_order_info where dt='$do_date'
) oi
on od.order_id=oi.id
)t1;
insert overwrite table ${APP}.dwd_fact_payment_info partition(dt='$do_date')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from
(
select * from ${APP}.ods_payment_info where dt='$do_date'
)pi
join
(
select id, province_id from ${APP}.ods_order_info where dt='$do_date'
)oi
on pi.order_id = oi.id;
insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt='$do_date')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_comment_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ${APP}.ods_comment_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_cart_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from ${APP}.ods_cart_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_favor_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';
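-- accumulating snapshot (coupon use): full outer join the fact partitions that
-- today's changed coupons originally landed in (old, keyed by get_time) with
-- today's ODS increment (new); per column, take new when present, else keep old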
insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.dwd_fact_coupon_use
where dt in
(
select
date_format(get_time,'yyyy-MM-dd')
from ${APP}.ods_coupon_use
where dt='$do_date'
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.ods_coupon_use
where dt='$do_date'
)new
on old.id=new.id;
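-- accumulating snapshot (order fact): same old/new full outer join pattern; the
-- new side pivots ods_order_status_log into a status -> operate_time map (tms)
-- via collect_set + str_to_map, so each status change fills its own time column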
insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),-- status 1001 = unpaid, i.e. order created
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from ${APP}.dwd_fact_order_info
where dt
in
(
select
date_format(create_time,'yyyy-MM-dd')
from ${APP}.ods_order_info
where dt='$do_date'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ${APP}.ods_order_status_log
where dt='$do_date'
group by order_id
)log
join
(
select * from ${APP}.ods_order_info where dt='$do_date'
)info
on log.order_id=info.id
left join
(
select * from ${APP}.ods_activity_order where dt='$do_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"
sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br
on bp.region_id=br.id;
"
sql3="
insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from
(
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info where dt='$do_date'
union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
from ${APP}.dwd_dim_user_info_his uh left join
(
select
*
from ${APP}.ods_user_info
where dt='$do_date'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
insert overwrite table ${APP}.dwd_dim_user_info_his
select * from ${APP}.dwd_dim_user_info_his_tmp;
"
case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
"all"){
$hive -e "$sql1$sql3"
};;
esac