hdfs系列 – Avro格式与mapreduce

这是系列博客,你应该从《hdfs系列 – Text格式与mapreduce》开始阅读。

由Hadoop工作组于2009年发布的Apache Avro,是一种基于行的、可高度拆分的数据格式。Avro能够支持多种编程语言。通常,它也被描述为类似于Java序列化的数据序列化系统。为了最大程度地减小文件大小、并提高效率,它将schema存储为JSON格式,而将数据存储为二进制格式。

Avro大家可能不熟悉,但是Protocolbuf大家应该都接触过,其实就是一个序列化协议。

总结Avro:

  • Avro通过JSON来描述一个对象的schema结构,然后就可以序列化或者反序列化为JAVA对象,没什么特殊的。
  • Avro除了能够序列化对象外,还定义了一种文件格式,可以把对象序列化到文件存储,并且能够对文件进行压缩和分片,适用于MR计算任务。

有了上述2个能力,我们可以利用Avro格式来存储数据与参与MR计算。

Maven依赖

包括Avro协议自身,以及avro对mapreduce框架的支持。

定义Avro schema

在maven项目的resources目录下放置一个user.avsc文件,描述avro结构体的schema:

类型为record结构体,名字叫做user,doc是个描述,fields是字段,每个字段有名字和类型。

该user.avsc文件会被mvn打包到jar里,稍后可以通过代码从classpath读取到这个文件。

HDFS程序

我们用avro协议和文件格式来读写文件。

首先是加载schema,然后create一个输出文件流。

创建一个DatumWriter序列化对象,它会将GenericRecord通用结构体序列化为Avro二进制。

把上述对象序列化对象再交给DataFileWriter文件格式化对象,后续直接向该writer写入GenericRecord对象,将会自动的序列化为Avro并按Avro文件格式写入到磁盘文件。因为DataFileWriter负责Avro文件格式的处理,因此给它设置snappy压缩以及sync marker的写入间隔。

通过codecOut.create向打开的文件写入avro文件meta信息(比如schema、压缩、syncmaker间隔等),然后循环创建schema格式的Record对象,设置其id和name属性,通过append方法追加到avro文件。


读取类似,利用avro的FsInput打开文件,它会自动解析avro文件的meta信息,我们按照逆向流程配置GenericRecord类型的反序列化reader,然后再交给DataFileReader来解析avro文件。

最后循环迭代文件中的GenericRecord对象即可。

运行hdfs程序

执行方法与之前一样,我们看一下磁盘文件:

是乱码的,但是也可以看出文件头部的meta信息,包括schema和codec是snappy。

尝试用-text解析显示它:

我们发现hadoop服务器默认就能解码avro格式的文件,看样是亲儿子。

同时我们注意到avro序列化协议与JSON是兼容的。

Mapreduce程序

该MR采用avro文件格式作为Mapper输入,中间K和V采用Avro对象序列化,最终Reducer输出继续采用Avro文件格式。

观察Mapper的输入K-V类型,会发现Avro InputFormat的RecordReader为Mapper喂入的只有K(就是avro对象),而V部分为NullWritable类型没实际作用。

注意GenericRecord类型不能直接用于MR的K而是要通过AvroKey包装一下变成Writable接口的实现才能与MR框架合作。

Mapper的输出K继续采用Avro序列化GenericRecord对象,V采用了Avro序列化的Long,其实V可以直接用LongWritable内置序列化而不必用Avro序列化。

Reducer的输出K继续采用Avro序列化的GenericRecord对象,V则NullWritable,这是Avro MR实现的约定,这样reducer输出的GenericReocrd对象就会被顺序写入Avro格式的文件中了。

运行MR的时候也需要解析schema。

mapper:

  • AvroKeyInputFormat.addInputPath:输入路径
  • setInputFormatClass:Mapper输入为AvroKeyInputFormat文件格式,也就是把record反序列化传给Mapper的K。
  • setMapperClass:mapper实现。
  • setMapOutputKeySchema:mapper输出的K的avro schema,还是genericRecord
  • setMapOutputValueSchema:mapper输出的V的avro schema,是Long。

reducer:

  • AvroKeyOutputFormat.setOutputPath:输出路径
  • AvroKeyOutputFormat.setCompressOutput:输出avro文件开启压缩
  • AvroKeyOutputFormat.setOutputCompressorClass:采用snappy压缩
  • AvroJob.setOutputKeySchema:输出K的avro schema,仍旧是genericRecord。
  • job.setOutputFormatClass:输出格式,reducer的输出K作为avro的行记录。
  • setReducerClass:reducer实现。

运行mapreduce

输出文件如下:

需要理解avro是一种对象序列化协议,同时也自带一种文件格式。

mapper的输入和reducer的输出同时涉及到avro对象序列化和avro文件格式。

mapper的输出K-V在这里仅仅用到了avro对象序列化,至于中间K-V的存储则是MR框架内置格式保存的,不需要我们关心。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~