这个项目实在数据采集基础使用的,需要提前复习之前学的东西,否则的话就是很难继续学习.详见博客数据项目一 —数据采集项目. 大数据项目 — 数据采集项目_YllasdW的博客-CSDN博客大数据第一个项目笔记整理https://blog.csdn.net/m0_47489229/article/details/127477626 ;
目录
二.配置spark的环境变量$ sudo vim /etc/profile.d/my_env.sh
三.新建spark配置文件,因为我们这里使用的是hive on spark$ vim /opt/module/hive/conf/spark-defaults.conf
五.修改hive-site.xml文件 — 真正的将hive和spark进行关联
二十二.DWD层_get_json_object — 工薪阶层
四十九.DWD_业务_系统函数(concat、concat_ws、collect_set、STR_TO_MAP)
七十二.多表join出现的问题_使用COALESCE函数解决
一. 采集项目架构
我们使用的体系是Hadoop体系,是属于这个生态的.
为什么要使用数据仓库,因为后面使用机器学习的来源是使用数据仓库的,以及后面学习的用户画像.传统的公司收集的数据都是来自于业务数据,但是大数据体系还是要收集来自于用户产生的实时,解决问题需要考虑业务思维.
如何采集用户数据 — 埋点方法(行为会被记录下来),将其使用日志服务器Springboot进行采集,得到logFile,再通过Flume进行采集.Flume采集logFile文件的时候,它的源于源之间是不同的,Flume可以保证数据的原子性和一致性,翻译成人话就是可以保证数据不丢失.
面试的一些问题,架构设计原因(防止压价,要钱):
①Exec Source在生产环境是不会使用这个的,因为这个Source是不能够保证put完全成功的,一旦要是失败会出现丢失数据的情况,为了更强的保证性,可以使用Taildir Source.Flume事务的代码是根据相应的channal决定的,生产环境一般是不使用Exec Source,也就是相应的logFile使用的是Taildir Source.
Exec Source的父类是Event Driven,但是Taildir Source的父类是Puable Source,后者的可靠性是比原来的可靠性高的,因此,这里的用户数据是使用的Taildir Source.
②这里加入Kafka的原因是因为HDFS的吞吐量是不如Kafka高.因此这里放入Kafka的方法是可以直接使用SparkStreaming将相应的数据进行实时指标分析;另一个原因还Kafka是可以一对多,在这个领域是比较出色的,方便多个源使用同一个数据.
③Kafka是个消息队列,是用来存储数据.Flume是主要的采集部分.如果要是没有Flume,也是可以读进Kafka,那为什么还要使用Flume?Flume做的最多的是Source和Sink,用户数据采集的地方,第一个Flume和第二个Flume是可以不用的.那为什么要使用呢?为啥不用现成的的Flume,要自己重复造个轮子呢,是完全没有必要的.
二. 用户采集平台
继续深挖刚才的细节:
①第一个(上游)Flume使用的Taildir Source – Kafka Channal,第二个(下游)Flume使用的是相应的Kafka Source – File Channal – HDFS Sink.第一个Flume也可以使用Taildir Sourc – File Channal – Kafka Sink,但是为什么我们没有使用这个架构,因为中间多了一层,不如是直接将上面的那一层进行省略掉.(复杂度上升,并且会变得复杂,吃资源,尤其是Channal这个部分).
那为啥下游不使用Kafka Source – HDFS Channal,因为拦截器的使用.上游是ETL拦截器,为了过滤不合格的json格式的文件.下游是使用的timeStamp拦截器,这个作用是为了解决相应的零点漂移(人话:防止昨天的数据,出现在今天采集的数据之中).这里的是时间戳timeStamp拦截器可以是放在上游的,但是不太好,如果要是在上游的时候使用了,就会产生相应的带有Header部分,导致在写入Kafka的时候,处理的数据是EVENT格式的文件,并不是JSON格式(恍然大悟).
②上面的问题解决方式,使用Kafka时间戳拦截器,会在执行的过程中产生大量的冗余.
③下游的Flume为什么搞一个File Channal?回答:我们想过很多的方案,这里的拦截器是不能够写在上游的Flume之中.使用Kafka Channal会导致大量的对象的生成,会出现大量的垃圾回收,这里的性能是不能采用这个方式进行相应的采集的,因此没有使用Kafka Channal.使用File Channal的性能还是过的去的,就是这样回答.
三. 业务采集架构
问题这里为什么使用Sqoop?
Sqoop是使用的阿里云啥的,基本是一天一采集,直接放到相应的HDFS之中.为什么要使用Sqoop为什么要用它,因为它虽然不是最先进的,但是使用了那么长的时间,它的性能是完全够用的.现在也在考虑将这个地方进行升级,但是没有充足的测试.
四. 后台商品管理表格
这里的C位表一定是相应的订单表 用户表 商品表. 商品表这个东西分为SPU和SKU,这里的SPU指一个完整的整体,以SKU为单位,以IPHONE14为例子.
5. 电商业务表格
这里的电商数据业务表格的说明,是需要将之前学习过的东西,也就是所谓的sql之中的东西放入到IDEA之中进行查看。对于项目一之中的东西进行复习。
就是实际之中的表并不是那么少的,会很多的。
6.采集项目压缩编码修改
原来的Flume我们是使用的lzo进行的压缩,这里需要改成相应的gzip进行压缩。
1)这里我之前设置的路径是$ /opt/module/flume-1.9.0/jobs/gmall
2)将sqoop之中的东西采用gzip进行压缩
将上图之中的这句话给删除。在mysql_to_hdfs.sh脚本之中进行与上述操作相同的操作。
为什么要用lzop而不用snap?学一下依赖的使用,并且lzop会产生相应的切片。flume采集过程使用lzop没有问题,snap使用flume。sqoop使用lzop文件不会大的离谱。snap不会产生相应的切片。
七.数仓简介
数仓分层如下所示:
ODS层:原始数据层,直接加载原始的数据、日志,数据保持原貌,不作处理。
DWD层:对ODS层的数据进行相应的清洗(去除空值、脏数据等),保存业务事实明细。一行的数据代表一次业务行为,例如一次下单。
DIM层:维度层,保存维度数据,主要是对业务事实的描述信息,例如何人、何时、何地等。我早上在宿舍11点醒的,12点半吃的饭。
DWS层:对于DWD层做一个汇总。一行信息代表一个主题对象一天的汇总行为,例如:一个用户一天的下单次数。
DWT层:对于DWS层的一个汇总。一行的信息代表一个主题对象的累积行为,例如一个用户从注册那一天开始到现在一共下了多少单。
ADS层:为各种统计报表提供的数据。
数据仓库为什么要进行分层?
①复杂的问题简单化
②减少重复数据的开发
③隔离原始数据
基本的概念(先是把相应的概念进行炒作,接下来就是进行赚钱)
数据海 数据仓库 数据集[这里的几个概念就是数据大小的区别,总而言之都是数据]
表名命名规范
- ODS层命名为ods_表名
- DIM层命名为dim_表名
- DWD层命名为dwd_表名
- DWS层命名为dws_表名
- DWT层命名为dwt_表名
- ADS层命名为ads_表名
- 临时表命名为tmp_表名
表字段类型命名规范
- 数量类型为bigint
- 金额类型为decimal(16, 2),表示:16位有效数字,其中小数部分2位(double不够精确的)
- 字符串(名字,描述信息等)类型为string
- 主键外键类型为string
- 时间戳类型为bigint
八.范式
数据建模必须遵循一定的规则,在关系数建模中,这种规则就是范式。优点:可以减少数据的冗余性。缺点:获取数据时,需要通过Join拼接出最后的数据。分类:范式(1NF)、第二范式(2NF)、第三范式(3NF)、巴斯-科德范式(BCNF)、第四范式(4NF)、第五范式(5NF)。
建模的含义就是创建一个数学模型或者理论模型用来模拟现实生活之中的一些数据。
函数依赖:概念麻烦,用人话表示。
上面的分数是需要使用学号和课名表示才能够得知,这叫做完全函数依赖。
由学号和课名可以推导出一个人的姓名,但是只是知道学号也是可以知道姓名,这就是所谓的部分函数依赖。
学号-系名-系主任——传递函数依赖
第一范式
实际上,1NF是所有关系型数据库的最基本的要求,如果一个表设计不是符合第一范式的,那么一定是有问题的。
第二范式
原则:不存在”部分函数依赖”
上面的表格是明显存在部分依赖的,分数是依赖于学号和课名,但是姓名并不是依赖于学号和课名的。因此,将上述的表进行拆开,分成两个表进行分析。
上述的两个表示没有部分依赖的,因此就是将上述的表格二范式化了。将一张大表分成了两个小表。
第三范式
原则:不能存在传递函数依赖
上面的表示学号 -> 系名 -> 系主任,将上述的表格进行相应的拆分。
进而符合第三范式过程。
小小总结:范式越高,数据越是清晰一些,一致性越是好一些 ——- 容易差错。时间代价降低,空间代价会上升。【空间换取时间】
九. 维度建模 和关系建模
为了降低查询时所消耗的时间代价,减少使用join,一般是范式一应用于数据仓库之中。
关系建模和维度建模是两种数据仓库的建模技术。关系建模由Bill Inmon所倡导,维度建模由Ralph Kimball所倡导。
关系建模
关系模型严格遵循第三范式(3NF),数据冗余程度低,数据的一致性容易得到保证。由于数据分布于众多的表中,查询会相对复杂,在大数据的场景下,查询效率相对较低。
数据仓库一般是不使用这种范式建模的,因为这里的东西太碎了,代价太高了。(要写很多的join)
维度建模
上图之中所有的信息是围绕中间的事实进行联系,进而得到相应的维度建模。
维度模型以数据分析作为出发点,不遵循三范式,故数据存在一定的冗余。维度模型面向业务,将业务用事实表和维度表呈现出来。表结构简单,故查询简单,查询效率较高。在大数据之中应用是比较多的。
数据仓库的第一步是进行数据规划,确定数据仓库需要哪些表格。维度建模,用来提高查询效率,发生在DWD和DIM层之中。
维度表和事实表
维度表:一般是对事实的描述信息。每一张维表对应现实世界中的一个对象或者概念。维度一定是一个抽象的东西,例如:用户、商品、日期、地区等。煎饼果子就不是维度表。
维表的特征:①维表的范围很宽(具有多个属性、列比较多)②跟事实表相比,行数相对较小:通常< 10万条 ③内容相对固定:编码表
时间维度表:
日期ID
day of week
day of year
季度
节假日
2022-11-07
2
1
1
元旦
2022-11-08
3
2
1
无
2022-11-09
4
3
1
无
2022-11-10
5
4
1
无
2022-11-11
6
5
1
无
事实表:事实表中的每行数据代表一个业务事件(下单、支付、退款、评价等)。”事实”这个术语表示的是业务事件的度量值(可统计次数、个数、金额等)。也就是将上面的维度变成id,其余的信息变成度量值。例如:2022年11月7日,姚毅伟同学(哈哈哈)在遵义的某团偷偷花了250块钱买了八瓶六味地黄丸。维度表:时间、用户、商品、商家。事实表:250块钱、八瓶。
每一个事实表的行包括:具有可加性的数值型的度量值、与维表相连接的外键,通常具有两个和两个以上的外键。
事实表的特征:①非常的大 ②内容相对的窄:列数较少(主要是外键id和度量值)③经常发生变化,每天会新增加很多。
分类:
1)事务型事实表
以每个事务或事件为单位,事件一旦被提交不能更改,只能增加。
2)周期型快照事实表
周期型快照事实表中不会保留所有数据,只保留固定时间间隔的数据,例如每天或者每月的销售额,或每月的账户余额等。
3)累积型快照事实表
累计快照事实表用于跟踪业务事实的变化。例如,数据仓库中可能需要累积或者存储订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据来跟踪订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新。
十.维度建模分类
在维度建模的基础上又分为三种模型:星型模型、雪花模型、星座模型。
1.星型模型
2.雪花模型
雪花模型更加靠近于范式三类型,相对而言,星型模型是更加接近于范式一类型的。一般我们是使用星型模型进行数据分析。
3.星座模型
星座表与前面的两个情况的区别是事实表的数量,星座模型是基于多个事实表。
十一.数仓建模全过程(绝对重点)
一.ODS层 — 用户存放的数据
1)HDFS 用户行为数据
2)HDFS 业务数据
针对HDFS上的用户行为数据和业务数据,我们如何规划处理?
(1)保持数据原貌不做任何修改,起到备份数据的作用。
(2)数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右。一般我们这部分数据访问的频率是非常低的)
(3)创建分区表,防止后续的全表扫描
二.DIM层和DWD层
DIM层DWD层需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。
维度建模一般按照以下四个步骤: 选择业务过程→声明粒度→确认维度→确认事实
(1)选择业务过程:在业务系统中,挑选我们感兴趣的业务线。
(2)声明粒度:数据粒度指数据仓库的数据中保存数据的细化程度或综合程度的级别。声明粒度意味着精确定义事实表中的一行数据表示什么,应该尽可能选择最小粒度,以此来应各种各样的需求。粒度粗,数据量小,统计信息就是不会那么细的。粒度细,数据量大,统计的信息就是会非常的多。
(3)确认维度:维度的主要作用是描述业务是事实,主要表示的是”谁,何处,何时”等信息。确定维度的原则是:后续需求中是否要分析相关维度的指标。例如,需要统计,什么时间下的订单多,哪个地区下的订单多,哪个用户下的订单多。需要确定的维度就包括:时间维度、地区维度、用户维度。
(4)确定事实:此处的”事实”一词,指的是业务中的度量值(次数、个数、件数、金额,可以进行累加),例如订单金额、下单次数等。在DWD层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的明细层事实表。事实表可做适当的宽表化处理。
上面的过程就是比较绕的,过程但是容易理解的。
事实表\维度
时间
用户
地区
商品
优惠券
活动
度量值
订单
√
√
√
运费/优惠金额/原始金额/最终金额
订单详情
√
√
√
√
√
√
件数/优惠金额/原始金额/最终金额
支付
√
√
√
支付金额
加购
√
√
√
件数/金额
收藏
√
√
√
次数
评价
√
√
√
次数
退单
√
√
√
√
件数/金额
退款
√
√
√
√
件数/金额
优惠券领用
√
√
√
次数
由上面的表可以得知,事实表存在,不一定有相应的维度对应。优惠券和活动关联到的事实表是比较少的。
数据仓库的维度建模已经完毕,DWD层是以业务过程为驱动。
DWS层、DWT层和ADS层都是以需求为驱动,和维度建模已经没有关系了。
DWS和DWT都是建宽表,按照主题去建表。主题相当于观察问题的角度。对应着维度表。
三.DWS层与DWT层
对于上面的维度可以进行两个两个组合啥的,进而得到最终的结果。
DWS层和DWT层统称宽表层,这两层的设计思想大致相同,通过以下案例进行阐述。
1)问题引出:两个需求,统计每个省份订单的个数、统计每个省份订单的总金额
2)处理办法:都是将省份表和订单表进行join,group by省份,然后计算。同样数据被计算了两次,实际上类似的场景还会更多。
那怎么设计能避免重复计算呢?
针对上述场景,可以设计一张地区宽表,其主键为地区ID,字段包含为:下单次数、下单金额、支付次数、支付金额等。上述所有指标都统一进行计算,并将结果保存在该宽表中,这样就能有效避免数据的重复计算。
DWS层是每天,DWT是迄今为止。
十二.Hive搭建
一.架构说明
Hive引擎包括:默认MR(每执行一次map reduce就是要落一次盘)、tez(有向图进行计算,理解为弱化版的spark)、spark(可以是map map ,然后再进行reduce)
Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。本质是hive,但是执行的引擎是spark,速度是比较快的。(原来是MR)—— 这里我们使用的是
Spark on Hive : Hive只作为存储元数据,Spark负责SQL解析优化,语法是Spark SQL 语法。
二.hive配置
懵了(看的课程没有对应上,重新查找课程)
面试问题:你在开发的时候有没有遇到版本兼容问题?经常会问
后面是如何配置会说
十三.HiveOnSpark搭建
一.上传包
$ cd /opt/software — 上传自己的两个jar包
$ tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
$ mv spark-3.0.0-bin-hadoop3.2/ spark
二.配置spark的环境变量 $ sudo vim /etc/profile.d/my_env.sh
SPARK_HOME
export SPARK_HOME=/opt/module/spark
export PATH=$PATH:$SPARK_HOME/bin
$ source /etc/profile.d/my_env.sh
三.新建spark配置文件,因为我们这里使用的是hive on spark $ vim /opt/module/hive/conf/spark-defaults.conf
spark.master yarn
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:8020/spark-history
spark.executor.memory 1g
spark.driver.memory 1g
创建一个存放日志的东西:$ hadoop fs -mkdir /spark-history
四. 向HDFS上传Spark纯净版jar包
说明1:由于Spark3.0.0非纯净版默认支持的是hive2.3.7版本,直接使用会和安装的Hive3.1.2出现兼容性问题。所以采用Spark纯净版jar包,不包含hadoop和hive相关依赖,避免冲突。
说明2:Hive任务最终由Spark来执行,Spark任务资源分配由Yarn来调度,该任务有可能被分配到集群的任何一个节点。所以需要将Spark的依赖上传到HDFS集群路径,这样集群中任何一个节点都能获取到。
(1)上传并解压spark-3.0.0-bin-without-hadoop.tgz
[atguigu@hadoop102 software]$ tar -zxvf /opt/software/spark-3.0.0-bin-without-hadoop.tgz
(2)上传Spark纯净版jar包到HDFS
[atguigu@hadoop102 software]$ hadoop fs -mkdir /spark-jars
[atguigu@hadoop102 software]$ hadoop fs -put spark-3.0.0-bin-without-hadoop/jars/* /spark-jars
五.修改hive-site.xml文件 — 真正的将hive和spark进行关联
[atguigu@hadoop102 software]$ vim /opt/module/hive/conf/hive-site.xml
<!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)-->
<property>
<name>spark.yarn.jars</name>
<value>hdfs://hadoop102:8020/spark-jars/*</value>
</property>
<!--Hive执行引擎-->
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<!--Hive和Spark连接超时时间,电脑比较好就不用改-->
<property>
<name>hive.spark.client.connect.timeout</name>
<value>10000ms</value>
</property>
注意:hive.spark.client.connect.timeout的默认值是1000ms,如果执行hive的insert语句时,抛如下异常,可以调大该参数到10000ms
FAILED: SemanticException Failed to get a spark session: org.apache.hadoop.hive.ql.metadata.HiveException: Failed to create Spark client for Spark session d9e0224c-3d14-4bf4-95bc-ee3ec56df48e
六.Hive on Spark 测试
(1)启动hive客户端
[atguigu@hadoop102 hive]$ bin/hive
(2)创建一张测试表
hive (default)> create table student(id int, name string);
(3)通过insert测试效果
hive (default)> insert into table student values(1,’abc’);
若结果如下,则说明配置成功
为什么这个地方还是卡着的?
再次执行一个任务
关闭第一个任务,可以观察到如下的现象.
解决方式:1. 增加Application Master资源比例
容量调度器对每个资源队列中同时运行的Application Master占用的资源进行了限制,该限制通过yarn.scheduler.capacity.maximum-am-resource-percent参数实现,其默认值是0.1,表示每个资源队列上Application Master最多可使用的资源为该队列总资源的10%,目的是防止大部分资源都被Application Master占用,而导致Map/Reduce Task无法执行。
生产环境该参数可使用默认值。但学习环境,集群资源总数很少,如果只分配10%的资源给Application Master,则可能出现,同一时刻只能运行一个Job的情况,因为一个Application Master使用的资源就可能已经达到10%的上限了。故此处可将该值适当调大。
1)修改配置
$ vim /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml
2)对于上面的数值可以进行更改.对于上面配置进行分发
[atguigu@hadoop102 hadoop]$ rsync.sh capacity-scheduler.xml
3)关闭正在运行的任务,重新启动yarn集
可以观察到这样的情况是正常的.MR是能够执行完成的,但是hive是没有关掉,当关掉的时候才能够执行完成.
2.创建多个队列,比如增加一个hive队列
十四.创建多队列的好处
增加Yarn容量调度器队列,也可以增加容量调度器的并发度.
按照计算引擎创建队列:hive spark flink
按照业务创建队列:下单 支付 点赞 评论 收藏(用户 活动等)
不创建多个队列时候,就是在一个default之中执行.
有什么好处?
(1)假如公司来了一个菜鸟,写了一个死循环,公司资源耗尽,大数据全部瘫痪.—解耦
(2)假如双11数据量是非常大的,任务是非常的多,如果所有的任务都参与执行,一定是执行不完的,怎么办?将任务分为几个优先级,—支持降级运行 下单(必须完成) – 支付(必须完成) – 点赞(不完成也是可以的) – 评论 – 收藏
十五.创建hive队列
同时要给队列增加如下新的一些属性:
这里的配置过程和文件之中default的默认配置是相同的,就是需要进行理解与消化.
最大容量的含义就是当default是不够的时候,需要使用hive进行.
将上述的文件进行改正与保存之后,进行同步的分发,重启yarn.有钱就是换64G2T的电脑,努力吧.(吐槽一下)
开启成功.再次进行执行如下的代码:
[atguigu@hadoop102 hadoop]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi 1 1
由调度起可以知道,没有给到hive的机会,因此,这里我们是要进行指定hive进行工作的.
[atguigu@hadoop102 hadoop]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi -Dmapreduce.job.queuename=hive 1 1
十六.datagrip工具安装
真无语,找了半天找不到破解文件,咋那莫多的钓鱼网站,浪费了好长时间.国内学术为啥那么不真诚……..
首先进入网站:其他版本 – DataGrip
安装破解:
链接:https://pan.baidu.com/s/1Z86cQkbN8Aref8hvsmG-GQ?pwd=1111
提取码:1111 —具体方法看里面的txt文件,比较好操作。
打开已经安装好的软件,将其进行连接hive。
上述之中在进行Test之前是需要进行连接hiveserver2的。
至少出现四条数据的时候,进行数据库的连接.
十七.ODS层用户行为日志(建表+加载数据)
要求:保持原有的数据,分区不发生改变,lzo压缩.
创建表
1.如果要创建的表已经存在,是要删除掉这个表;
drop table if exists ods_log;
2.创建外部表
什么时候创建外部表? 在数据仓库之中,绝大多数都是外部表,只有自己临时使用的表示内部表.
删除数据:元数据 原始数据, 内部表删除数据: 元数据 原始数据, 外部表删除数据:元数据
CREATE EXTERNAL TABLE ods_log(line string)
3.按照表的日期进行分区
PARTITIONED BY (dt
string)
4.LZO压缩格式处理
说明Hive的LZO压缩:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
5.设置数据存储位置
LOCATION /warehouse/gmall/ods/ods_log
;
CREATE EXTERNAL TABLE ods_log (line
string)
PARTITIONED BY (dt
string) -- 按照时间创建分区
STORED AS
INPUTFORMAT
"com.hadoop.mapred.DeprecatedLzoTextInputFormat"
OUTPUTFORMAT
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
LOCATION
'/warehouse/gmall/ods/ods_log' -- 指定数据在hdfs上的存储位置
;
上面的代码我组合了好多次,都是存在问题的,因此,就看最后这一个黑框框之中的就可以.
加载数据
首先将之前改的压缩过程,重新变成lzop压缩,具体的过程见第六节。
如何生成相应的6月14号的数据呢?见下面的操作
我发现这个地方就是我的第二层flume没有进行配置,就是导致这个数据没有生成的原因.(先去配置第二层的flume)
这个地方调了一下午,最终第一层是没有问题了,开始调制第二层,就是配置了很多遍总数存在问题,但是最终终于出来了.(耗费了很长时间,里面我也不知道出现了什么错误,就是解决不了,只能是把环境删除之后,重新进行配置)
这个地方的东西就是我想要的数据,最后终于啊是出来了.(真的是捣鼓了半天)
数据加载成功.
加载脚本的定义:
$ vim hdfs_to_ods_log.sh
#!/bin/bash
定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive
hadoop=/opt/module/hadoop-3.1.3/bin/hadoop
如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=date -d "-1 day" +%F
fi
echo ================== 日志日期为 $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log partition(dt='$do_date');
"
$hive -e "$sql"
$ chmod 777 hdfs_to_ods_log.sh
$ hdfs_to_ods_log.sh 2020-06-15
上面是进行将2020-06-15日的数据进行导入的过程,但是我不知道为什么有的时候会导入不成功需要重新导入.
十八.脚本之中单引号和双引号之间的区别
$ vim test.sh
#!/bin/bash
do_date=$1
echo '$do_date'
echo "$do_date"
echo "'$do_date'"
echo '"$do_date"'
echo date
(1)单引号不取变量值
(2)双引号取变量值
(3)反引号,执行引号中命令
(4)双引号内部嵌套单引号,取出变量值
(5)单引号内部嵌套双引号,不取出变量值</p>
<h2>十九.ODS_业务数据建表</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/72dbcc222d9a4119b1ea603ee8f5c783.png" /></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/068d9d37d3ae4b1d86ada3a35607be15.png" /></p>
<p>上述过程之中\t是不可以被省略的。
ODS层业务数据建表</p>
<pre><code>--ods层业务数据页面
--3.3.1 订单表(增量及更新)
--hive (gmall)>
drop table if exists ods_order_info;
create external table ods_order_info (
string COMMENT '订单号',
final_total_amount decimal(16,2) COMMENT '订单金额',
order_status string COMMENT '订单状态',
user_id string COMMENT '用户id',
out_trade_no string COMMENT '支付流水号',
create_time string COMMENT '创建时间',
operate_time string COMMENT '操作时间',
province_id string COMMENT '省份ID',
benefit_reduce_amount decimal(16,2) COMMENT '优惠金额',
original_total_amount decimal(16,2) COMMENT '原价金额',
feight_fee decimal(16,2) COMMENT '运费'
) COMMENT '订单表'
PARTITIONED BY (
string) -- 按照时间创建分区
row format delimited fields terminated by '\t' -- 指定分割符为\t
STORED AS -- 指定存储方式,读数据采用LzoTextInputFormat;输出数据采用TextOutputFormat
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_info/' -- 指定数据在hdfs上的存储位置
;
--3.3.2 订单详情表(增量)
--hive (gmall)>
drop table if exists ods_order_detail;
create external table ods_order_detail(
string COMMENT '编号',
order_id string COMMENT '订单号',
user_id string COMMENT '用户id',
sku_id string COMMENT '商品id',
sku_name string COMMENT '商品名称',
order_price decimal(16,2) COMMENT '商品价格',
sku_num bigint COMMENT '商品数量',
create_time string COMMENT '创建时间',
source_type string COMMENT '来源类型',
source_id string COMMENT '来源编号'
) COMMENT '订单详情表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_detail/';
--3.3.3 SKU商品表(全量)
--hive (gmall)>
drop table if exists ods_sku_info;
create external table ods_sku_info(
string COMMENT 'skuId',
spu_id string COMMENT 'spuid',
decimal(16,2) COMMENT '价格',
sku_name string COMMENT '商品名称',
sku_desc string COMMENT '商品描述',
string COMMENT '重量',
tm_id string COMMENT '品牌id',
category3_id string COMMENT '品类id',
create_time string COMMENT '创建时间'
) COMMENT 'SKU商品表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_sku_info/';
--3.3.4 用户表(增量及更新)
--hive (gmall)>
drop table if exists ods_user_info;
create external table ods_user_info(
string COMMENT '用户id',
string COMMENT '姓名',
string COMMENT '生日',
string COMMENT '性别',
string COMMENT '邮箱',
user_level string COMMENT '用户等级',
create_time string COMMENT '创建时间',
operate_time string COMMENT '操作时间'
) COMMENT '用户表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_user_info/';
--3.3.5 商品一级分类表(全量)
--hive (gmall)>
drop table if exists ods_base_category1;
create external table ods_base_category1(
string COMMENT 'id',
string COMMENT '名称'
) COMMENT '商品一级分类表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_category1/';
--3.3.6 商品二级分类表(全量)
--hive (gmall)>
drop table if exists ods_base_category2;
create external table ods_base_category2(
string COMMENT ' id',
string COMMENT '名称',
category1_id string COMMENT '一级品类id'
) COMMENT '商品二级分类表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_category2/';
--3.3.7 商品三级分类表(全量)
--hive (gmall)>
drop table if exists ods_base_category3;
create external table ods_base_category3(
string COMMENT ' id',
string COMMENT '名称',
category2_id string COMMENT '二级品类id'
) COMMENT '商品三级分类表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_category3/';
--3.3.8 支付流水表(增量)
--hive (gmall)>
drop table if exists ods_payment_info;
create external table ods_payment_info(
bigint COMMENT '编号',
out_trade_no string COMMENT '对外业务编号',
order_id string COMMENT '订单编号',
user_id string COMMENT '用户编号',
alipay_trade_no string COMMENT '支付宝交易流水编号',
total_amount decimal(16,2) COMMENT '支付金额',
string COMMENT '交易内容',
payment_type string COMMENT '支付类型',
payment_time string COMMENT '支付时间'
) COMMENT '支付流水表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_payment_info/';
--3.3.9 省份表(特殊)
--hive (gmall)>
drop table if exists ods_base_province;
create external table ods_base_province (
bigint COMMENT '编号',
string COMMENT '省份名称',
region_id string COMMENT '地区ID',
area_code string COMMENT '地区编码',
iso_code string COMMENT 'iso编码,superset可视化使用'
) COMMENT '省份表'
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_province/';
--3.3.10 地区表(特殊)
--hive (gmall)>
drop table if exists ods_base_region;
create external table ods_base_region (
string COMMENT '编号',
region_name string COMMENT '地区名称'
) COMMENT '地区表'
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_region/';
--3.3.11 品牌表(全量)
--hive (gmall)>
drop table if exists ods_base_trademark;
create external table ods_base_trademark (
tm_id string COMMENT '编号',
tm_name string COMMENT '品牌名称'
) COMMENT '品牌表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_trademark/';
--3.3.12 订单状态表(增量)
--hive (gmall)>
drop table if exists ods_order_status_log;
create external table ods_order_status_log (
string COMMENT '编号',
order_id string COMMENT '订单ID',
order_status string COMMENT '订单状态',
operate_time string COMMENT '修改时间'
) COMMENT '订单状态表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_status_log/';
--3.3.13 SPU商品表(全量)
--hive (gmall)>
drop table if exists ods_spu_info;
create external table ods_spu_info(
string COMMENT 'spuid',
spu_name string COMMENT 'spu名称',
category3_id string COMMENT '品类id',
tm_id string COMMENT '品牌id'
) COMMENT 'SPU商品表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_spu_info/';
--3.3.14 商品评论表(增量)
--hive (gmall)>
drop table if exists ods_comment_info;
create external table ods_comment_info(
string COMMENT '编号',
user_id string COMMENT '用户ID',
sku_id string COMMENT '商品sku',
spu_id string COMMENT '商品spu',
order_id string COMMENT '订单ID',
string COMMENT '评价',
create_time string COMMENT '评价时间'
) COMMENT '商品评论表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_comment_info/';
--3.3.15 退单表(增量)
--hive (gmall)>
drop table if exists ods_order_refund_info;
create external table ods_order_refund_info(
string COMMENT '编号',
user_id string COMMENT '用户ID',
order_id string COMMENT '订单ID',
sku_id string COMMENT '商品ID',
refund_type string COMMENT '退款类型',
refund_num bigint COMMENT '退款件数',
refund_amount decimal(16,2) COMMENT '退款金额',
refund_reason_type string COMMENT '退款原因类型',
create_time string COMMENT '退款时间'
) COMMENT '退单表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_order_refund_info/';
--3.3.16 加购表(全量)
--hive (gmall)>
drop table if exists ods_cart_info;
create external table ods_cart_info(
string COMMENT '编号',
user_id string COMMENT '用户id',
sku_id string COMMENT 'skuid',
cart_price decimal(16,2) COMMENT '放入购物车时价格',
sku_num bigint COMMENT '数量',
sku_name string COMMENT 'sku名称 (冗余)',
create_time string COMMENT '创建时间',
operate_time string COMMENT '修改时间',
is_ordered string COMMENT '是否已经下单',
order_time string COMMENT '下单时间',
source_type string COMMENT '来源类型',
source_id string COMMENT '来源编号'
) COMMENT '加购表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_cart_info/';
--3.3.17 商品收藏表(全量)
--hive (gmall)>
drop table if exists ods_favor_info;
create external table ods_favor_info(
string COMMENT '编号',
user_id string COMMENT '用户id',
sku_id string COMMENT 'skuid',
spu_id string COMMENT 'spuid',
is_cancel string COMMENT '是否取消',
create_time string COMMENT '收藏时间',
cancel_time string COMMENT '取消时间'
) COMMENT '商品收藏表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_favor_info/';
--3.3.18 优惠券领用表(新增及变化)
--hive (gmall)>
drop table if exists ods_coupon_use;
create external table ods_coupon_use(
string COMMENT '编号',
coupon_id string COMMENT '优惠券ID',
user_id string COMMENT 'skuid',
order_id string COMMENT 'spuid',
coupon_status string COMMENT '优惠券状态',
get_time string COMMENT '领取时间',
using_time string COMMENT '使用时间(下单)',
used_time string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_coupon_use/';
--3.3.19 优惠券表(全量)
--hive (gmall)>
drop table if exists ods_coupon_info;
create external table ods_coupon_info(
string COMMENT '购物券编号',
coupon_name string COMMENT '购物券名称',
coupon_type string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
condition_amount decimal(16,2) COMMENT '满额数',
condition_num bigint COMMENT '满件数',
activity_id string COMMENT '活动编号',
benefit_amount decimal(16,2) COMMENT '减金额',
benefit_discount decimal(16,2) COMMENT '折扣',
create_time string COMMENT '创建时间',
range_type string COMMENT '范围类型 1、商品 2、品类 3、品牌',
spu_id string COMMENT '商品id',
tm_id string COMMENT '品牌id',
category3_id string COMMENT '品类id',
limit_num bigint COMMENT '最多领用次数',
operate_time string COMMENT '修改时间',
expire_time string COMMENT '过期时间'
) COMMENT '优惠券表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_coupon_info/';
--3.3.20 活动表(全量)
--hive (gmall)>
drop table if exists ods_activity_info;
create external table ods_activity_info(
string COMMENT '编号',
activity_name string COMMENT '活动名称',
activity_type string COMMENT '活动类型',
start_time string COMMENT '开始时间',
end_time string COMMENT '结束时间',
create_time string COMMENT '创建时间'
) COMMENT '活动表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_activity_info/';
--3.3.21 活动订单关联表(增量)
--hive (gmall)>
drop table if exists ods_activity_order;
create external table ods_activity_order(
string COMMENT '编号',
activity_id string COMMENT '优惠券ID',
order_id string COMMENT 'skuid',
create_time string COMMENT '领取时间'
) COMMENT '活动订单关联表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_activity_order/';
--3.3.22 优惠规则表(全量)
--hive (gmall)>
drop table if exists ods_activity_rule;
create external table ods_activity_rule(
string COMMENT '编号',
activity_id string COMMENT '活动ID',
condition_amount decimal(16,2) COMMENT '满减金额',
condition_num bigint COMMENT '满减件数',
benefit_amount decimal(16,2) COMMENT '优惠金额',
benefit_discount decimal(16,2) COMMENT '优惠折扣',
benefit_level string COMMENT '优惠级别'
) COMMENT '优惠规则表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_activity_rule/';
--3.3.23 编码字典表(全量)
--hive (gmall)>
drop table if exists ods_base_dic;
create external table ods_base_dic(
dic_code string COMMENT '编号',
dic_name string COMMENT '编码名称',
parent_code string COMMENT '父编码',
create_time string COMMENT '创建日期',
operate_time string COMMENT '操作日期'
) COMMENT '编码字典表'
PARTITIONED BY (
string)
row format delimited fields terminated by '\t'
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods/ods_base_dic/';
</code></pre>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/c67f1b908edc4b48b495315f05a060f1.png" /></p>
<p>加载完成。</p>
<h2>二十.ODS_业务数据脚本</h2>
<p>$ vim hdfs_to_ods_db.sh</p>
<pre><code>#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=
-d "-1 day" +%F
fi
sql1="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_order/$do_date' OVERWRITE into table ${APP}.ods_activity_order partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date');
"
sql2="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;
"
case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
"all"){
$hive -e "$sql1"
};;
esac
</code></pre>
<p>注意上面的sql2,它是一般只加载一次的.</p>
<p>1)初次导入</p>
<p>初次导入时,脚本的第一个参数应为first,线上环境不传第二个参数,自动获取前一天日期
$ hdfs_to_ods_db.sh first2020-06-14</p>
<p>2)每日导入</p>
<p>每日重复导入,脚本的第一个参数应为all,线上环境不传第二个参数,自动获取前一天日期。
$ hdfs_to_ods_db.sh all2020-06-15</p>
<p>spark3.0.0 => hive2.3.7 =>hadoop2.0
hive3.1.2 => spark2.4.5(不行,改为spark3.0-纯净版) =>hadoop3.0以上,hive和hadoop对应</p>
<p>要使用纯净版的spark,为什么还要上传非纯净版的spark?</p>
<p>纯净版的spark用来放到yarn上面进行计算.</p>
<p>非纯净版的spark,用来hive通信,创建spark session会话.</p>
<h2>二十一.ODS_索引问题</h2>
<p>在原来的hdfs_***_log.sh后面增加如下所示:</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/22441f45e1394575b3a16e851900933a.png" />
$ [hadoop-3.1.3/bin]hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=hive /warehouse/gmall/ods/ods_log/dt=2020-06-14
$ [hadoop-3.1.3/bin]hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=hive /warehouse/gmall/ods/ods_log/dt=2020-06-15
上面是进行创建6-14和6-15的索引.</p>
<h2>二十二.DWD层_get_json_object --- 工薪阶层</h2>
<p>1)对用户行为数据解析。
2)对核心数据进行判空过滤。
3)对业务数据采用维度模型重新建模。</p>
<p>页面埋点数据:
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/bd82b2a761224fb190db0c851660790b.png" /></p>
<p>埋点:
在企业中页面里非常多:(中大型公司)动作 曝光 错误 页面 公共信息
中小型公司(页面比较少):一个页面一张表 (商品列表 商品点击 广告 故障 后台活跃 通知 点赞 评论 收藏)</p>
<p>启动日志:
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/52cd4ef34ad247b589c11c079299f514.png" /></p>
<p>通过启动日志查看用户是否活跃.</p>
<p>1)数据</p>
<p>[{"name":"姚大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]</p>
<p>2)取出第一个json对象</p>
<p>hive (gmall)></p>
<p>select get_json_object('[{"name":"姚大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0]'); ---第一个是代表的是要进行处理的对象,$[0]代表是全部对象</p>
<p>结果是:{"name":"姚大郎","sex":"男","age":"25"}</p>
<p>3)取出第一个json的age字段的值</p>
<p>hive (gmall)></p>
<p>SELECT get_json_object('[{"name":"姚大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]',"$[0].age ");----获取值</p>
<p>结果是:2 5
英年早逝的姚大郎同志,哈哈哈哈哈哈哈哈哈.</p>
<p>运行示意图:</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/211846595c524b02b999567e021afc5f.png" /></p>
<p>上面的这个函数的使用是非常重要的.</p>
<h2>二十三.创建启动日志</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/4eaf8d4c2e6f4e93a73ac23df92dbb4a.png" /></p>
<p>上面的启动日志和事件日志都是存在与ods_log之中.将上面之中的这两个日志进行相应的分离.</p>
<p>hive-gmall操作</p>
<pre><code>drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
area_code string COMMENT '地区编码',
string COMMENT '手机品牌',
string COMMENT '渠道',
string COMMENT '手机型号',
mid_id string COMMENT '设备id',
string COMMENT '操作系统',
user_id string COMMENT '会员id',
version_code string COMMENT 'app版本号',
string COMMENT ' icon手机图标 notice 通知 install 安装后启动',
loading_time bigint COMMENT '启动加载时间',
open_ad_id string COMMENT '广告页ID ',
open_ad_ms bigint COMMENT '广告总共播放时间',
open_ad_skip_ms bigint COMMENT '用户跳过广告时点',
bigint COMMENT '时间'
) COMMENT '启动日志表'
PARTITIONED BY (dt string) -- 按照时间创建分区
stored as parquet -- 采用parquet列式存储
LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存储位置
TBLPROPERTIES('parquet.compression'='lzo') -- 采用LZO压缩
;
</code></pre>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/374a903d94b24c009da5c7614d6a63ca.png" /></p>
<p>parque进行列式存储:
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/edab0938ac7349a780f03e5812ac2b9b.png" /></p>
<p>在进行查询的时候,select name from user(查询速度快);列式存储配合压缩,压缩比会更小,减少磁盘使用空间.</p>
<p>说明:数据采用parquet存储方式,是可以支持切片的,不需要再对数据创建索引。如果单纯的text方式存储数据,需要采用支持切片的,lzo p 压缩方式并创建索引。</p>
<h2>二十四.加载数据</h2>
<p>hive</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_start_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.start') is not null;
</code></pre>
<p>hive (gmall)> select * from dwd_start_log where dt='2020-06-14' limit 2;<img alt="" src="https://img-blog.csdnimg.cn/8ed45a5a96644a32b131e9107606736d.png" /></p>
<p>执行是没有问题的.</p>
<h2>二十五.hive读取文件</h2>
<p>select * from ods_log;---不执行MR</p>
<p>select count(*) from ods_log;---执行MR</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/acbdb498f9184847a8311e741fdcffda.png" />
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/2c1139447c694963ba162f2c12caad97.png" /></p>
<p>原因是select * from ods_log不执行MR操作,默认采用的是ods_log建表语句中指定的DeprecatedLzoTextInputFormat,能够识别lzo.index为索引文件。</p>
<p>select count(*) from ods_log 执行MR操作,默认采用的是CombineHiveInputFormat ,不能识别lzo .index 为索引文件,将索引文件当做普通文件处理。更严重的是,这会导致LZO文件无法切片。</p>
<p><strong>解决办法:修改</strong>CombineHiveInputFormat 为HiveInputFormat</p>
<p><strong>再次测试</strong></p>
<p>hive (gmall)></p>
<p>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;</p>
<p>hive (gmall)> select * from ods_log;</p>
<p>Time taken: 0.706 seconds, Fetched:2955 row(s)</p>
<p>hive (gmall)> select count(*) from ods_log;</p>
<p>2955</p>
<h2>二十六.页面日志解析</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/fd9bf1e05d064bd4a1bae243d64f15d8.png" /></p>
<p>1)进行建表</p>
<pre><code>hive (gmall)>
drop table if exists dwd_page_log;
CREATE EXTERNAL TABLE dwd_page_log(
area_code string COMMENT '地区编码',
string COMMENT '手机品牌',
string COMMENT '渠道',
string COMMENT '手机型号',
mid_id string COMMENT '设备id',
string COMMENT '操作系统',
user_id string COMMENT '会员id',
version_code string COMMENT 'app版本号',
during_time bigint COMMENT '持续时间毫秒',
page_item string COMMENT '目标id ',
page_item_type string COMMENT '目标类型',
last_page_id string COMMENT '上页类型',
page_id string COMMENT '页面ID ',
source_type string COMMENT '来源类型',
bigint
) COMMENT '页面日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_page_log'
TBLPROPERTIES('parquet.compression'='lzo');
</code></pre>
<p>2)数据导入</p>
<pre><code>insert overwrite table dwd_page_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.page') is not null;
</code></pre>
<p>3)查看数据</p>
<pre><code>hive (gmall)>
select * from dwd_page_log where dt='2020-06-14' limit 2;
</code></pre>
<h2>二十七.动作日志建表</h2>
<p><strong>动作日志解析思路:动作日志表中每行数据对应用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。先将包含action字段的日志过滤出来,然后通过UDTF函数,将action数组"炸开"(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。</strong></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/1726cd55fb15423597569fab075ef31f.png" /></p>
<p><strong>1)建表语句</strong></p>
<pre><code>hive (gmall)>
drop table if exists dwd_action_log;
CREATE EXTERNAL TABLE dwd_action_log(
area_code string COMMENT '地区编码',
string COMMENT '手机品牌',
string COMMENT '渠道',
string COMMENT '手机型号',
mid_id string COMMENT '设备id',
string COMMENT '操作系统',
user_id string COMMENT '会员id',
version_code string COMMENT 'app版本号',
during_time bigint COMMENT '持续时间毫秒',
page_item string COMMENT '目标id ',
page_item_type string COMMENT '目标类型',
last_page_id string COMMENT '上页类型',
page_id string COMMENT '页面id ',
source_type string COMMENT '来源类型',
action_id string COMMENT '动作id',
string COMMENT '目标id ',
item_type string COMMENT '目标类型',
bigint COMMENT '时间'
) COMMENT '动作日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_action_log'
TBLPROPERTIES('parquet.compression'='lzo');
</code></pre>
<h2>二十八.UDTF函数思想</h2>
<p><strong>2)创建UDTF函数——设计思路(下面的这张表特别需要进行注意)</strong>
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/ecd9143e01d14daba6ac13ee24743155.png" /></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/14ecb3f5e3b5417aba6cc36bb01b4173.png" /></p>
<p><strong>3</strong> <strong>)创建UDTF函数——编写代码</strong></p>
<p><strong>(1)创建</strong> <strong>一个maven工程:hivefunction</strong></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/0184813fb80d4c4d83f5f4368a71d233.png" /></p>
<p>电脑炸了!</p>
<p><strong>(2)创建</strong> <strong>包名:com.atguigu.hive</strong> <strong>.</strong> <strong>udtf</strong></p>
<p><strong>(</strong> <strong>3</strong> <strong>)引入如下依赖</strong></p>
<pre><code><dependencies>
<!--添加hive依赖-->
<dependency>
<groupid>org.apache.hive</groupid>
<artifactid>hive-exec</artifactid>
<version>3.1.2</version>
</dependency>
</dependencies>
</code></pre>
<p><strong>(</strong> <strong>4</strong> <strong>)编码</strong></p>
<pre><code class="language-java">package com.atguigu.hive.udtf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
public class ExplodeJSONArray extends GenericUDTF {
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1 参数合法性检查:如果要是传入的参数不是一个的话,抛出一个异常 action
if (argOIs.getAllStructFieldRefs().size() != 1){
throw new UDFArgumentException("ExplodeJSONArray 只需要一个参数");
}
// 2 第一个参数必须为string
if(!"string".equals(argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName())){
throw new UDFArgumentException("json_array_to_struct_array的第1个参数应为string类型");
}
// 3 定义返回值名称和类型
List fieldNames = new ArrayList();
List fieldOIs = new ArrayList();
fieldNames.add("items");//这个名字可以随便起
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);//校验上面的名称,判断其是不是string类型
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
public void process(Object[] objects) throws HiveException {
//object=>aaction => 多进多出(多行进多行出)
// 1 获取传入的数据
String jsonArray = objects[0].toString();
// 2 将string转换为json数组
JSONArray actions = new JSONArray(jsonArray);
// 3 循环一次,取出数组中的一个json,并写出----就是将JSON之中的东西依次进行导出的过程.进行循环遍历,一次一次进行相应的写出
for (int i = 0; i < actions.length(); i++) {
String[] result = new String[1];
result[0] = actions.getString(i);
forward(result);
}
}
public void close() throws HiveException {
}
}
</code></pre>
<h2>二十九.DWD_创建永久UDTF函数</h2>
<p><strong>4</strong> <strong>)创建函数</strong></p>
<p>(1)打包(package)
将上面的包进行右键,Show in Explorer.</p>
<p>(2)将hivefunction-1.0-SNAPSHOT.jar上传到hadoop102的/opt/module,然后再将该jar包上传到HDFS的/user/hive/jars路径下
[atguigu@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/7e6f14472094411a910812b2e8b05ec0.png" /></p>
<p>上传成功.</p>
<p>(3)创建永久函数与开发好的java class关联---下面的explode_json_array是相应的一个函数</p>
<p>hive (gmall)></p>
<p>create function <strong>explode_json_array</strong>as 'com.atguigu.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020 /user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';</p>
<p>(4)注意:如果修改了自定义函数重新生成jar包怎么处理?只需要替换HDFS路径上的旧jar包,然后重启Hive客户端即可。</p>
<h2>三十.DWD_动作日志解析完成</h2>
<p><strong>5</strong> <strong>)数据导入</strong></p>
<p>hive (gmall)></p>
<pre><code class="language-java">SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_action_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='2020-06-14'
and get_json_object(line,'$.actions') is not null;
</code></pre>
<p><strong>3</strong> <strong>)查看数据</strong></p>
<p>hive (gmall)></p>
<p>select * from dwd_action_log where dt='2020-06-14' limit 2;</p>
<h2>三十一.DWD_曝光日志</h2>
<p><strong>曝光日志解析思路:</strong>曝光日志表中每行数据对应一个曝光记录,一个曝光记录应当包含公共信息、页面信息以及曝光信息。先将包含display字段的日志过滤出来,然后通过UDTF函数,将display数组"炸开"(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/600324d309f74d9dbe9bdfb33010df82.png" /></p>
<p><strong>1)建表语句</strong></p>
<pre><code class="language-java">hive (gmall)>
drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
area_code string COMMENT '地区编码',
string COMMENT '手机品牌',
string COMMENT '渠道',
string COMMENT '手机型号',
mid_id string COMMENT '设备id',
string COMMENT '操作系统',
user_id string COMMENT '会员id',
version_code string COMMENT 'app版本号',
during_time bigint COMMENT 'app版本号',
page_item string COMMENT '目标id ',
page_item_type string COMMENT '目标类型',
last_page_id string COMMENT '上页类型',
page_id string COMMENT '页面ID ',
source_type string COMMENT '来源类型',
bigint COMMENT 'app版本号',
display_type string COMMENT '曝光类型',
string COMMENT '曝光对象id ',
item_type string COMMENT 'app版本号',
bigint COMMENT '出现顺序'
) COMMENT '曝光日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_display_log'
TBLPROPERTIES('parquet.compression'='lzo');
</code></pre>
<p>2)数据导入</p>
<pre><code class="language-java">SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_display_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.displayType'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order')
from ods_log lateral view explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='2020-06-14'
and get_json_object(line,'$.displays') is not null;
</code></pre>
<p><strong>3)查看数据</strong></p>
<p>hive (gmall)> select * from dwd_display_log where dt='2020-06-14' limit 2;</p>
<h2>三十二.页面信息解释</h2>
<p>比如说:我们进入一个页面点击了支付,在另一个页面也点击了支付,如何查看在哪个页面进行了点击</p>
<h2>三十三.错误日志表分析</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/adae6e1c15e3433ebc8c8d9a21759860.png" /></p>
<p>错误日志解析思路:错误日志表中每行数据对应一个错误记录,为方便定位错误,一个错误记录应当包含与之对应的公共信息、页面信息、曝光信息、动作信息、启动信息以及错误信息。先将包含err字段的日志过滤出来,然后使用get_json_object函数解析所有字段。</p>
<p><strong>1)建表语句</strong></p>
<pre><code class="language-java">hive (gmall)>
drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
area_code string COMMENT '地区编码',
string COMMENT '手机品牌',
string COMMENT '渠道',
string COMMENT '手机型号',
mid_id string COMMENT '设备id',
string COMMENT '操作系统',
user_id string COMMENT '会员id',
version_code string COMMENT 'app版本号',
page_item string COMMENT '目标id ',
page_item_type string COMMENT '目标类型',
last_page_id string COMMENT '上页类型',
page_id string COMMENT '页面ID ',
source_type string COMMENT '来源类型',
string COMMENT ' icon手机图标 notice 通知 install 安装后启动',
loading_time string COMMENT '启动加载时间',
open_ad_id string COMMENT '广告页ID ',
open_ad_ms string COMMENT '广告总共播放时间',
open_ad_skip_ms string COMMENT '用户跳过广告时点',
string COMMENT '动作',
string COMMENT '曝光',
string COMMENT '时间',
error_code string COMMENT '错误码',
string COMMENT '错误信息'
) COMMENT '错误日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_error_log'
TBLPROPERTIES('parquet.compression'='lzo');
</code></pre>
<p>说明:此处为对动作数组和曝光数组做处理,如需分析错误与单个动作或曝光的关联,可先使用explode_json_array函数将数组"炸开",再使用get_json_object函数获取具体字段。</p>
<p><strong>2)数据导入</strong></p>
<pre><code class="language-java">SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_error_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.err') is not null;
</code></pre>
<p><strong>3)查看数据</strong></p>
<p>hive (gmall)>
select * from dwd_error_log where dt='2020-06-14' limit 2;</p>
<h4>DWD层用户行为数据加载脚本</h4>
<p>1)在hadoop102的/home/atguigu/bin目录下创建脚本
[atguigu@hadoop102 bin]$ vim ods_to_dwd_log.sh</p>
<pre><code class="language-java">#!/bin/bash
hive=/opt/module/hive/bin/hive
APP=gmall
如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=
-d "-1 day" +%F
fi
sql="
SET mapreduce.job.queuename=hive;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.start') is not null;
insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;
insert overwrite table ${APP}.dwd_display_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.displayType'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;
insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;
insert overwrite table ${APP}.dwd_error_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;
"
$hive -e "$sql"
</code></pre>
<p>2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 ods_to_dwd_log.sh</p>
<p>3)脚本使用
[atguigu@hadoop102 module]$ ods_to_dwd_log.sh 2020-06-15</p>
<p>4)查询导入结果</p>
<p>hive (gmall)></p>
<p>select * from dwd_start_log where dt='2020-06-15' limit 2;</p>
<p>5)脚本执行时间</p>
<p>企业开发中一般在每日凌晨30分~1点</p>
<h2>三十四.DWD_商品维度解析</h2>
<p>业务数据方面DWD层的搭建主要注意点在于维度的退化,减少后续大量Join操作。
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/0f9fb1ffdc954dc68986fa80f6c89a06.png" /></p>
<p><strong>商品维度表</strong></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/53929c1771ad4811b0476029ba562e88.png" /></p>
<p>1)建表语句</p>
<pre><code class="language-java">DROP TABLE IF EXISTS
dwd_dim_sku_info;
CREATE EXTERNAL TABLE
dwd_dim_sku_info (
string COMMENT '商品id',
spu_id string COMMENT 'spuid',
decimal(16,2) COMMENT '商品价格',
sku_name string COMMENT '商品名称',
sku_desc string COMMENT '商品描述',
decimal(16,2) COMMENT '重量',
tm_id string COMMENT '品牌id',
tm_name string COMMENT '品牌名称',
category3_id string COMMENT '三级分类id',
category2_id string COMMENT '二级分类id',
category1_id string COMMENT '一级分类id',
category3_name string COMMENT '三级分类名称',
category2_name string COMMENT '二级分类名称',
category1_name string COMMENT '一级分类名称',
spu_name string COMMENT 'spu名称',
create_time string COMMENT '创建时间'
) COMMENT '商品维度表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code class="language-java">SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_sku_info partition(dt='2020-06-14')
select
sku.id, --- 通过下面起别名的方式取出id
sku.spu_id, --- 通过下面起别名的方式取出spu_id
sku.price,
sku.sku_name,
sku.sku_desc, --- 描述
sku.weight, --- 重量
sku.tm_id, --- 品牌id
ob.tm_name, --- 品牌名称
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select * from ods_sku_info where dt='2020-06-14'
)sku --- 以sku为主,sku是个别名
join
(
select * from ods_base_trademark where dt='2020-06-14'
)ob on sku.tm_id=ob.tm_id 后面就是一个进行join的过程,需要谁就是与谁进行join,ob是一个别名
join
(
select * from ods_spu_info where dt='2020-06-14'
)spu on spu.id = sku.spu_id 需要谁和谁进行join
join
(
select * from ods_base_category3 where dt='2020-06-14'
)c3 on sku.category3_id=c3.id
join
(
select * from ods_base_category2 where dt='2020-06-14'
)c2 on c3.category2_id=c2.id
join
(
select * from ods_base_category1 where dt='2020-06-14'
)c1 on c2.category1_id=c1.id;
</code></pre>
<p>3)查询加载结果
hive (gmall)> select * from dwd_dim_sku_info where dt='2020-06-14' limit 2;</p>
<h2>三十五.DWD_优惠券维度表</h2>
<p>把ODS层ods_coupon_info表数据导入到DWD层优惠卷维度表,在导入过程中可以做适当的清洗。</p>
<p>1)建表语句</p>
<pre><code class="language-java">drop table if exists dwd_dim_coupon_info;
create external table dwd_dim_coupon_info(
string COMMENT '购物券编号',
coupon_name string COMMENT '购物券名称',
coupon_type string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
condition_amount decimal(16,2) COMMENT '满额数',
condition_num bigint COMMENT '满件数',
activity_id string COMMENT '活动编号',
benefit_amount decimal(16,2) COMMENT '减金额',
benefit_discount decimal(16,2) COMMENT '折扣',
create_time string COMMENT '创建时间',
range_type string COMMENT '范围类型 1、商品 2、品类 3、品牌',
spu_id string COMMENT '商品id',
tm_id string COMMENT '品牌id',
category3_id string COMMENT '品类id',
limit_num bigint COMMENT '最多领用次数',
operate_time string COMMENT '修改时间',
expire_time string COMMENT '过期时间'
) COMMENT '优惠券维度表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code class="language-java">SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_coupon_info partition(dt='2020-06-14')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ods_coupon_info
where dt='2020-06-14';
</code></pre>
<p>3)查询加载结果
hive (gmall)> select * from dwd_dim_coupon_info where dt='2020-06-14' limit 2;</p>
<h2>三十六.活动维度表(全量)</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/5576dbacda6543859c0b1c7b636a48f3.png" /></p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_dim_activity_info;
create external table dwd_dim_activity_info(
string COMMENT '编号',
activity_name string COMMENT '活动名称',
activity_type string COMMENT '活动类型',
start_time string COMMENT '开始时间',
end_time string COMMENT '结束时间',
create_time string COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_activity_info partition(dt='2020-06-14')
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ods_activity_info
where dt='2020-06-14';
</code></pre>
<p>3)查询结果</p>
<p>select * from dwd_dim_activity_info where dt='2020-06-14' limit 2</p>
<p>自定义UDTF函数的一个好处,为什么不使用系统的炸裂函数?</p>
<p>理由:自定义UDTF函数的好处是可以快速定义到函数的问题.更加的灵活,可以快速定义到问题.</p>
<p>脚本之中进行执行的时候,自定义函数前面也是需要增加库名的,第一遍自己写没有添加会导致执行不下去.</p>
<p>自定义函数声明问题:自定义函数是以库为单位,并不是全局的,不能够跨越库去执行相应的函数.</p>
<p>自定义UDTF函数的步骤:
(1)定义一个类,继承G..UDTF
(2)重写里面的三个方法,初始化(输入参数的个数 和类型校验 定义返回值名称 校验返回值类型)
process(支持多进多出,本次项目是用的是=>1进多出 object[0].toString forward)
close
(3)打包+上传(hdfs集群路径)
(4)在hive客户端进行注册gmall->create function ex_json_array as "全类名" using "hdfs://hadoop"</p>
<p>stored as parquet + lzo 是支持切片的</p>
<h2>三十七.DWD_业务_维度退化</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/fee23e22859c44d085878e7e7ceb1a1b.png" /></p>
<p>进行相应的join操作,进而得到相应的维度退化过程.(面试:对于哪些表进行了维度退化)
省份表和地区表-------地区表
商品表 spu 品类表 三级分类 二级分类 一级分类------商品维度表
时间 (年 月 日)-------时间维度表</p>
<h2>三十八.DWD_业务_ETL清洗</h2>
<p>ETL内容:(1)数据清洗,就是进行相应的数据解析; (2)核心字段不能为空.(3)超时信息进行过滤.(4)重复数据进行过滤.(5)核心字段错误,进行过滤.</p>
<p>清洗手段:hql,spark sql,python,MR,Kettle(深圳比较多,北京是比较少的)</p>
<p>来了10000条日志,这里需要进行清洗的数据是多少. 10000条数据算正常,但是一旦超过了10000条,找谁</p>
<p>用户行为数据(点赞) 业务数据(点赞) 当他们都是存在的时候,
1.正常情况下我们是使用业务数据的.(优先选择业务)
2.拼接:业务 | 用户行为
有业务用业务,但是当业务Hi空的时候用用户行为的.</p>
<h2>三十九.DWD_业务_活动维度</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/5a9dc5eede71478a8e89f96fc28253f3.png" /></p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_dim_activity_info;
create external table dwd_dim_activity_info(
string COMMENT '编号',
activity_name string COMMENT '活动名称',
activity_type string COMMENT '活动类型',
start_time string COMMENT '开始时间',
end_time string COMMENT '结束时间',
create_time string COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2) 数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_activity_info partition(dt='2020-06-14')
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ods_activity_info
where dt='2020-06-14';
</code></pre>
<p>3)查询加载结果</p>
<p>hive (gmall)> select * from dwd_dim_activity_info where dt='2020-06-14' limit 2;</p>
<h2>四十.DWD_业务_地区维度</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/14dc9d3f044246e1a79740fe3ba7a3a8.png" /></p>
<p>1)创建表</p>
<pre><code>DROP TABLE IF EXISTS
dwd_dim_base_province;
CREATE EXTERNAL TABLE
dwd_dim_base_province (
string COMMENT 'id',
province_name string COMMENT '省市名称',
area_code string COMMENT '地区编码',
iso_code string COMMENT 'ISO编码',
region_id string COMMENT '地区id',
region_name string COMMENT '地区名称'
) COMMENT '地区维度表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_base_province/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from
(
select * from ods_base_province
) bp
join
(
select * from ods_base_region
) br
on bp.region_id = br.id;
</code></pre>
<p>3)查询加载结果
select * from dwd_dim_base_province limit 2;</p>
<h2>四十一.DWD_业务_时间维度</h2>
<p>1)建表语句</p>
<pre><code>DROP TABLE IF EXISTS
dwd_dim_date_info;
CREATE EXTERNAL TABLE
dwd_dim_date_info(
date_id string ,
week_id string ,
week_day string ,
string ,
string ,
string ,
string ,
is_workday string ,
holiday_id string
)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_date_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)把date_info.txt文件上传到hadoop102的/opt/module/db_log/路径</p>
<p>3)数据装载</p>
<p>注意:由于dwd_dim_date_info是列式存储+LZO压缩。直接将date_info.txt文件导入到目标表,并不会直接转换为列式存储+LZO压缩。我们需要创建一张普通的临时表dwd_dim_date_info_tmp,将date_info.txt加载到该临时表中。最后通过查询临时表数据,把查询到的数据插入到最终的目标表中。</p>
<p>(1)创建临时表,非列式存储</p>
<pre><code>DROP TABLE IF EXISTS
dwd_dim_date_info_tmp;
CREATE EXTERNAL TABLE
dwd_dim_date_info_tmp(
date_id string ,
week_id string ,
week_day string ,
string ,
string ,
string ,
string ,
is_workday string ,
holiday_id string
)
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_dim_date_info_tmp/';
</code></pre>
<p>(2)将数据导入临时表</p>
<p>load data localinpath '/opt/module/db_log/date_info.txt' into table dwd_dim_date_info_tmp;</p>
<p>(3)将数据导入正式表</p>
<p>insert overwrite table dwd_dim_date_info select * from dwd_dim_date_info_tmp;</p>
<p>4)查询加载结果</p>
<p>hive (gmall)> select * from dwd_dim_date_info;</p>
<p>上面的过程就是将date——info.txt之中的数据先是导入到临时表之中,然后在使用列式存储的方法和lzo压缩将上述的东西直接再存入表中。</p>
<h2>四十二.DWD_业务_支付事实表</h2>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/0bb80df84afb423d86d1134bbd9cee32.png" /></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/1e6f63673d8a438d93ef909a7758f896.png" /></p>
<pre><code>drop table if exists dwd_fact_payment_info;
create external table dwd_fact_payment_info (
string COMMENT 'id',
out_trade_no string COMMENT '对外业务编号',
order_id string COMMENT '订单编号',
user_id string COMMENT '用户编号',
alipay_trade_no string COMMENT '支付宝交易流水编号',
payment_amount decimal(16,2) COMMENT '支付金额',
string COMMENT '交易内容',
payment_type string COMMENT '支付类型',
payment_time string COMMENT '支付时间',
province_id string COMMENT '省份ID'
) COMMENT '支付事实表表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_payment_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_payment_info partition(dt='2020-06-14')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from
(
select * from ods_payment_info where dt='2020-06-14'
)pi
join
(
select id, province_id from ods_order_info where dt='2020-06-14'
)oi
on pi.order_id = oi.id;
</code></pre>
<p>这里的join里面的select是可以进行随便写的(在实际的企业之中,*是不可以出现的,因为会影响性能),on是对应条件。</p>
<p>3)查询结果</p>
<p>hive (gmall)> select * from dwd_fact_payment_info where dt='2020-06-14' limit 2;</p>
<h2>四十三.DWD_业务_退款事实表</h2>
<p>把ODS层ods_order_refund_info表数据导入到DWD层退款事实表,在导入过程中可以做适当的清洗。</p>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>退款</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>件数/金额</p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_order_refund_info;
create external table dwd_fact_order_refund_info(
string COMMENT '编号',
user_id string COMMENT '用户ID',
order_id string COMMENT '订单ID',
sku_id string COMMENT '商品ID',
refund_type string COMMENT '退款类型',
refund_num bigint COMMENT '退款件数',
refund_amount decimal(16,2) COMMENT '退款金额',
refund_reason_type string COMMENT '退款原因类型',
create_time string COMMENT '退款时间'
) COMMENT '退款事实表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_refund_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_order_refund_info partition(dt='2020-06-14')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ods_order_refund_info
where dt='2020-06-14';
</code></pre>
<p>3)查询结果
select * from dwd_fact_order_refund_info where dt='2020-06-14' limit 2;</p>
<h2>四十四.DWD_业务_评价事实表</h2>
<p>把ODS层ods_comment_info表数据导入到DWD层评价事实表,在导入过程中可以做适当的清洗。</p>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>评价</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>个数</p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_comment_info;
create external table dwd_fact_comment_info(
string COMMENT '编号',
user_id string COMMENT '用户ID',
sku_id string COMMENT '商品sku',
spu_id string COMMENT '商品spu',
order_id string COMMENT '订单ID',
string COMMENT '评价',
create_time string COMMENT '评价时间'
) COMMENT '评价事实表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_comment_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_comment_info partition(dt='2020-06-14')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ods_comment_info
where dt='2020-06-14';
</code></pre>
<p>3)查询加载结果</p>
<p>select * from dwd_fact_comment_info where dt='2020-06-14' limit 2;</p>
<h2>四十五.DWD_业务_订单详情事实表-217</h2>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>订单详情</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>件数/金额</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/3ab82788463c49178c142b7f6fd50a01.png" /></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/52e0386b697b4caf8cca293919dfee7c.png" /></p>
<p>上面需要进行考虑的是一个分摊情况,在相应的字段之中的表示</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/588ff24e30894da694fc864c27678d32.png" /></p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_order_detail;
create external table dwd_fact_order_detail (
string COMMENT '订单编号',
order_id string COMMENT '订单号',
user_id string COMMENT '用户id',
sku_id string COMMENT 'sku商品id',
sku_name string COMMENT '商品名称',
order_price decimal(16,2) COMMENT '商品价格',
sku_num bigint COMMENT '商品数量',
create_time string COMMENT '创建时间',
province_id string COMMENT '省份ID',
source_type string COMMENT '来源类型',
source_id string COMMENT '来源编号',
original_amount_d decimal(20,2) COMMENT '原始价格分摊',
final_amount_d decimal(20,2) COMMENT '购买价格分摊',
feight_fee_d decimal(20,2) COMMENT '分摊运费',
benefit_reduce_amount_d decimal(20,2) COMMENT '分摊优惠'
) COMMENT '订单明细事实表表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_detail/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_order_detail partition(dt='2020-06-14')
select
id,
order_id,
user_id,
sku_id,
sku_name,
order_price,
sku_num,
create_time,
province_id,
source_type,
source_id,
original_amount_d,
if(rn=1,final_total_amount -(sum_div_final_amount - final_amount_d),final_amount_d),
if(rn=1,feight_fee - (sum_div_feight_fee - feight_fee_d),feight_fee_d),
if(rn=1,benefit_reduce_amount - (sum_div_benefit_reduce_amount -benefit_reduce_amount_d), benefit_reduce_amount_d)
from
(
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.source_type,
od.source_id,
round(od.order_price*od.sku_num,2) original_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2) final_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2) feight_fee_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2) benefit_reduce_amount_d,
row_number() over(partition by od.order_id order by od.id desc) rn,
oi.final_total_amount,
oi.feight_fee,
oi.benefit_reduce_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2)) over(partition by od.order_id) sum_div_final_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2)) over(partition by od.order_id) sum_div_feight_fee,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2)) over(partition by od.order_id) sum_div_benefit_reduce_amount
from
(
select * from ods_order_detail where dt='2020-06-14'
) od
join
(
select * from ods_order_info where dt='2020-06-14'
) oi
on od.order_id=oi.id
)t1;
</code></pre>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/71932aa136db4e78b88c5b0cd0081ee6.png" /></p>
<p>阿里面试题上来了,</p>
<p>3)查询加载结果</p>
<p>hive (gmall)> select * from dwd_fact_order_detail where dt='2020-06-14' limit 2;</p>
<h2>四十六.DWD层_业务_订单加购表</h2>
<p>由于购物车的数量是会发生变化,所以导增量不合适。</p>
<p>每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。</p>
<p>周期型快照事实表劣势:存储的数据量会比较大。</p>
<p>解决方案:周期型快照事实表存储的数据比较讲究时效性,时间太久了的意义不大,可以删除以前的数据。</p>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>加购</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>件数/金额</p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_cart_info;
create external table dwd_fact_cart_info(
string COMMENT '编号',
user_id string COMMENT '用户id',
sku_id string COMMENT 'skuid',
cart_price string COMMENT '放入购物车时价格',
sku_num string COMMENT '数量',
sku_name string COMMENT 'sku名称 (冗余)',
create_time string COMMENT '创建时间',
operate_time string COMMENT '修改时间',
is_ordered string COMMENT '是否已经下单。1为已下单;0为未下单',
order_time string COMMENT '下单时间',
source_type string COMMENT '来源类型',
srouce_id string COMMENT '来源编号'
) COMMENT '加购事实表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_cart_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_cart_info partition(dt='2020-06-14')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from ods_cart_info
where dt='2020-06-14';
</code></pre>
<p>3)查询结果
select * from dwd_fact_cart_info where dt='2020-06-14' limit 2;</p>
<h2>四十七.DWD层_业务_收藏事实表</h2>
<p>收藏的标记,是否取消,会发生变化,做增量不合适。</p>
<p>每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。</p>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>收藏</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>个数</p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_favor_info;
create external table dwd_fact_favor_info(
string COMMENT '编号',
user_id string COMMENT '用户id',
sku_id string COMMENT 'skuid',
spu_id string COMMENT 'spuid',
is_cancel string COMMENT '是否取消',
create_time string COMMENT '收藏时间',
cancel_time string COMMENT '取消时间'
) COMMENT '收藏事实表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_favor_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_favor_info partition(dt='2020-06-14')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ods_favor_info
where dt='2020-06-14';
</code></pre>
<p>3)查询结果查询</p>
<pre><code>select * from dwd_fact_favor_info where dt='2020-06-14' limit 2;
</code></pre>
<h2>四十八.DWD层_业务_优惠券领用事实表</h2>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>优惠券领用</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>个数</p>
<p>优惠卷的生命周期:领取优惠卷-》用优惠卷下单-》优惠卷参与支付</p>
<p>累积型快照事实表使用:统计优惠卷领取次数、优惠卷下单次数、优惠卷参与支付次数</p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_coupon_use;
create external table dwd_fact_coupon_use(
string COMMENT '编号',
coupon_id string COMMENT '优惠券ID',
user_id string COMMENT 'userid',
order_id string COMMENT '订单id',
coupon_status string COMMENT '优惠券状态',
get_time string COMMENT '领取时间',
using_time string COMMENT '使用时间(下单)',
used_time string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用事实表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_coupon_use/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>注意:dt是按照优惠卷领用时间get_time做为分区。</p>
<p>2)数据装载</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/a1004c1b24594de2b214ecbda677e4a0.png" /></p>
<p>注意下面的代码之中有一句动态分区非严格模式,如果要是没有这一句的话会进行相应的报错.在非严格模式下,分区是开始不用指定的.</p>
<pre><code>set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_coupon_use partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from dwd_fact_coupon_use
where dt in
(
select
date_format(get_time,'yyyy-MM-dd')
from ods_coupon_use
where dt='2020-06-14'
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ods_coupon_use
where dt='2020-06-14'
)new
on old.id=new.id;
</code></pre>
<p>上面是使用的全连接的过程,就是有不同的地方进行补充.先是选出来未来要被覆盖的数据,全外连接相应的新产生的数据.</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/256a38525517402ea4b51a76d53aa454.png" /></p>
<p>3)查询加载结果</p>
<p>hive (gmall)> select * from dwd_fact_coupon_use where dt='2020-06-14' limit 2;</p>
<h2>四十九.DWD_业务_系统函数 (concat 、concat_ws 、collect_set 、STR_TO_MAP )</h2>
<p>1)concat函数</p>
<p>concat函数在连接字符串的时候,只要其中一个是NULL,那么将返回NULL
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/4c680b2990024c24afc51d22eff0d5c7.png" /></p>
<p>2)concat_ws函数</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/8cabb3419f704df7903b56719eb7d35b.png" /></p>
<p>3)STR_TO_MAP函数</p>
<p>(1)语法描述</p>
<p>STR_TO_MAP(VARCHAR text, VARCHAR listDelimiter, VARCHAR keyValueDelimiter)</p>
<p>(2)功能描述</p>
<p>使用listDelimiter将text分隔成K-V对,然后使用keyValueDelimiter分隔每个K-V对,组装成MAP返回。默认listDelimiter为( ,),keyValueDelimiter为(=)。</p>
<p>(3)案例</p>
<p>str_to_map('1001=2020-06-14,1002=2020-06-14',',','=')</p>
<p>输出</p>
<p>{"1001":"2020-06-14","1002":"2020-06-14"}
这个地方的含义就是使用了两刀进行了切割的过程.</p>
<h2>五十.DWD_业务_订单表分析</h2>
<p><strong>时间</strong></p>
<p><strong>用户</strong></p>
<p><strong>地区</strong></p>
<p><strong>商品</strong></p>
<p><strong>优惠券</strong></p>
<p><strong>活动</strong></p>
<p><strong>编码</strong></p>
<p><strong>度量值</strong></p>
<p><strong>订单</strong></p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>√</p>
<p>件数/金额</p>
<p>订单生命周期:创建时间=》支付时间=》取消时间=》完成时间=》退款时间=》退款完成时间。</p>
<p>由于ODS层订单表只有创建时间和操作时间两个状态,不能表达所有时间含义,所以需要关联订单状态表。订单事实表里面增加了活动id,所以需要关联活动订单表。</p>
<p>1)建表语句</p>
<pre><code>drop table if exists dwd_fact_order_info;
create external table dwd_fact_order_info (
string COMMENT '订单编号',
order_status string COMMENT '订单状态',
user_id string COMMENT '用户id',
out_trade_no string COMMENT '支付流水号',
create_time string COMMENT '创建时间(未支付状态)',
payment_time string COMMENT '支付时间(已支付状态)',
cancel_time string COMMENT '取消时间(已取消状态)',
finish_time string COMMENT '完成时间(已完成状态)',
refund_time string COMMENT '退款时间(退款中状态)',
refund_finish_time string COMMENT '退款完成时间(退款完成状态)',
province_id string COMMENT '省份ID',
activity_id string COMMENT '活动ID',
original_total_amount decimal(16,2) COMMENT '原价金额',
benefit_reduce_amount decimal(16,2) COMMENT '优惠金额',
feight_fee decimal(16,2) COMMENT '运费',
final_total_amount decimal(16,2) COMMENT '订单金额'
) COMMENT '订单事实表'
PARTITIONED BY (
string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_info/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)数据装载
<img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/41d6b26727bd42f2a1982f9e15d91869.png" /></p>
<p>3)常用函数</p>
<p>select order_id, concat(order_status,'=', operate_time) from ods_order_status_log where dt='2020-06-14'; <img alt="" src="https://img-blog.csdnimg.cn/14c94cd2537e448b989a4cd67497a279.png" /></p>
<p>select order_id, collect_set(concat(order_status,'=',operate_time)) from ods_order_status_log where dt='2020-06-14' group by order_id; //这句话的含义就是将上面的订单id是相同的数据进行一个聚合,相同的聚合到一行上面,多行转为一行.按照订单的id.<img alt="" src="https://img-blog.csdnimg.cn/05d6b16900ec4a7cae92b719bb25802d.png" /></p>
<p>select order_id, concat_ws(',', collect_set(concat(order_status,'=',operate_time))) from ods_order_status_log where dt='2020-06-14' group by order_id;//将上面的数据使用,进行分割.<img alt="" src="https://img-blog.csdnimg.cn/2fcd63a76ff74ac596728647188430d4.png" /></p>
<p>可见原来是数组的样子,现在完全是一个字符串的样子.</p>
<p>select order_id, str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))), ',' , '=') from ods_order_status_log where dt='2020-06-14' group by order_id;//我们将其变成一个map的样子,里面的状态值对应着不同的含义.<img alt="" src="https://img-blog.csdnimg.cn/9694041a2b664ab493986d16acaa78a9.png" /></p>
<p>4) 数据装载代码</p>
<pre><code>set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001对应未支付状态
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from dwd_fact_order_info
where dt
in
(
select
date_format(create_time,'yyyy-MM-dd')
from ods_order_info
where dt='2020-06-14'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ods_order_status_log
where dt='2020-06-14'
group by order_id
)log
join
(
select * from ods_order_info where dt='2020-06-14'
)info
on log.order_id=info.id
left join
(
select * from ods_activity_order where dt='2020-06-14'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
</code></pre>
<p>5)查询加载结果</p>
<p>hive (gmall)> select * from dwd_fact_order_info where dt='2020-06-14' limit 2;</p>
<h2>五十一.DWD_业务_用户拉链表(非常重要的)</h2>
<p>面试会问:
1.项目之中有没有用过拉链表?</p>
<p>2.用户的某一特征是变化的,但是不是经常变化的,请问你应该如何处理?(比如说是手机号为例子)</p>
<p>用户表中的数据每日既有可能新增,也有可能修改,但修改频率并不高,属于缓慢变化维度,此处采用拉链表存储用户维度数据。
<strong>1)什么是拉链表</strong></p>
<p>拉链表,记录每条信息的生命周期,一旦一条记录的生命周期结束,就是会进行重新放入一个新的记录,并把当前日期作为生效开始日期.如果当前的信息任然是有效的,在生效结束日期前填入一个极大值(9999-99-99)<img alt="" src="https://img-blog.csdnimg.cn/2921f40f4f7a454b8653885eb7fc05bd.png" /></p>
<p><strong>2) 为什么要使用拉链表?</strong></p>
<p>拉链表适用于数据缓慢变化的数据,可以大大节约使用的空间.</p>
<p><strong>3) 如何使用拉链表?</strong><img alt="" src="https://img-blog.csdnimg.cn/f9ded4d42091463988777561679a05c5.png" /></p>
<p><strong>4) 拉链表的形成过程</strong></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/d782276d0dbf4cb183d5646a3c3b40f0.png" /></p>
<p><strong>5)拉链表制作过程图</strong></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/4d0ad9ee6438483ca294269d78e0816e.png" /></p>
<p>用户当日全部数据和MySQL中每天变化的数据拼接到一起,形成一个新的临时的拉链数据表.用临时的拉链数据表去覆盖掉旧的拉链数据表.</p>
<p><strong>6)拉链表制作过程</strong></p>
<p><strong>步骤</strong> <strong>0</strong> <strong>:初始化拉链表(首次独立执行)</strong></p>
<p>(1)建立拉链表</p>
<pre><code>drop table if exists dwd_dim_user_info_his;
create external table dwd_dim_user_info_his(
string COMMENT '用户id',
string COMMENT '姓名',
string COMMENT '生日',
string COMMENT '性别',
string COMMENT '邮箱',
user_level string COMMENT '用户等级',
create_time string COMMENT '创建时间',
operate_time string COMMENT '操作时间',
start_date string COMMENT '有效开始日期',
end_date string COMMENT '有效结束日期'
) COMMENT '用户拉链表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>(2)初始化拉链表------在原来的用户表上加上两列</p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_user_info_his
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-06-14',
'9999-99-99'
from ods_user_info oi
where oi.dt='2020-06-14';
</code></pre>
<p><strong>步骤1:制作当日变动数据(包括新增,修改)每日执行</strong></p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/510d83789a46448f9bc60e6499b4b548.png" /></p>
<p>(1)如何获得每日变动表</p>
<p>a .最好表内有创建时间和变动时间(Lucky!)</p>
<p>b.如果没有,可以利用第三方工具监控比如canal(后面会说到),监控MySQL的实时变化进行记录(麻烦)。</p>
<p>c.逐行对比前后两天的数据,检查md5(concat(全部有可能变化的字段))是否相同(low)</p>
<p>d.要求业务数据库提供变动流水(人品,颜值)</p>
<p>(2)因为ods_user_info本身导入过来就是新增变动明细的表,所以不用处理</p>
<p>a)数据库中新增2020-06-15一天的数据</p>
<p>b)通过Sqoop把2020-06-15日所有数据导入</p>
<p>mysql_to_hdfs.sh all 2020-06-15</p>
<p>c)ods层数据导入</p>
<p>hdfs_to_ods_db.sh all 2020-06-15</p>
<p><strong>步骤</strong> <strong>2</strong> <strong>:先合并变动信息,再追加新增信息,插入到临时表中</strong></p>
<p>1)建立临时表</p>
<pre><code>drop table if exists dwd_dim_user_info_his_tmp;
create external table dwd_dim_user_info_his_tmp(
string COMMENT '用户id',
string COMMENT '姓名',
string COMMENT '生日',
string COMMENT '性别',
string COMMENT '邮箱',
user_level string COMMENT '用户等级',
create_time string COMMENT '创建时间',
operate_time string COMMENT '操作时间',
start_date string COMMENT '有效开始日期',
end_date string COMMENT '有效结束日期'
) COMMENT '订单拉链临时表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="lzo");
</code></pre>
<p>2)导入脚本</p>
<p><img alt="大数据项目 --- 电商数仓(一)" src="https://johngo-pic.oss-cn-beijing.aliyuncs.com/articles/20230809/74cec14618564792aa1be8241057c3e3.png" /></p>
<pre><code>SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_user_info_his_tmp
select * from
(
select --获取的是新增和变化的数据
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-06-15' start_date,--加上两列
'9999-99-99' end_date
from ods_user_info where dt='2020-06-15'
union all --union代表的是进行去重,union进行拼接
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
from dwd_dim_user_info_his uh left join
(
select
*
from ods_user_info
where dt='2020-06-15'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
</code></pre>
<p><strong>新的知识点-------------------------横向拼接用join,纵向拼接用union all</strong></p>
<p><strong>步骤3:把临时表覆盖给拉链表</strong></p>
<p>1)导入数据</p>
<p>insert overwrite table dwd_dim_user_info_his select * from dwd_dim_user_info_his_tmp;</p>
<p>2)查询导入数据</p>
<p>select id, start_date, end_date from dwd_dim_user_info_his limit 2;</p>
<h2>五十二.DWD_业务导入脚本</h2>
<p>写脚本的五个步骤:
①#!/bin/bash
②定义变量
③获取时间
④sql语句 sql=" "遇到表, ${APP}/$do_date 遇到时间, 自定义函数需要加上${APP}
⑤执行sql(执行索引)</p>
<p><strong>1.编写脚本</strong></p>
<p>1)在/home/atguigu/bin目录下创建脚本ods_to_dwd_db.sh
vim ods_to_dwd_db.sh</p>
<p>在脚本中填写如下内容:
地区表只导入一次,时间表手动上传一次,用户拉链表:临时表 初始化拉链表(只能第一次手动导入,不能够在脚本里面第一次导入)</p>
<pre><code>#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=
-d "-1 day" +%F`
fi
sql1="
set mapreduce.job.queuename=hive;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_dim_sku_info partition(dt=’$do_date’)
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select * from ${APP}.ods_sku_info where dt=’$do_date’
)sku
join
(
select * from ${APP}.ods_base_trademark where dt=’$do_date’
)ob on sku.tm_id=ob.tm_id
join
(
select * from ${APP}.ods_spu_info where dt=’$do_date’
)spu on spu.id = sku.spu_id
join
(
select * from ${APP}.ods_base_category3 where dt=’$do_date’
)c3 on sku.category3_id=c3.id
join
(
select * from ${APP}.ods_base_category2 where dt=’$do_date’
)c2 on c3.category2_id=c2.id
join
(
select * from ${APP}.ods_base_category1 where dt=’$do_date’
)c1 on c2.category1_id=c1.id;
insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt=’$do_date’)
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt=’$do_date’;
insert overwrite table ${APP}.dwd_dim_activity_info partition(dt=’$do_date’)
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt=’$do_date’;
insert overwrite table ${APP}.dwd_fact_order_detail partition(dt=’$do_date’)
select
id,
order_id,
user_id,
sku_id,
sku_num,
order_price,
sku_num,
create_time,
province_id,
source_type,
source_id,
original_amount_d,
if(rn=1,final_total_amount-(sum_div_final_amount-final_amount_d),final_amount_d),
if(rn=1,feight_fee-(sum_div_feight_fee-feight_fee_d),feight_fee_d),
if(rn=1,benefit_reduce_amount-(sum_div_benefit_reduce_amount-benefit_reduce_amount_d),benefit_reduce_amount_d)
from
(
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.source_type,
od.source_id,
round(od.order_price*od.sku_num,2) original_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2) final_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2) feight_fee_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2) benefit_reduce_amount_d,
row_number() over(partition by od.order_id order by od.id desc) rn,
oi.final_total_amount,
oi.feight_fee,
oi.benefit_reduce_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2)) over(partition by od.order_id) sum_div_final_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2)) over(partition by od.order_id) sum_div_feight_fee,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2)) over(partition by od.order_id) sum_div_benefit_reduce_amount
from
(
select * from ${APP}.ods_order_detail where dt=’$do_date’
) od
join
(
select * from ${APP}.ods_order_info where dt=’$do_date’
) oi
on od.order_id=oi.id
)t1;
insert overwrite table ${APP}.dwd_fact_payment_info partition(dt=’$do_date’)
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from
(
select * from ${APP}.ods_payment_info where dt=’$do_date’
)pi
join
(
select id, province_id from ${APP}.ods_order_info where dt=’$do_date’
)oi
on pi.order_id = oi.id;
insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt=’$do_date’)
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ${APP}.ods_order_refund_info
where dt=’$do_date’;
insert overwrite table ${APP}.dwd_fact_comment_info partition(dt=’$do_date’)
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ${APP}.ods_comment_info
where dt=’$do_date’;
insert overwrite table ${APP}.dwd_fact_cart_info partition(dt=’$do_date’)
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from ${APP}.ods_cart_info
where dt=’$do_date’;
insert overwrite table ${APP}.dwd_fact_favor_info partition(dt=’$do_date’)
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ${APP}.ods_favor_info
where dt=’$do_date’;
insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),’yyyy-MM-dd’)
from
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.dwd_fact_coupon_use
where dt in
(
select
date_format(get_time,’yyyy-MM-dd’)
from ${APP}.ods_coupon_use
where dt=’$do_date’
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.ods_coupon_use
where dt=’$do_date’
)new
on old.id=new.id;
insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms[‘1001’] is null,old.create_time,new.tms[‘1001’]),–1001对应未支付状态
if(new.tms[‘1002’] is null,old.payment_time,new.tms[‘1002’]),
if(new.tms[‘1003’] is null,old.cancel_time,new.tms[‘1003’]),
if(new.tms[‘1004’] is null,old.finish_time,new.tms[‘1004’]),
if(new.tms[‘1005’] is null,old.refund_time,new.tms[‘1005’]),
if(new.tms[‘1006’] is null,old.refund_finish_time,new.tms[‘1006’]),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms[‘1001’] is null,old.create_time,new.tms[‘1001′]),’yyyy-MM-dd’)
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from ${APP}.dwd_fact_order_info
where dt
in
(
select
date_format(create_time,’yyyy-MM-dd’)
from ${APP}.ods_order_info
where dt=’$do_date’
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(‘,’,collect_set(concat(order_status,’=’,operate_time))),’,’,’=’) tms
from ${APP}.ods_order_status_log
where dt=’$do_date’
group by order_id
)log
join
(
select * from ${APP}.ods_order_info where dt=’$do_date’
)info
on log.order_id=info.id
left join
(
select * from ${APP}.ods_activity_order where dt=’$do_date’
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"
sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br
on bp.region_id=br.id;
"
sql3="
insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from
(
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
‘$do_date’ start_date,
‘9999-99-99′ end_date
from ${APP}.ods_user_info where dt=’$do_date’
union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date=’9999-99-99′, date_add(ui.dt,-1), uh.end_date) end_date
from ${APP}.dwd_dim_user_info_his uh left join
(
select
*
from ${APP}.ods_user_info
where dt=’$do_date’
) ui on uh.id=ui.id
)his
order by his.id, start_date;
insert overwrite table ${APP}.dwd_dim_user_info_his
select * from ${APP}.dwd_dim_user_info_his_tmp;
"
case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
"all"){
$hive -e "$sql1$sql3"
};;
esac
初次导入时,脚本的第一个参数应为first,线上环境不传第二个参数,自动获取前一天日期。
[atguigu@hadoop102 bin]$ ods_to_dwd_db.sh first 2020-06-14
每日定时导入,脚本的第一个参数应为all,线上环境不传第二个参数,自动获取前一天日期。
[atguigu@hadoop102 bin]$ ods_to_dwd_db.sh all 2020-06-15
五十三.DWS层_DWT层
站在维度的角度去看待事实.
五十四.DWS_DWT层术语
1)用户
用户以 设备为判断标准,在移动统计中,每个独立设备认为是一个独立用户。Android系统根据IMEI号,IOS系统根据OpenUDID来标识一个独立用户,每部手机一个用户。
2)新增用户
首次联网使用应用的用户。如果一个用户 首次打开某APP,那这个用户定义为新增用户;卸载再安装的设备,不会被算作一次新增。新增用户包括日新增用户、周新增用户、月新增用户。
3)活跃用户
打开应用的用户即为活跃用户,不考虑用户的使用情况。每天一台设备打开多次会被计为一个活跃用户。
4)周 ( 月 ) 活跃 用户(用户活跃要去重)
某个自然周(月)内启动过应用的用户,该周(月)内的多次启动只记一个活跃用户。
5)月活跃 率
月活跃用户与截止到该月累计的用户总和之间的比例。
6)沉默用户
用户仅在安装当天(次日)启动一次,后续时间无再启动行为。该指标可以反映新增用户质量和用户与APP的匹配程度。
7)版本分布
不同版本的周内各天新增用户数,活跃用户数和启动次数。利于判断APP各个版本之间的优劣和用户行为习惯。
8)本周回流用户
上周未启动过应用,本周启动了应用的用户。
9)连续n周活跃用户
连续n周,每周至少启动一次。
1 0 )忠诚用户
连续活跃5周以上的用户
1 1 )连续活跃用户
连续2周及以上活跃的用户
1 2 )近期流失用户
连续n(2
Original: https://blog.csdn.net/m0_47489229/article/details/127465566
Author: ASDWYang
Title: 大数据项目 — 电商数仓(一)
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/817595/
转载文章受原作者版权保护。转载请注明原作者出处!