hdfs系列 – Orc格式与mapreduce

这是系列博客,你应该从《hdfs系列 – Text格式与mapreduce》开始阅读。

ORC的全称是(Optimized Row Columnar),ORC文件格式是一种Hadoop生态圈中的列式存储格式,它的产生早在2013年初,最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。和Parquet类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗,目前也被Spark SQL、Presto等查询引擎支持,但是Impala对于ORC目前没有支持,仍然使用Parquet作为主要的列式存储格式。2015年ORC项目被Apache项目基金会提升为Apache顶级项目。ORC具有以下一些优势。

简而言之,ORC是Hive的主要文件格式,它将若干行记录作为一个组,在组内按列存储并压缩,这样可以获得很高的压缩率,显著降低IO规模,同时也能高效的支持按组进行InputSplit分片,这样就可以进行Mapper并发了。

ORC文件是列存格式,首先也需要定义记录的schema(有哪些列?),然后才能按列实现文件存储,从而带来高效的I/O效率。

Maven依赖

包括Orc文件格式与Mapreduce支持:

HDFS程序

列存储文件格式的HDFS文件读写也存在明显的区别,我们先来看写入过程。

首先必须定义记录的schema,这里定义了1个struct,里面包含id和name两列,和hive表天然对应。

通过OrcFile来打开writer,传入的writerOption需要设置schema,压缩类型snappy,以及覆盖现有文件选项。

Orc将若干行记录作为一个组,组内进行列式存储与压缩,因此我们createRowBatch来创建一个组,这个组默认能够容纳固定的行数,我们采用了默认值。

因为Batch内是列式存储各个field,所以我们取batch.cols[0]是id列的存储空间,cols[1]是name列的存储空间。

我们循环写入1000行数据,实际是不断的将下一行数据的id值和name值放入到batch对应列存储空间的对应位置,并不断的增加rowBatch的size,直到当前rowBatch的行数塞满则将该batch一次性写入到Orc文件中,并清空batch继续处理下一行数据。

最终关闭writer时,Orc文件末尾会记录下所有组的偏移量等元信息,这样就可以为多个mapper并发处理不同的组提供高效定位的机制了。


读取Orc文件的代码类似:

createReader打开文件,因为Orc文件里有各种meta信息(schema,压缩算法等),所以不必像writer那样显示指定了。

通过Reader in的getSchema可以解析到Orc文件中记录的schema,并且创建一个batch容器对象。

通过RecordReader可以开始nextBatch迭代逐个得到batch,按列方式取出batch的id列和name列,然后循环整个batch的每行记录,取出2个列的值进行打印。

这里注意,一个batch内的多行数据的name列实际分别被列存为一个大的内存buffer,所以每行的name值是对应大内存buffer的start[i]开始的length[i]长度的一段字节,而vector[i]其实一直指向该列内存buffer的头部地址。

运行hdfs程序

我们发现hdfs dfs -text并不能解析orc文件,其实orc是hive带进来的一种文件编码格式,hdfs原生是没自带解码器的,所以看起来文件仍旧是乱码的,但是会发现整个文件压缩率巨高,一屏就可以把内容展示出来:

可惜现在我还没安装hive,就不演示如何用hive命令行查看orc文件内容了。

mapreduce程序

Orc和Avro开发方式非常类似,Orc给Mapper的输入K是NullWritable,V是数据行。

因为schema外层是一个struct,所以Mapper用OrcStruct类型来承接行数据,获取里面的id和name字段值。

Mapper的输出K要求用OcrKey包装,Mapper的输出V要求用OrcValue包装,当然V可以直接用LongWritable也是可以的,但是只要想在shuffle过程使用Orc类型就必须用OrcKey和OrcValue包装,具体见文档:https://orc.apache.org/docs/mapreduce.html。(看源码发现主要原因是OrcStruct没有默认构造参数,而shuffle过程中是会反射调用空构造函数来生成临时对象的)

Reducer的输出也是将OrcStruct对象放在V里,K则用NullWritable即可。


Job配置部分代码:

首先还是把行的schema做出来。

mapper部分:

  • OrcInputFormat.addInputPath:输入路径
  • setInputFormatClass:mapper输入为OrcInputFormat文件格式。
  • setMapperClass:mapper实现。
  • setMapOutputKeyClass:mapper输出K类型是OrcKey。
  • setMapOutputValueClass:mapper输出V类型是OrcValue。
  • job.getConfiguration().set:直接向job手动配置Orc任务的一些独有参数,MAPRED_SHUFFLE_KEY_SCHEMA是说mapper输出K的schema,MAPRED_SHUFFLE_VALUE_SCHEMA是mapper输出V的schema。

reducer部分:

  • OrcOutputFormat.setOutputPath:输出路径
  • OrcOutputFormat.setCompressOutput:输出压缩
  • OrcOutputFormat.setOutputCompressorClass:输出Gzip压缩
  • setOutputFormatClass:输出为Orc文件格式。
  • setReducerClass:reducer实现。
  • setOutputKeyClass:reducer输出K类型NullWritable。
  • setOutputValueClass:reducer输出V类型OrcStruct。
  • job.getConfiguration().set:设置reducer输出V的schema。

运行mapreduce

用hadoop jar命令运行,看到输出文件:

我们可以用代码来读一下这个Orc输出文件:

会打印如下内容:

关于hdfs文件格式与mapreduce的系列博客就此结束,有了这个基础再向上学习flume/hive/spark等应该就手到擒来了。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~