这是系列博客,你应该从《hdfs系列 – Text格式与mapreduce》开始阅读。
ORC的全称是(Optimized Row Columnar),ORC文件格式是一种Hadoop生态圈中的列式存储格式,它的产生早在2013年初,最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。和Parquet类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗,目前也被Spark SQL、Presto等查询引擎支持,但是Impala对于ORC目前没有支持,仍然使用Parquet作为主要的列式存储格式。2015年ORC项目被Apache项目基金会提升为Apache顶级项目。ORC具有以下一些优势。
简而言之,ORC是Hive的主要文件格式,它将若干行记录作为一个组,在组内按列存储并压缩,这样可以获得很高的压缩率,显著降低IO规模,同时也能高效的支持按组进行InputSplit分片,这样就可以进行Mapper并发了。
ORC文件是列存格式,首先也需要定义记录的schema(有哪些列?),然后才能按列实现文件存储,从而带来高效的I/O效率。
Maven依赖
包括Orc文件格式与Mapreduce支持:
1 2 3 4 5 6 7 8 9 10 11 12 |
<!-- https://mvnrepository.com/artifact/org.apache.orc/orc-core --> <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> <version>1.6.4</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.orc/orc-mapreduce --> <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-mapreduce</artifactId> <version>1.6.4</version> </dependency> |
HDFS程序
列存储文件格式的HDFS文件读写也存在明显的区别,我们先来看写入过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package cc.yuerblog.fs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.*; import java.io.IOException; // ORC格式介绍:https://www.wangt.cc/2019/10/%E9%98%85%E8%AF%BBhive-orc-%E6%96%87%E4%BB%B6-%E5%AE%98%E6%96%B9%E6%96%87%E6%A1%A3/ public class OrcFileTest { public void run(Configuration conf) throws IOException { // 数据结构描述 TypeDescription schema = TypeDescription.createStruct(); schema.addField("id", TypeDescription.createInt()); schema.addField("name", TypeDescription.createString()); // 文件路径 Path path = new Path("/orc.txt"); // 写打开 Writer out = OrcFile.createWriter(path, OrcFile.writerOptions(conf). setSchema(schema).compress(CompressionKind.SNAPPY).overwrite(true)); // 写入1000行 VectorizedRowBatch rowBatch = schema.createRowBatch(); // 每个batch内的N行按列存储 LongColumnVector idColumn = (LongColumnVector)rowBatch.cols[0]; // id 列 BytesColumnVector nameColumn = (BytesColumnVector)rowBatch.cols[1]; // name列 for (int i = 0; i < 1000; i++) { idColumn.vector[rowBatch.size] = i; String name = String.format("name-%d", i); nameColumn.setVal(rowBatch.size, name.getBytes()); if (++rowBatch.size == rowBatch.getMaxSize()) { out.addRowBatch(rowBatch); rowBatch.reset(); } } if (rowBatch.size != 0) { out.addRowBatch(rowBatch); rowBatch.reset(); } out.close(); |
首先必须定义记录的schema,这里定义了1个struct,里面包含id和name两列,和hive表天然对应。
通过OrcFile来打开writer,传入的writerOption需要设置schema,压缩类型snappy,以及覆盖现有文件选项。
Orc将若干行记录作为一个组,组内进行列式存储与压缩,因此我们createRowBatch来创建一个组,这个组默认能够容纳固定的行数,我们采用了默认值。
因为Batch内是列式存储各个field,所以我们取batch.cols[0]是id列的存储空间,cols[1]是name列的存储空间。
我们循环写入1000行数据,实际是不断的将下一行数据的id值和name值放入到batch对应列存储空间的对应位置,并不断的增加rowBatch的size,直到当前rowBatch的行数塞满则将该batch一次性写入到Orc文件中,并清空batch继续处理下一行数据。
最终关闭writer时,Orc文件末尾会记录下所有组的偏移量等元信息,这样就可以为多个mapper并发处理不同的组提供高效定位的机制了。
读取Orc文件的代码类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// 读打开 Reader in = OrcFile.createReader(path, OrcFile.readerOptions(conf)); // 解析schema VectorizedRowBatch inBatch = in.getSchema().createRowBatch(); // 流解析文件 RecordReader rows = in.rows(); while (rows.nextBatch(inBatch)) { // 读1个batch // 列式读取 LongColumnVector idCol = (LongColumnVector)inBatch.cols[0]; // id 列 BytesColumnVector nameCol = (BytesColumnVector)inBatch.cols[1]; // name列 for (int i = 0; i < inBatch.size; i++) { // 注意:因为是列存储,所以name列是一个大buffer存储的,需要从里面的start偏移量取length长度的才是该行的列值 System.out.printf("[Orc] id=%d name=%s\n", idCol.vector[i], new String(nameCol.vector[i], nameCol.start[i], nameCol.length[i])); } } rows.close(); |
createReader打开文件,因为Orc文件里有各种meta信息(schema,压缩算法等),所以不必像writer那样显示指定了。
通过Reader in的getSchema可以解析到Orc文件中记录的schema,并且创建一个batch容器对象。
通过RecordReader可以开始nextBatch迭代逐个得到batch,按列方式取出batch的id列和name列,然后循环整个batch的每行记录,取出2个列的值进行打印。
这里注意,一个batch内的多行数据的name列实际分别被列存为一个大的内存buffer,所以每行的name值是对应大内存buffer的start[i]开始的length[i]长度的一段字节,而vector[i]其实一直指向该列内存buffer的头部地址。
运行hdfs程序
我们发现hdfs dfs -text并不能解析orc文件,其实orc是hive带进来的一种文件编码格式,hdfs原生是没自带解码器的,所以看起来文件仍旧是乱码的,但是会发现整个文件压缩率巨高,一屏就可以把内容展示出来:
1 2 3 4 5 6 7 8 9 10 |
root@ubuntu:~# hdfs dfs -text /orc.txt ORC �P3 � �?��<PO % �" =ame-nam>1999?�{?1����=n@1e-0123A167891 B1 C1 D1 E1 F2 F2 F2 F2 F2 F2 F2 F2 F2 F2 F3 F3 F3 F3 F3 F3 F3 F3F3 F3 F4 F4 F4 F4 F4 F4 F4 F4 F4 F4 F5 F5 F5 F5 F5 F5 F5 F5 F5 F5 F6 F6 F6 F6 F6F6 F6 F6 F6 F6 F7 F7 F7 F7 F7 F7 F7 F7 F7 F7 F8 F8 F8 F8 F8 F8 F8 F8 F8 F8 F9 F9�5-�5-�M�1M�1M�1M�1M�1M�1M�1m1m1m1m1m1m1m1m11mM�1m�1M�1M�153 F10 G10 H10 I10 J10 K10 L10 M10 N10 O10 1m |
可惜现在我还没安装hive,就不演示如何用hive命令行查看orc文件内容了。
mapreduce程序
Orc和Avro开发方式非常类似,Orc给Mapper的输入K是NullWritable,V是数据行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private static class Mapper extends org.apache.hadoop.mapreduce.Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> { @Override protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException { IntWritable id = (IntWritable)value.getFieldValue("id"); Text name = (Text)value.getFieldValue("name"); System.out.printf("%d -> %s", id.get(), name.toString()); context.write(new OrcKey(value), new OrcValue(new LongWritable(1))); // struct -> 1 } } private static class Reducer extends org.apache.hadoop.mapreduce.Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> { @Override protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context) throws IOException, InterruptedException { context.write(NullWritable.get(), (OrcStruct)key.key); // 原样输出orc对象 } } |
因为schema外层是一个struct,所以Mapper用OrcStruct类型来承接行数据,获取里面的id和name字段值。
Mapper的输出K要求用OcrKey包装,Mapper的输出V要求用OrcValue包装,当然V可以直接用LongWritable也是可以的,但是只要想在shuffle过程使用Orc类型就必须用OrcKey和OrcValue包装,具体见文档:https://orc.apache.org/docs/mapreduce.html。(看源码发现主要原因是OrcStruct没有默认构造参数,而shuffle过程中是会反射调用空构造函数来生成临时对象的)
Reducer的输出也是将OrcStruct对象放在V里,K则用NullWritable即可。
Job配置部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
// https://orc.apache.org/docs/mapreduce.html public class OrcMR { private static TypeDescription schema; static { // 数据结构描述 schema = TypeDescription.createStruct(); schema.addField("id", TypeDescription.createInt()); schema.addField("name", TypeDescription.createString()); } public void run(Configuration conf) throws Exception { Path input = new Path("/orc.txt"); Path output = new Path("/orc-mr"); // 删除之前的结果 FileSystem.get(conf).delete(output, true); // 创建Job Job job = Job.getInstance(conf, "OrcMR"); job.setJarByClass(OrcMR.class); // mapper/reducer实现在该类内部,需要设置这个 /// 输入 OrcInputFormat.addInputPath(job, input); // 文件路径 job.setInputFormatClass(OrcInputFormat.class); // 文件格式 job.setMapperClass(OrcMR.Mapper.class); // mapper实现 job.setMapOutputKeyClass(OrcKey.class); // 中间key job.setMapOutputValueClass(OrcValue.class); // 中间value job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), schema.toString()); // 中间key schema job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.getAttribute(), TypeDescription.createLong().toString()); // 中间value schema // 输出 OrcOutputFormat.setOutputPath(job, output); // 文件路径 OrcOutputFormat.setCompressOutput(job, true); // 开启压缩 OrcOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // gzip压缩 job.setOutputFormatClass(OrcOutputFormat.class); // 文件格式 job.setReducerClass(OrcMR.Reducer.class); // reducer实现 job.setOutputKeyClass(NullWritable.class); // 结果key job.setOutputValueClass(OrcStruct.class); // 结果value job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(), schema.toString()); // 输出orc文件schema // 等待任务执行,打印详情 job.waitForCompletion(true); |
首先还是把行的schema做出来。
mapper部分:
- OrcInputFormat.addInputPath:输入路径
- setInputFormatClass:mapper输入为OrcInputFormat文件格式。
- setMapperClass:mapper实现。
- setMapOutputKeyClass:mapper输出K类型是OrcKey。
- setMapOutputValueClass:mapper输出V类型是OrcValue。
- job.getConfiguration().set:直接向job手动配置Orc任务的一些独有参数,MAPRED_SHUFFLE_KEY_SCHEMA是说mapper输出K的schema,MAPRED_SHUFFLE_VALUE_SCHEMA是mapper输出V的schema。
reducer部分:
- OrcOutputFormat.setOutputPath:输出路径
- OrcOutputFormat.setCompressOutput:输出压缩
- OrcOutputFormat.setOutputCompressorClass:输出Gzip压缩
- setOutputFormatClass:输出为Orc文件格式。
- setReducerClass:reducer实现。
- setOutputKeyClass:reducer输出K类型NullWritable。
- setOutputValueClass:reducer输出V类型OrcStruct。
- job.getConfiguration().set:设置reducer输出V的schema。
运行mapreduce
用hadoop jar命令运行,看到输出文件:
1 2 3 4 |
root@ubuntu:~# hdfs dfs -ls /orc-mr Found 2 items -rw-r--r-- 1 root supergroup 0 2020-10-07 02:41 /orc-mr/_SUCCESS -rw-r--r-- 1 root supergroup 2337 2020-10-07 02:41 /orc-mr/part-r-00000.orc |
我们可以用代码来读一下这个Orc输出文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
///////////// 查看MR输出 // 读打开 Reader in = OrcFile.createReader(new Path("/orc-mr/part-r-00000.orc"), OrcFile.readerOptions(conf)); // 解析schema VectorizedRowBatch inBatch = in.getSchema().createRowBatch(); // 流解析文件 RecordReader rows = in.rows(); while (rows.nextBatch(inBatch)) { // 读1个batch // 列式读取 LongColumnVector idCol = (LongColumnVector)inBatch.cols[0]; // id 列 BytesColumnVector nameCol = (BytesColumnVector)inBatch.cols[1]; // name列 for (int i = 0; i < inBatch.size; i++) { System.out.printf("[Orc-MR-output] id=%d name=%s\n", idCol.vector[i], new String(nameCol.vector[i], nameCol.start[i], nameCol.length[i])); } } rows.close(); |
会打印如下内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
2020-10-07 08:13:17,142 INFO impl.ReaderImpl: Reading ORC rows from /orc-mr/part-r-00000.orc with {include: null, offset: 0, length: 9223372036854775807, includeAcidColumns: true} 2020-10-07 08:13:17,145 INFO impl.RecordReaderImpl: Reader schema not provided -- using file schema struct<id:int,name:string> [Orc] id=0 name=name-0 [Orc] id=1 name=name-1 [Orc] id=2 name=name-2 [Orc] id=3 name=name-3 [Orc] id=4 name=name-4 [Orc] id=5 name=name-5 [Orc] id=6 name=name-6 [Orc] id=7 name=name-7 [Orc] id=8 name=name-8 [Orc] id=9 name=name-9 [Orc] id=10 name=name-10 |
关于hdfs文件格式与mapreduce的系列博客就此结束,有了这个基础再向上学习flume/hive/spark等应该就手到擒来了。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
