hive系列 – 数仓分层Demo
大数据越来越重要,要管理与利用好这些数据就需要建设数据仓库。
数仓建设是有一套建模理论的,本文通过一个简单例子来理解数仓分层的基本样貌。
建库
假设我们是天猫商城,需要建设一个数据仓库,那么我们首先hive建库:
1 |
hive> create database tmall; |
原始数据
数仓的原始数据来自外部系统,可以归纳为2类:
- 日志通常采用flume工具写入到HDFS,其记录格式一般是JSON。
- 数据库通常采用sqoop工具从mysql提取数据到HDFS,其记录格式是多列组成的行。
数仓的计算采用T+1离线计算,也就是凌晨计算昨天产生的数据。
我们的Demo模拟生成了一份2020-10-15日的日志数据共100万行:
1 2 3 4 5 6 7 8 9 10 11 |
root@ubuntu:~/bigdata/demo/dw# cat genlog.py import json import random names=['lily','john'] products=['potato','milk','apple','banana'] fd=open('./log.txt','w') for i in range(1000000): r = {'name': names[random.randint(0,1)], 'product': products[random.randint(0,3)]} fd.write(json.dumps(r)+'\n') |
磁盘上的./log.txt内容如下:
{“product”: “potato”, “name”: “lily”}
{“product”: “milk”, “name”: “lily”}
{“product”: “apple”, “name”: “john”}
{“product”: “milk”, “name”: “lily”}
{“product”: “potato”, “name”: “john”}
{“product”: “milk”, “name”: “lily”}
{“product”: “milk”, “name”: “john”}
{“product”: “banana”, “name”: “john”}
{“product”: “apple”, “name”: “lily”}
{“product”: “milk”, “name”: “lily”}
每一行代表一次购买行为,name是用户名,product是购买的商品,name和product通过随机枚举组合生成。
我们建立tmall数仓的log存储目录,并把当天日志放到目录下:
1 2 |
hdfs dfs -mkdir -p /dw/tmall/log/shopping_log/2020-10-15 hdfs dfs -put ./log.txt /dw/tmall/log/shopping_log/2020-10-15 |
(注意:实际上述log写入hdfs应该由flume完成,这里只是示例)
tmall数仓的数据全部放在/dw/tmall目录下,flume日志则保存在log子目录下,并按日志用途命名目录,按日期切分目录方便T+1加载。
创建ODS层
原始数据必须加载到Hive表里才能进一步SQL处理,容纳原始数据的Hive表就叫做ODS层,它以HIve表的形式保存原数据的样貌。
我们建立ODS表,把日志按原样加载到Hive表中:
1 2 3 4 5 6 7 8 9 10 |
create external table if not exists tmall.ods_shopping_log( `json` string comment 'json format log' ) comment 'ODS-shopping log' partitioned by (`dt` string) row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile location '/dw/tmall/ods/ods_shopping_log'; |
- 统一使用external外表,自行指定数据目录,按照规范指定到/dw/tmall/ods下面,按表名区分目录。
- 每行日志被\n切割入唯一字段json,完全不做解析,这就是ODS层要做的事情,保留原始数据到Hive。
- 数据按日期dt分区存储,数据文件格式为textfile格式,分隔符通过row format配置。
我们看一下hive建立完成后的表schema,说明几个关键概念:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
hive> show create table ods_shopping_log; OK createtab_stmt CREATE EXTERNAL TABLE `ods_shopping_log`( `json` string COMMENT 'json format log') COMMENT 'ODS-shopping log' PARTITIONED BY ( `dt` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'field.delim'='\t', 'line.delim'='\n', 'serialization.format'='\t') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 'hdfs://192.168.2.119:9000/dw/tmall/ods/ods_shopping_log' TBLPROPERTIES ( 'bucketing_version'='2', 'transient_lastDdlTime'='1602744162') Time taken: 0.239 seconds, Fetched: 20 row(s) |
理解Hive的数据格式至关重要,我们知道Hive底层跑的mapreduce,而mapreduce则需要指定mapper的inputformat和reducer的outputformat,前者把文件解析成<K,V>,后者把<K,V>写回文件。
Hive的Stored as就是指定inputformat/outputformat的地方,因为日志都是文本格式,所以这里都是text format。
当mapper解析到1个<K,V>后,对于V部分会进一步通过SerDe进行反序列化得到每一列,因为hive是table嘛。同样的,reducer输出的<K,V>的V其实也是将多个列通过SerDe序列化后输出到HDFS的。
总结一下:
hdfs file -> input format -> <K,V> -> Mapper -> Serde -> columns -> Reducer -> Serde -> <K,V> -> output format -> hdfs file
红色部分是select时候发生的事,只需要mapper即可解析到表中每行数据的每个列的值,不需要执行reducer之后的流程。
黑色部分是insert时候发生的事,行中的columns经过Serde序列化成V之后,按照outputformat写到HDFS上。
加载原始数据到ODS
有了ODS表,我们可以凌晨把昨天的日志数据load到ODS表中:
load data inpath ‘/dw/tmall/log/shopping_log/2020-10-15′ overwrite into table tmall.ods_shopping_log partition(dt=’2020-10-15’);
注意,load命令会把原目录的数据文件mv到hive表的分区目录下,并不会执行任何的MR计算,这个过程是没有成本的。(强调:原目录的数据被挪到hive数据目录下)
因为原始日志是json文本行组成的文件,而ods表的inputformat也是textfile,因此当对ods表做select时MR自然能够解析每一行日志到json字段,理解这个过程非常重要。
我们现在可以直接查询ods表:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
hive> select * from tmall.ods_shopping_log limit 10; OK ods_shopping_log.json ods_shopping_log.dt {"product": "potato", "name": "lily"} 2020-10-15 {"product": "milk", "name": "lily"} 2020-10-15 {"product": "apple", "name": "john"} 2020-10-15 {"product": "milk", "name": "lily"} 2020-10-15 {"product": "potato", "name": "john"} 2020-10-15 {"product": "milk", "name": "lily"} 2020-10-15 {"product": "milk", "name": "john"} 2020-10-15 {"product": "banana", "name": "john"} 2020-10-15 {"product": "apple", "name": "lily"} 2020-10-15 {"product": "milk", "name": "lily"} 2020-10-15 |
我们可以看一下原始数据目录,已经为空:
root@ubuntu:~/bigdata/demo/dw# hdfs dfs -ls /dw/tmall/log/shopping_log/
什么也没有..
然后看一下ods表的数据目录:
1 2 3 |
root@ubuntu:~/bigdata/demo/dw# hdfs dfs -ls /dw/tmall/ods/ods_shopping_log/dt=2020-10-15/ Found 1 items -rw-r--r-- 1 root supergroup 37249432 2020-10-16 08:34 /dw/tmall/ods/ods_shopping_log/dt=2020-10-15/log.txt |
hive表将分区数据按目录进行组织,log.txt已经被挪了进来,这个过程没有发生任何MR计算。
创建DWD层
dwd层是对ods层清洗后的结果,是所有后续数据分析的基石,需要保证数据质量。
对于从日志而来的ODS表,我们最基本要做的就是把日志里的JSON字段提取到dwd表的列中。
建表:
1 2 3 4 5 6 7 8 9 |
create external table if not exists tmall.dwd_shopping_log( `name` string comment 'buyer name', `product` string comment 'product name' ) comment 'DWD-shopping log' partitioned by (`dt` string) stored as parquet location '/dw/tmall/dwd/dwd_shopping_log' tblproperties("parquet.compression"="SNAPPY"); |
继续使用外表,为json的2个字段创建对应的2个列,按日期分区,将数据指定到/dw/tmall/dwd目录下。
注意,除了ODS层由于HDFS文件格式文件受限,数仓后续层都应该采用parquet或者orc这样列存储文件格式并开启snappy压缩,可以带来更高的磁盘空间利用率。
清洗ODS,写入DWD
这里我们会用insert select的语法,这是Hive最为常用的语法,用来将查询结果写入到另外一张表里:
1 2 3 4 5 6 |
insert overwrite table tmall.dwd_shopping_log partition(`dt`='2020-10-15') select get_json_object(`json`, '$.name') name, get_json_object(`json`,'$.product') product from tmall.ods_shopping_log where `dt`='2020-10-15'; |
从ods_shopping_log表当天dt分区的日志,提供get_json_object提取里面的字段作为列,最后insert overwrite灌入到dwd_shopping_log表的对应dt分区下。
这个SQL实际select子句先使用ods表的text input format执行mapper得到json列,通过get_json_object提取出2列后,再通过insert overwrite子句的reducer执行dwd表的parquet output format写入到dwd表的hdfs路径下。
在DWD层根据需求,可能涉及当日增量数据和历史全量数据如何合并的需求,也可能涉及多个dwd表互相JOIN来冗余字段的需求,这个例子仅仅单纯的将ODS当天增量日志解析出来,没有涉及更加复杂的需求。
查询一下dwd表:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
hive> select * from tmall.dwd_shopping_log where dt='2020-10-15' limit 10; OK dwd_shopping_log.name dwd_shopping_log.product dwd_shopping_log.dt lily potato 2020-10-15 lily milk 2020-10-15 john apple 2020-10-15 lily milk 2020-10-15 john potato 2020-10-15 lily milk 2020-10-15 john milk 2020-10-15 john banana 2020-10-15 lily apple 2020-10-15 lily milk 2020-10-15 |
创建DWS层
dwd层是按日期分区的,dws层则是对当天数据做一个天级的统计。
我们接下来会统计当天日志中:
- 每个用户购买每种商品多少次。
- 每种商品一共被购买了几次。
因此,我定义表结构如下:
1 2 3 4 5 6 7 8 9 10 11 |
create external table if not exists dws_shopping_basic_stats( `name` string comment 'buyer name', `product` string comment 'product name', `buy_times` bigint comment 'buy times', `buyed_times` bigint comment 'buyed times' ) comment 'DWS-shopping basic stats' partitioned by (`dt` string) stored as parquet location '/dw/tmall/dws/dws_shopping_log' tblproperties('parquet.compression="SNAPPY"'); |
哪个用户,哪个商品,购买了几次,该商品被多少人购买过。
可见,buyed_time属于商品的统计,但被冗余到了每个用户的记录下面。
统计DWS,写入DWS
DWD存了当日的用户行为数据,我们直接对其进行统计即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
with tmp_user_product_count as ( select name,product,count(*) buy_times from dwd_shopping_log where `dt`='2020-10-15' group by name,product ), tmp_product_count as ( select product,count(*) buyed_times from dwd_shopping_log where `dt`='2020-10-15' group by product ) insert overwrite table tmall.dws_shopping_basic_stats partition(`dt`='2020-10-15') select upc.name,upc.product,upc.buy_times,pc.buyed_times from tmp_user_product_count upc join tmp_product_count pc on upc.product=pc.product; |
with子句可以用来生成临时表,在后面的select中反复使用的临时表可以利用with预生成。
- tmp_user_product_count统计用户购买商品的次数。
- tmp_product_count统计商品被购买次数。
- 最后让tmp_user_product_count的product去join tmp_product_count表,为每行附加冗余列。
查看数据:
1 2 3 4 5 6 7 8 9 10 11 12 |
hive> select * from dws_shopping_basic_stats limit 10; OK dws_shopping_basic_stats.name dws_shopping_basic_stats.product dws_shopping_basic_stats.buy_times dws_shopping_basic_stats.buyed_times dws_shopping_basic_stats.dt john apple 124885 249794 2020-10-15 lily apple 124909 249794 2020-10-15 john banana 124495 249078 2020-10-15 lily banana 124583 249078 2020-10-15 john milk 125087 250387 2020-10-15 lily milk 125300 250387 2020-10-15 john potato 125185 250741 2020-10-15 lily potato 125556 250741 2020-10-15 Time taken: 0.257 seconds, Fetched: 8 row(s) |
dws表对当日数据进行轻度统计,得到:用户名,商品名,用户购买次数,商品被购买次数。
创建DWT表
dwt基于dws的每日统计数据进行汇总统计,提供更加意义的数据。
dw_shopping_history表保存每个用户对每个商品在当天的购买次数,历史总购买次数,近30天的购买次数,这样的汇总表可以直观的看到用户当天、近期、历史的行为。
1 2 3 4 5 6 7 8 9 10 11 12 |
create external table if not exists dwt_shopping_history( `name` string comment 'buyer name', `product` string comment 'product name', `today_buy_times` bigint comment 'today buy times', `total_buy_times` bigint comment 'total buy times', `30d_buy_times` bigint comment 'buy times for last 30days' ) comment 'DWT-shopping history' partitioned by (`dt` string) stored as parquet location '/dw/tmall/dwt/dwt_shopping_history' tblproperties('parquet.compression="SNAPPY"'); |
统计DWS,写入DWT
每天的汇总数据写入dwt当天分区下:
- today_buy_times:当天购买次数,直接取dws当天分区的统计结果。
- total_buy_times:累计购买次数,取dwt前一天分区的total_buy_times再加today_buy_times即可。
- 30d_buy_times:30天内累计购买次数,直接基于dws统计近30天时间分区的数据。
dwt表的某个时间分区是对截止当天为止历史数据的统计,所以接下来1天的数据会基于前一天的dwt数据继续做增量家和,这是dwt层的基本统计逻辑。
因此就涉及到全量数据和增量数据做full outer join合并的过程,需要把增量部分更新或者插入到全量数据中去,这个SQL会这样写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
insert overwrite table tmall.dwt_shopping_history partition(`dt`='2020-10-15') select nvl(old.name,new.name) name, nvl(old.product,new.product) product, nvl(new.today_buy_times, 0) today_buy_times, nvl(old.total_buy_times,0) + nvl(new.today_buy_times,0) total_buy_times, nvl(new.30d_buy_times, 0) 30d_buy_times from ( select name, product, total_buy_times from dwt_shopping_history where `dt`='2020-10-14' ) old full outer join ( select name, product, sum(if(`dt`='2020-10-15',buy_times,0)) today_buy_times, sum(buy_times) 30d_buy_times from dws_shopping_basic_stats where `dt`>=date_add('2020-10-15', -30) group by name,product ) new on old.name=new.name and old.product=new.product; |
old临时表是前一天的dwt表数据,new临时表为当日购买次数和当日之前30天累计购买次数。
通过full outer join可以如上面的图片所示,有一些记录全量表有而增量表没有,有一些增量表有而全量表没有,有一些记录两边都有。
- 全量没有,增量有,则实际是插入。
- 全量有,增量没有,则实际保留全量记录。
- 全量有,增量有,则增量覆盖全量部分列。
nvl(a,b)的作用是如果a不是null则返回a,否则返回b。
我们知道full outer join可能导致左侧表miss或者右侧表miss,并且miss的一侧列都是null,因此可以用来区分上述3种情况,利用nvl方法可以很容易的覆盖上述3个case。
- nvl(new.today_buy_times, 0) today_buy_times:当日购买次数,如果增量有当然用增量的,否则说明当日没人购买,所以记为0。
- nvl(old.total_buy_times,0) + nvl(new.today_buy_times,0) total_buy_times:昨日的全量次数,加上今天的购买次数,如果今天没购买就加0次。
- nvl(new.30d_buy_times, 0) 30d_buy_times:近30天购买次数,如果增量表没有统计到则说明近30日已没有购买记录,所以为0,否则就应该用最新30天的计数覆盖旧记录。
最后我们看一下结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
hive> > > select * from dwt_shopping_history; OK dwt_shopping_history.name dwt_shopping_history.product dwt_shopping_history.today_buy_times dwt_shopping_history.total_buy_times dwt_shopping_history.30d_buy_times dwt_shopping_history.dt john apple 124885 124885 124885 2020-10-15 john banana 124495 124495 124495 2020-10-15 john milk 125087 125087 125087 2020-10-15 john potato 125185 125185 125185 2020-10-15 lily apple 124909 124909 124909 2020-10-15 lily banana 124583 124583 124583 2020-10-15 lily milk 125300 125300 125300 2020-10-15 lily potato 125556 125556 125556 2020-10-15 Time taken: 0.17 seconds, Fetched: 8 row(s) |
这样T+1的运行,每天都会得到一个新的报表,记录了当天、最近30天、历史总计的数据。
ADS层
这层主要为了BI报表或者业务服务(比如推荐特征),说白了就跑各种数的,数据可以来自于dwt和dws两层,如果现有dwt和dws不满足需求,应该需要从dwd层开始出新的dws来满足需求。
因为过于场景化,这里没做演示。
数仓将外部数据纳入Hive数据库,通过分层设计,提升数据复用性,规划数据管理,利用SQL满足大部分统计/报表需求,希望本文的简单例子可以帮助大家理解数仓的基本原理。
具体Hive语法建议读一下官方手册:
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

1