Offline Data Warehouse (1)

I. Relational Modeling vs. Dimensional Modeling

First, let's clarify the main ways data is processed.

Today's data processing falls broadly into two categories: on-line transaction processing (OLTP) and on-line analytical processing (OLAP).
① OLTP is the main workload of traditional relational databases: basic, day-to-day transaction processing, e.g. bank transactions.
② OLAP is the main workload of data warehouse systems: it supports complex analytical operations, focuses on decision support, and delivers intuitive, easy-to-read query results.

The relational model is mainly used in OLTP systems. It is loose and fragmented: there are many physical tables with little data redundancy, so cross-table analytical queries require many joins, which greatly hurts execution efficiency.

The dimensional model is mainly used in OLAP systems. Tables are typically organized around a central fact table and are business-oriented; some data redundancy is accepted in exchange for convenient access to the data.
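For intuition, a typical star-schema query reaches any dimension from the fact table in a single join hop. A minimal sketch, using the fact and dimension tables built later in this article (column names assumed from the load statements shown there):

select p.name province_name, sum(o.final_total_amount) order_amount
from dwd_fact_order_info o
join dwd_dim_base_province p on o.province_id = p.id
where o.dt = '2020-12-08'
group by p.name;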

II. Dimension Tables and Fact Tables

① Dimension table: each dimension table corresponds to an object or concept in the real world.
For example: user, product, date, region.
Characteristics: many attributes, i.e. many columns.
A date dimension table, for instance, can contain the day, month, year, quarter, whether it is a holiday, and so on.

② Each row in a fact table represents a business event (placing an order, payment, refund, review, etc.). The term "fact" refers to the measures of a business event (counts, quantities, amounts, etc.); for example, the order amount in an order event.

Each fact table row contains: additive numeric measures and foreign keys that link to dimension tables. A fact table usually has two or more foreign keys, and together those foreign keys express many-to-many relationships among the dimension tables.

Characteristics: few columns, frequent changes, many new rows added each day.
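A hypothetical, simplified sketch of what such a table looks like (table and column names here are illustrative, not from the project below):

-- hypothetical, simplified fact table: foreign keys into dimensions plus additive measures
CREATE TABLE fact_order_demo (
    `order_id`     string,          -- degenerate dimension: the order number itself
    `user_id`      string,          -- foreign key -> user dimension
    `province_id`  string,          -- foreign key -> region dimension
    `dt`           string,          -- foreign key -> date dimension
    `order_amount` decimal(16,2)    -- additive numeric measure
);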

III. The Offline Data Warehouse

ODS → DWD → DWS → ADS
① ODS layer
Keep the data in its original form, as a backup.
Text data format with LZO file compression, to reduce disk usage.
Partition by day, so that later queries avoid full-table scans.
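For example, filtering on the partition column lets Hive prune the scan to a single day's directory (a sketch against the ods_log table created below):

-- scans only the dt='2020-12-08' partition directory, not the whole table
select count(*) from ods_log where dt='2020-12-08';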

② DWD layer
Data is cleaned and dimensionally modeled here; storage is specified as the Parquet format, which suits frequent querying.

Dimensional modeling with a star schema generally follows these steps:

Select the business process → declare the grain → identify the dimensions → identify the facts
Step 1: select the business processes of interest. There are many business lines, e.g. ordering, payment, refunds, logistics; each business line corresponds to one fact table.

Step 2: declare the grain, i.e. decide how fine-grained the stored data is, which means precisely defining what one row of the fact table represents. For example:
in the order detail table, each row corresponds to one product item within an order;
in the order table, each row corresponds to one order.

Step 3: identify the dimensions.
Dimensions describe the business facts; they mainly capture information such as "who, where, when".

Step 4: identify the facts.
Here the word "facts" refers to the measures of the business, e.g. order amount, number of orders.

③ DWS layer
Aggregates each subject's behavior for the day.

④ ADS layer
Stores the resulting analytical metrics.
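As a rough illustration of the kind of aggregation these layers perform, here is a sketch that counts daily active devices from the start log (dwd_start_log is created later in this article; in practice the result would be written into a DWS/ADS table rather than just selected):

select dt, count(distinct mid_id) active_device_count
from dwd_start_log
where dt = '2020-12-08'
group by dt;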

IV. Loading Offline Data

1. ODS layer

Log data: 1 table
Business data: 23 tables

(1) Offline log data:
① Create an external table (EXTERNAL TABLE)
Create a single table, ods_log, partitioned by date, with the LZO input and output formats specified; each log record is one line of JSON in the table.

hive (cloudmall)> 
drop table if exists ods_log;
CREATE EXTERNAL TABLE ods_log (`line` string)
PARTITIONED BY (`dt` string) -- partitioned by date
STORED AS -- storage format; reads use LzoTextInputFormat
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/cloudmall/ods/ods_log'  -- storage location on HDFS
;

Note: every table in the ODS layer must be declared with the LZO format, i.e.
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
② Load the data
At this point the data already sits in HDFS, so it has to be loaded from HDFS, i.e. load data inpath …

load data inpath '/origin_data/cloudmall/log/topic_log/2020-12-08' overwrite into table ods_log partition(dt='2020-12-08');
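Plain LZO files cannot be split by MapReduce until an index file is built next to them, so after each load the indexer from the hadoop-lzo jar is run over the new partition (this is the same command the script below uses):

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar \
  com.hadoop.compression.lzo.DistributedLzoIndexer \
  /warehouse/cloudmall/ods/ods_log/dt=2020-12-08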

③ Write a script to load the log data, executing the SQL with hive -e
(to run a SQL file instead, you would use hive -f dwd_tables.sql)
hive=/opt/module/hive/bin/hive is an absolute path, so the script can be run from any directory
+%F prints the full date, equivalent to +%Y-%m-%d
-Dmapreduce.job.queuename=hive: YARN is set up with multiple queues, one of which is a hive queue added for this purpose; this flag submits the job to that hive queue

hdfs_to_ods_log.sh

#!/bin/bash

# Define variables for easy modification
APP=cloudmall
hive=/opt/module/hive/bin/hive
hadoop=/opt/module/hadoop-3.1.3/bin/hadoop

# Use the date passed in as the first argument if provided; otherwise default to yesterday
if [ -n "$1" ] ;then
   do_date=$1
else 
   do_date=`date -d "-1 day" +%F`
fi 

echo ================== log date: $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log partition(dt='$do_date');
"

$hive -e "$sql"

$hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=hive /warehouse/$APP/ods/ods_log/dt=$do_date
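To run the script for a specific day (or with no argument to default to yesterday):

./hdfs_to_ods_log.sh 2020-12-08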

(2) Business data:
Choosing a synchronization strategy per table:
① Full synchronization: ship a complete snapshot of the table every day.
② Incremental synchronization: ship only the rows added since the last export.
③ New-and-changed synchronization: ship the rows that were added or modified.
④ Special strategy: tables that essentially never change (e.g. regions, provinces) are loaded only once.

In production, some small companies import every table in full for simplicity.
Mid-size and large companies, given their data volumes, strictly follow the per-table synchronization strategies.

Incremental tables vs. full tables
Incremental table: new data only; the increment is the data added since the last export.
① Records each day's delta, not the running total;
② Only reports changes; if nothing changed, nothing is reported;
③ One partition per day.

Full table: the complete latest-state data for each day.
① Reports every day, whether or not anything changed;
② Each upload contains all the data (changed + unchanged);
③ Only one partition.
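For a table synchronized as new-and-changed (such as ods_order_info below), the latest state of every record can be recovered by keeping only the newest row per key across the daily partitions; a sketch:

select id, order_status, final_total_amount
from (
    select
        id, order_status, final_total_amount,
        -- newest partition first for each order id
        row_number() over (partition by id order by dt desc) rn
    from ods_order_info
) t
where rn = 1;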

① Order table (incremental and updated) ods_order_info
order number, order amount, order status, user ID, payment transaction number, create time, operate time, province ID, discount amount, original amount, freight fee
② Order detail table (incremental) ods_order_detail
order detail number, order number, user ID, product ID, product name, product price, product quantity, create time, source type, source id
③ Order status table (incremental) ods_order_status_log
id, order ID, order status, modified time
……

Write a script to load the business data; the data, as before, lives in HDFS.
hdfs_to_ods_db.sh

#!/bin/bash

APP=cloudmall
hive=/opt/module/hive/bin/hive

# Use the date passed in as the second argument if provided; otherwise default to yesterday
if [ -n "$2" ] ;then
    do_date=$2
else 
    do_date=`date -d "-1 day" +%F`
fi

sql1=" 
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');

load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/activity_order/$do_date' OVERWRITE into table ${APP}.ods_activity_order partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date'); 

load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date'); 
"
# Region and province data are collected only on the first day, never afterwards; they only need to be imported once, which can be done manually on day one
sql2=" 
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;

load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;
"
case $1 in
"first"){
    $hive -e "$sql1$sql2"
};;
"all"){
    $hive -e "$sql1"
};;
esac

Execution: on the first run, pass the argument first to import the region and province tables once; after that they generally never need reloading.
From the directory where the script lives: ./hdfs_to_ods_db.sh first 2020-06-14
For daily imports afterwards, pass the argument all:
./hdfs_to_ods_db.sh all 2020-06-15

2. DWD layer

(1) Log data
Using Hive's built-in get_json_object function, the single log table ods_log is parsed, keyed by the front-end tracking event types, into 5 separate log tables:
start    dwd_start_log
page     dwd_page_log
display  dwd_display_log
action   dwd_action_log
error    dwd_error_log

For example, the start log table dwd_start_log:
① Create an external table (EXTERNAL TABLE)

hive (cloudmall)>
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
    `area_code` string COMMENT 'area code',
    `brand` string COMMENT 'phone brand', 
    `channel` string COMMENT 'channel', 
    `model` string COMMENT 'phone model', 
    `mid_id` string COMMENT 'device id', 
    `os` string COMMENT 'operating system', 
    `user_id` string COMMENT 'member id', 
    `version_code` string COMMENT 'app version', 
    `entry` string COMMENT 'entry point: icon = app icon, notice = notification, install = launch after install',
    `loading_time` bigint COMMENT 'startup loading time',
    `open_ad_id` string COMMENT 'splash ad id',
    `open_ad_ms` bigint COMMENT 'total ad play time', 
    `open_ad_skip_ms` bigint COMMENT 'time at which the user skipped the ad', 
    `ts` bigint COMMENT 'timestamp'
) COMMENT 'start log table'
PARTITIONED BY (dt string) -- partitioned by date
stored as parquet -- parquet columnar storage
LOCATION '/warehouse/cloudmall/dwd/dwd_start_log' -- storage location on HDFS
TBLPROPERTIES('parquet.compression'='lzo') -- LZO compression
;

② Load the data
Note: Hive's default value for hive.input.format is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat, which reads the LZO index files in as if they were data and also prevents the LZO files from being split in the MR job, so before inserting you must first run
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

hive (cloudmall)> 
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_start_log partition(dt='2020-12-08')
select 
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.start.entry'),
    get_json_object(line,'$.start.loading_time'),
    get_json_object(line,'$.start.open_ad_id'),
    get_json_object(line,'$.start.open_ad_ms'),
    get_json_object(line,'$.start.open_ad_skip_ms'),
    get_json_object(line,'$.ts')
from ods_log
where dt='2020-12-08'
and get_json_object(line,'$.start') is not null;
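A quick sanity check that the partition was actually populated:

select count(*) from dwd_start_log where dt='2020-12-08';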

③ Script to load the log data into the DWD layer
ods_to_dwd_log.sh

#!/bin/bash
#accept the date passed in from outside
#-n is a logical operator that tests whether the following string is non-empty: it returns false for length 0, true otherwise
if [ -n "$1" ]
then 
	do_date=$1
else
#	default to the day before the current date
	do_date=$(date -d 'yesterday' '+%F')
fi

#echo $do_date

#declare the HQL
hql="
use cloudmall;
-- Start log data (start)
insert overwrite table dwd_start_log partition(dt='$do_date')
select
	get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.start.entry'),
    get_json_object(line,'$.start.loading_time'),
    get_json_object(line,'$.start.open_ad_id'),
    get_json_object(line,'$.start.open_ad_ms'),
    get_json_object(line,'$.start.open_ad_skip_ms'),
    get_json_object(line,'$.ts')
from ods_log
where dt='$do_date' and get_json_object(line,'$.start') IS  NOT  NULL;

-- Page log data (page)
insert overwrite table dwd_page_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.sourceType'),
    get_json_object(line,'$.ts')
from ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;

-- Load action log data
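-- Note: explode_array is presumably a custom UDTF that turns the JSON-array string returned by
-- get_json_object into one row per element; Hive's built-in explode only accepts array/map types.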
insert overwrite table dwd_action_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.sourceType'),
    get_json_object(actionJsonObject,'$.action_id'),
    get_json_object(actionJsonObject,'$.item'),
    get_json_object(actionJsonObject,'$.item_type'),
    get_json_object(actionJsonObject,'$.ts')
from ods_log
lateral view explode_array(get_json_object(line,'$.actions')) tmp as actionJsonObject 
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;

-- Load display (impression) log data
insert overwrite table dwd_display_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.sourceType'),
    -- The extracted fields must follow the column order of the table DDL, which is why ts comes before the displays fields here.
    get_json_object(line,'$.ts'),
    get_json_object(displayJsonObject,'$.displayType'),
    get_json_object(displayJsonObject,'$.item'),
    get_json_object(displayJsonObject,'$.item_type'),
    get_json_object(displayJsonObject,'$.order')
from ods_log
lateral view explode_array(get_json_object(line,'$.displays')) tmp as displayJsonObject
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;

-- Load error log data
insert overwrite table dwd_error_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.sourceType'),
    get_json_object(line,'$.start.entry'),
    get_json_object(line,'$.start.loading_time'),
    get_json_object(line,'$.start.open_ad_id'),
    get_json_object(line,'$.start.open_ad_ms'),
    get_json_object(line,'$.start.open_ad_skip_ms'),
    get_json_object(line,'$.actions'),
    get_json_object(line,'$.displays'),
    get_json_object(line,'$.ts'),
    get_json_object(line,'$.err.error_code'),
    get_json_object(line,'$.err.msg')
from ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;
"

hive -e "$hql"
(Tip: after writing the first insert interactively, the script version can be produced in vim by pressing Esc to leave insert mode, then running :%s/2020-12-08/$do_date/g to batch-replace 2020-12-08 with $do_date.)

(2) Offline business data
8 fact tables + 6 dimension tables

① Fact tables: 8
Transactional: 4, immutable once written: order detail, payment, review, refund
Periodic snapshot: 2, where only the end state matters, not the process: favorites and cart (an item can be favorited then unfavorited, added to the cart then removed, repeatedly)
Accumulating snapshot: 2, where new records overwrite old ones: coupon usage table, order fact table

② Dimension tables: 6
user, date, region, product, activity, coupon
The user dimension uses a zipper table: it sees inserts and updates, but not frequent ones (see the sketch after this list)
Date and region are special tables
All the others are full tables

New-and-changed: coupon usage table, user, order
Insert-only: order status, payment transactions, refunds, order detail, activity-order link, product reviews
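A zipper table stores a validity range (start_date, end_date) for each record version, so reading a snapshot as of any day is a simple range filter; a sketch against dwd_dim_user_info_his, which is maintained by the script at the end of this section:

-- state of every user as of 2020-12-08
select * from dwd_dim_user_info_his
where start_date <= '2020-12-08' and end_date >= '2020-12-08';

-- only the currently valid records
select * from dwd_dim_user_info_his
where end_date = '9999-99-99';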

Dimension degeneration (merging related source tables into one wide dimension table):
① dwd_dim_sku_info, the product dimension table (full) -> sku table, spu table, trademark table, level-1/2/3 category tables
② dwd_dim_base_province, the region dimension table (special) -> region table, province table
③ dwd_dim_activity_info, the activity dimension table (full) -> activity rule table, activity table

For example, the DDL for the product dimension table:

hive (cloudmall)> 
DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
    -- from ods_sku_info
    `id` string COMMENT 'product id',
    `spu_id` string COMMENT 'spu id',
    `price` decimal(16,2) COMMENT 'product price',
    `sku_name` string COMMENT 'product name',
    `sku_desc` string COMMENT 'product description',
    `weight` decimal(16,2) COMMENT 'weight',
    `tm_id` string COMMENT 'trademark id',
    -- from ods_base_trademark
    `tm_name` string COMMENT 'trademark name',
    -- category3_id from ods_sku_info; ids/names resolved via ods_base_category1/2/3
    `category3_id` string COMMENT 'level-3 category id',
    `category2_id` string COMMENT 'level-2 category id',
    `category1_id` string COMMENT 'level-1 category id',
    `category3_name` string COMMENT 'level-3 category name',
    `category2_name` string COMMENT 'level-2 category name',
    `category1_name` string COMMENT 'level-1 category name',
    -- from ods_spu_info
    `spu_name` string COMMENT 'spu name',
    -- from ods_sku_info
    `create_time` string COMMENT 'create time'
) COMMENT 'product dimension table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/cloudmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");

Load data into the table:

hive (cloudmall)> 
-- With this setting, the LZO indexes created earlier are not read in as data.
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
-- Product dimension table, assembled from ods_sku_info and related tables
-- Static partitioning: the insert names the partition column AND its value; the select does not need a trailing dt
INSERT overwrite table dwd_dim_sku_info partition(dt='2020-12-08')
-- Dynamic partitioning (alternative): the insert names only the partition column, and the value of the
-- last column of the select becomes the partition value, so the select must end with dt:
--set hive.exec.dynamic.partition.mode=nonstrict;
--INSERT overwrite table dwd_dim_sku_info partition(dt)
SELECT
    t1.id, spu_id, price, sku_name, sku_desc, weight, t1.tm_id, tm_name,
    t1.category3_id, t5.category2_id, t4.category1_id,
    category3_name, category2_name, category1_name, spu_name, create_time
from
(
    select * from ods_sku_info where dt='2020-12-08'
) t1
left join
(
    select tm_id, tm_name from ods_base_trademark where dt='2020-12-08'
) t2 on t1.tm_id = t2.tm_id
left join
(
    select id, spu_name from ods_spu_info where dt='2020-12-08'
) t3 on t1.spu_id = t3.id
left join
(
    select id category3_id, name category3_name, category2_id from ods_base_category3 where dt='2020-12-08'
) t6 on t1.category3_id = t6.category3_id
left join
(
    select id category2_id, name category2_name, category1_id from ods_base_category2 where dt='2020-12-08'
) t5 on t6.category2_id = t5.category2_id
left join
(
    select id category1_id, name category1_name from ods_base_category1 where dt='2020-12-08'
) t4 on t5.category1_id = t4.category1_id;

Script to load data from the ODS layer into the DWD layer
ods_to_dwd_db.sh

#!/bin/bash

APP=cloudmall
hive=/opt/module/hive/bin/hive

# Use the date passed in as the second argument if provided; otherwise default to yesterday
if [ -n "$2" ] ;then
    do_date=$2
else 
    do_date=`date -d "-1 day" +%F`
fi

sql1="
set mapreduce.job.queuename=hive;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

insert overwrite table ${APP}.dwd_dim_sku_info partition(dt='$do_date')
select  
    sku.id,
    sku.spu_id,
    sku.price,
    sku.sku_name,
    sku.sku_desc,
    sku.weight,
    sku.tm_id,
    ob.tm_name,
    sku.category3_id,
    c2.id category2_id,
    c1.id category1_id,
    c3.name category3_name,
    c2.name category2_name,
    c1.name category1_name,
    spu.spu_name,
    sku.create_time
from
(
    select * from ${APP}.ods_sku_info where dt='$do_date'
)sku
join
(
    select * from ${APP}.ods_base_trademark where dt='$do_date'
)ob on sku.tm_id=ob.tm_id
join
(
    select * from ${APP}.ods_spu_info where dt='$do_date'
)spu on spu.id = sku.spu_id
join 
(
    select * from ${APP}.ods_base_category3 where dt='$do_date'
)c3 on sku.category3_id=c3.id
join 
(
    select * from ${APP}.ods_base_category2 where dt='$do_date'
)c2 on c3.category2_id=c2.id 
join 
(
    select * from ${APP}.ods_base_category1 where dt='$do_date'
)c1 on c2.category1_id=c1.id;


insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt='$do_date')
select
    id,
    coupon_name,
    coupon_type,
    condition_amount,
    condition_num,
    activity_id,
    benefit_amount,
    benefit_discount,
    create_time,
    range_type,
    spu_id,
    tm_id,
    category3_id,
    limit_num,
    operate_time,
    expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_dim_activity_info partition(dt='$do_date')
select
    id,
    activity_name,
    activity_type,
    start_time,
    end_time,
    create_time
from ${APP}.ods_activity_info 
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_order_detail partition(dt='$do_date')
select
    id,
    order_id,
    user_id,
    sku_id,
    sku_num,
    order_price,
    sku_num,
    create_time,
    province_id,
    source_type,
    source_id,
    original_amount_d,
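    -- The three prorated amounts below were rounded per item, so they may not sum exactly to the
    -- order-level totals; the row with rn=1 absorbs the rounding remainder so the parts add up.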
    if(rn=1,final_total_amount-(sum_div_final_amount-final_amount_d),final_amount_d),
    if(rn=1,feight_fee-(sum_div_feight_fee-feight_fee_d),feight_fee_d),
    if(rn=1,benefit_reduce_amount-(sum_div_benefit_reduce_amount-benefit_reduce_amount_d),benefit_reduce_amount_d)
from
(
    select
        od.id,
        od.order_id,
        od.user_id,
        od.sku_id,
        od.sku_name,
        od.order_price,
        od.sku_num,
        od.create_time,
        oi.province_id,
        od.source_type,
        od.source_id,
        round(od.order_price*od.sku_num,2) original_amount_d,
        round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2) final_amount_d,
        round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2) feight_fee_d,
        round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2) benefit_reduce_amount_d,
        row_number() over(partition by od.order_id order by od.id desc) rn,
        oi.final_total_amount,
        oi.feight_fee,
        oi.benefit_reduce_amount,
        sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2)) over(partition by od.order_id) sum_div_final_amount,
        sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2)) over(partition by od.order_id) sum_div_feight_fee,
        sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2)) over(partition by od.order_id) sum_div_benefit_reduce_amount
    from 
    (
        select * from ${APP}.ods_order_detail where dt='$do_date'
    ) od
    join 
    (
        select * from ${APP}.ods_order_info where dt='$do_date'
    ) oi
    on od.order_id=oi.id
)t1;

insert overwrite table ${APP}.dwd_fact_payment_info partition(dt='$do_date')
select
    pi.id,
    pi.out_trade_no,
    pi.order_id,
    pi.user_id,
    pi.alipay_trade_no,
    pi.total_amount,
    pi.subject,
    pi.payment_type,
    pi.payment_time,          
    oi.province_id
from
(
    select * from ${APP}.ods_payment_info where dt='$do_date'
)pi
join
(
    select id, province_id from ${APP}.ods_order_info where dt='$do_date'
)oi
on pi.order_id = oi.id;


insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt='$do_date')
select
    id,
    user_id,
    order_id,
    sku_id,
    refund_type,
    refund_num,
    refund_amount,
    refund_reason_type,
    create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_fact_comment_info partition(dt='$do_date')
select
    id,
    user_id,
    sku_id,
    spu_id,
    order_id,
    appraise,
    create_time
from ${APP}.ods_comment_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_fact_cart_info partition(dt='$do_date')
select
    id,
    user_id,
    sku_id,
    cart_price,
    sku_num,
    sku_name,
    create_time,
    operate_time,
    is_ordered,
    order_time,
    source_type,
    source_id
from ${APP}.ods_cart_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_fact_favor_info partition(dt='$do_date')
select
    id,
    user_id,
    sku_id,
    spu_id,
    is_cancel,
    create_time,
    cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
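-- Accumulating snapshot merge: full-outer-join today's changes (new) against the rows already in
-- the affected partitions (old); for each column, prefer the new value, else keep the old one.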
select
    if(new.id is null,old.id,new.id),
    if(new.coupon_id is null,old.coupon_id,new.coupon_id),
    if(new.user_id is null,old.user_id,new.user_id),
    if(new.order_id is null,old.order_id,new.order_id),
    if(new.coupon_status is null,old.coupon_status,new.coupon_status),
    if(new.get_time is null,old.get_time,new.get_time),
    if(new.using_time is null,old.using_time,new.using_time),
    if(new.used_time is null,old.used_time,new.used_time),
    date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(
    select
        id,
        coupon_id,
        user_id,
        order_id,
        coupon_status,
        get_time,
        using_time,
        used_time
    from ${APP}.dwd_fact_coupon_use
    where dt in
    (
        select
            date_format(get_time,'yyyy-MM-dd')
        from ${APP}.ods_coupon_use
        where dt='$do_date'
    )
)old
full outer join
(
    select
        id,
        coupon_id,
        user_id,
        order_id,
        coupon_status,
        get_time,
        using_time,
        used_time
    from ${APP}.ods_coupon_use
    where dt='$do_date'
)new
on old.id=new.id;


insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
select
    if(new.id is null,old.id,new.id),
    if(new.order_status is null,old.order_status,new.order_status),
    if(new.user_id is null,old.user_id,new.user_id),
    if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
    if(new.tms['1001'] is null,old.create_time,new.tms['1001']), -- 1001 corresponds to the unpaid status
    if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
    if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
    if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
    if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
    if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
    if(new.province_id is null,old.province_id,new.province_id),
    if(new.activity_id is null,old.activity_id,new.activity_id),
    if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
    if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
    if(new.feight_fee is null,old.feight_fee,new.feight_fee),
    if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
    date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
    select
        id,
        order_status,
        user_id,
        out_trade_no,
        create_time,
        payment_time,
        cancel_time,
        finish_time,
        refund_time,
        refund_finish_time,
        province_id,
        activity_id,
        original_total_amount,
        benefit_reduce_amount,
        feight_fee,
        final_total_amount
    from ${APP}.dwd_fact_order_info
    where dt
    in
    (
        select
          date_format(create_time,'yyyy-MM-dd')
        from ${APP}.ods_order_info
        where dt='$do_date'
    )
)old
full outer join
(
    select
        info.id,
        info.order_status,
        info.user_id,
        info.out_trade_no,
        info.province_id,
        act.activity_id,
        log.tms,
        info.original_total_amount,
        info.benefit_reduce_amount,
        info.feight_fee,
        info.final_total_amount
    from
    (
        select
            order_id,
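            -- Collect 'order_status=operate_time' pairs into a map: tms['1001'] is the time the
            -- order entered status 1001 (created), tms['1002'] the payment time, and so on.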
            str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
        from ${APP}.ods_order_status_log
        where dt='$do_date'
        group by order_id
    )log
    join
    (
        select * from ${APP}.ods_order_info where dt='$do_date'
    )info
    on log.order_id=info.id
    left join
    (
        select * from ${APP}.ods_activity_order where dt='$do_date'
    )act
    on log.order_id=act.order_id
)new
on old.id=new.id;
"

sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select 
    bp.id,
    bp.name,
    bp.area_code,
    bp.iso_code,
    bp.region_id,
    br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br
on bp.region_id=br.id;
"

sql3="
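-- Zipper-table (SCD2) refresh: union today's records (valid from today, end_date '9999-99-99')
-- with the existing history, closing end_date for history rows whose id reappears today,
-- then overwrite the history table from the temp table.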
insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from 
(
    select 
        id,
        name,
        birthday,
        gender,
        email,
        user_level,
        create_time,
        operate_time,
        '$do_date' start_date,
        '9999-99-99' end_date
    from ${APP}.ods_user_info where dt='$do_date'

    union all 
    select 
        uh.id,
        uh.name,
        uh.birthday,
        uh.gender,
        uh.email,
        uh.user_level,
        uh.create_time,
        uh.operate_time,
        uh.start_date,
        if(ui.id is not null  and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
    from ${APP}.dwd_dim_user_info_his uh left join 
    (
        select
            *
        from ${APP}.ods_user_info
        where dt='$do_date'
    ) ui on uh.id=ui.id
)his 
order by his.id, start_date;

insert overwrite table ${APP}.dwd_dim_user_info_his 
select * from ${APP}.dwd_dim_user_info_his_tmp;
"

case $1 in
"first"){
    $hive -e "$sql1$sql2"
};;
"all"){
    $hive -e "$sql1$sql3"
};;
esac
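Usage mirrors the ODS import script: pass first on the initial run (it runs the main load plus the province dimension), and all for daily runs (the main load plus the user zipper-table refresh):

./ods_to_dwd_db.sh first 2020-06-14
./ods_to_dwd_db.sh all 2020-06-15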
