hive系列 – 数仓分层Demo

大数据越来越重要,要管理与利用好这些数据就需要建设数据仓库。

数仓建设是有一套建模理论的,本文通过一个简单例子来理解数仓分层的基本样貌。

建库

假设我们是天猫商城,需要建设一个数据仓库,那么我们首先hive建库:

原始数据

数仓的原始数据来自外部系统,可以归纳为2类:

  • 日志通常采用flume工具写入到HDFS,其记录格式一般是JSON。
  • 数据库通常采用sqoop工具从mysql提取数据到HDFS,其记录格式是多列组成的行。

数仓的计算采用T+1离线计算,也就是凌晨计算昨天产生的数据。

我们的Demo模拟生成了一份2020-10-15日的日志数据共100万行:

磁盘上的./log.txt内容如下:

{“product”: “potato”, “name”: “lily”}
{“product”: “milk”, “name”: “lily”}
{“product”: “apple”, “name”: “john”}
{“product”: “milk”, “name”: “lily”}
{“product”: “potato”, “name”: “john”}
{“product”: “milk”, “name”: “lily”}
{“product”: “milk”, “name”: “john”}
{“product”: “banana”, “name”: “john”}
{“product”: “apple”, “name”: “lily”}
{“product”: “milk”, “name”: “lily”}

每一行代表一次购买行为,name是用户名,product是购买的商品,name和product通过随机枚举组合生成。

我们建立tmall数仓的log存储目录,并把当天日志放到目录下:

(注意:实际上述log写入hdfs应该由flume完成,这里只是示例)

tmall数仓的数据全部放在/dw/tmall目录下,flume日志则保存在log子目录下,并按日志用途命名目录,按日期切分目录方便T+1加载。

创建ODS层

原始数据必须加载到Hive表里才能进一步SQL处理,容纳原始数据的Hive表就叫做ODS层,它以HIve表的形式保存原数据的样貌。

我们建立ODS表,把日志按原样加载到Hive表中:

  • 统一使用external外表,自行指定数据目录,按照规范指定到/dw/tmall/ods下面,按表名区分目录。
  • 每行日志被\n切割入唯一字段json,完全不做解析,这就是ODS层要做的事情,保留原始数据到Hive。
  • 数据按日期dt分区存储,数据文件格式为textfile格式,分隔符通过row format配置。

我们看一下hive建立完成后的表schema,说明几个关键概念:

理解Hive的数据格式至关重要,我们知道Hive底层跑的mapreduce,而mapreduce则需要指定mapper的inputformat和reducer的outputformat,前者把文件解析成<K,V>,后者把<K,V>写回文件。

Hive的Stored as就是指定inputformat/outputformat的地方,因为日志都是文本格式,所以这里都是text format。

当mapper解析到1个<K,V>后,对于V部分会进一步通过SerDe进行反序列化得到每一列,因为hive是table嘛。同样的,reducer输出的<K,V>的V其实也是将多个列通过SerDe序列化后输出到HDFS的。

总结一下:

hdfs file -> input format -> <K,V> -> Mapper -> Serde -> columns -> Reducer -> Serde -> <K,V> -> output format -> hdfs file

红色部分是select时候发生的事,只需要mapper即可解析到表中每行数据的每个列的值,不需要执行reducer之后的流程。

黑色部分是insert时候发生的事,行中的columns经过Serde序列化成V之后,按照outputformat写到HDFS上。

加载原始数据到ODS

有了ODS表,我们可以凌晨把昨天的日志数据load到ODS表中:

load data inpath ‘/dw/tmall/log/shopping_log/2020-10-15′ overwrite into table tmall.ods_shopping_log partition(dt=’2020-10-15’);

注意,load命令会把原目录的数据文件mv到hive表的分区目录下,并不会执行任何的MR计算,这个过程是没有成本的。(强调:原目录的数据被挪到hive数据目录下)

因为原始日志是json文本行组成的文件,而ods表的inputformat也是textfile,因此当对ods表做select时MR自然能够解析每一行日志到json字段,理解这个过程非常重要。

我们现在可以直接查询ods表:

我们可以看一下原始数据目录,已经为空:

root@ubuntu:~/bigdata/demo/dw# hdfs dfs -ls /dw/tmall/log/shopping_log/

什么也没有..

然后看一下ods表的数据目录:

hive表将分区数据按目录进行组织,log.txt已经被挪了进来,这个过程没有发生任何MR计算。

创建DWD层

dwd层是对ods层清洗后的结果,是所有后续数据分析的基石,需要保证数据质量。

对于从日志而来的ODS表,我们最基本要做的就是把日志里的JSON字段提取到dwd表的列中。

建表:

继续使用外表,为json的2个字段创建对应的2个列,按日期分区,将数据指定到/dw/tmall/dwd目录下。

注意,除了ODS层由于HDFS文件格式文件受限,数仓后续层都应该采用parquet或者orc这样列存储文件格式并开启snappy压缩,可以带来更高的磁盘空间利用率。

清洗ODS,写入DWD

这里我们会用insert select的语法,这是Hive最为常用的语法,用来将查询结果写入到另外一张表里:

从ods_shopping_log表当天dt分区的日志,提供get_json_object提取里面的字段作为列,最后insert overwrite灌入到dwd_shopping_log表的对应dt分区下。

这个SQL实际select子句先使用ods表的text input format执行mapper得到json列,通过get_json_object提取出2列后,再通过insert overwrite子句的reducer执行dwd表的parquet output format写入到dwd表的hdfs路径下。

在DWD层根据需求,可能涉及当日增量数据和历史全量数据如何合并的需求,也可能涉及多个dwd表互相JOIN来冗余字段的需求,这个例子仅仅单纯的将ODS当天增量日志解析出来,没有涉及更加复杂的需求。

查询一下dwd表:

创建DWS层

dwd层是按日期分区的,dws层则是对当天数据做一个天级的统计。

我们接下来会统计当天日志中:

  • 每个用户购买每种商品多少次。
  • 每种商品一共被购买了几次。

因此,我定义表结构如下:

哪个用户,哪个商品,购买了几次,该商品被多少人购买过。

可见,buyed_time属于商品的统计,但被冗余到了每个用户的记录下面。

统计DWS,写入DWS

DWD存了当日的用户行为数据,我们直接对其进行统计即可:

with子句可以用来生成临时表,在后面的select中反复使用的临时表可以利用with预生成。

  • tmp_user_product_count统计用户购买商品的次数。
  • tmp_product_count统计商品被购买次数。
  • 最后让tmp_user_product_count的product去join tmp_product_count表,为每行附加冗余列。

查看数据:

dws表对当日数据进行轻度统计,得到:用户名,商品名,用户购买次数,商品被购买次数。

创建DWT表

dwt基于dws的每日统计数据进行汇总统计,提供更加意义的数据。

dw_shopping_history表保存每个用户对每个商品在当天的购买次数,历史总购买次数,近30天的购买次数,这样的汇总表可以直观的看到用户当天、近期、历史的行为。

统计DWS,写入DWT

每天的汇总数据写入dwt当天分区下:

  • today_buy_times:当天购买次数,直接取dws当天分区的统计结果。
  • total_buy_times:累计购买次数,取dwt前一天分区的total_buy_times再加today_buy_times即可。
  • 30d_buy_times:30天内累计购买次数,直接基于dws统计近30天时间分区的数据。

dwt表的某个时间分区是对截止当天为止历史数据的统计,所以接下来1天的数据会基于前一天的dwt数据继续做增量家和,这是dwt层的基本统计逻辑。

因此就涉及到全量数据和增量数据做full outer join合并的过程,需要把增量部分更新或者插入到全量数据中去,这个SQL会这样写:

old临时表是前一天的dwt表数据,new临时表为当日购买次数和当日之前30天累计购买次数。

通过full outer join可以如上面的图片所示,有一些记录全量表有而增量表没有,有一些增量表有而全量表没有,有一些记录两边都有。

  • 全量没有,增量有,则实际是插入。
  • 全量有,增量没有,则实际保留全量记录。
  • 全量有,增量有,则增量覆盖全量部分列。

nvl(a,b)的作用是如果a不是null则返回a,否则返回b。

我们知道full outer join可能导致左侧表miss或者右侧表miss,并且miss的一侧列都是null,因此可以用来区分上述3种情况,利用nvl方法可以很容易的覆盖上述3个case。

  • nvl(new.today_buy_times, 0) today_buy_times:当日购买次数,如果增量有当然用增量的,否则说明当日没人购买,所以记为0。
  • nvl(old.total_buy_times,0) + nvl(new.today_buy_times,0) total_buy_times:昨日的全量次数,加上今天的购买次数,如果今天没购买就加0次。
  • nvl(new.30d_buy_times, 0) 30d_buy_times:近30天购买次数,如果增量表没有统计到则说明近30日已没有购买记录,所以为0,否则就应该用最新30天的计数覆盖旧记录。

最后我们看一下结果:

这样T+1的运行,每天都会得到一个新的报表,记录了当天、最近30天、历史总计的数据。

ADS层

这层主要为了BI报表或者业务服务(比如推荐特征),说白了就跑各种数的,数据可以来自于dwt和dws两层,如果现有dwt和dws不满足需求,应该需要从dwd层开始出新的dws来满足需求。

因为过于场景化,这里没做演示。

数仓将外部数据纳入Hive数据库,通过分层设计,提升数据复用性,规划数据管理,利用SQL满足大部分统计/报表需求,希望本文的简单例子可以帮助大家理解数仓的基本原理。

具体Hive语法建议读一下官方手册:

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~