hdfs系列 – Avro格式与mapreduce
这是系列博客,你应该从《hdfs系列 – Text格式与mapreduce》开始阅读。
由Hadoop工作组于2009年发布的Apache Avro,是一种基于行的、可高度拆分的数据格式。Avro能够支持多种编程语言。通常,它也被描述为类似于Java序列化的数据序列化系统。为了最大程度地减小文件大小、并提高效率,它将schema存储为JSON格式,而将数据存储为二进制格式。
Avro大家可能不熟悉,但是Protocolbuf大家应该都接触过,其实就是一个序列化协议。
总结Avro:
- Avro通过JSON来描述一个对象的schema结构,然后就可以序列化或者反序列化为JAVA对象,没什么特殊的。
- Avro除了能够序列化对象外,还定义了一种文件格式,可以把对象序列化到文件存储,并且能够对文件进行压缩和分片,适用于MR计算任务。
有了上述2个能力,我们可以利用Avro格式来存储数据与参与MR计算。
Maven依赖
1 2 3 4 5 6 7 8 9 10 11 12 |
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.10.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-mapred --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-mapred</artifactId> <version>1.10.0</version> </dependency> |
包括Avro协议自身,以及avro对mapreduce框架的支持。
定义Avro schema
在maven项目的resources目录下放置一个user.avsc文件,描述avro结构体的schema:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
{ "type": "record", "name": "user", "doc": "user account", "fields": [{ "name": "id", "type": "int" }, { "name": "name", "type": "string" } ] } |
类型为record结构体,名字叫做user,doc是个描述,fields是字段,每个字段有名字和类型。
该user.avsc文件会被mvn打包到jar里,稍后可以通过代码从classpath读取到这个文件。
HDFS程序
我们用avro协议和文件格式来读写文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
package cc.yuerblog.fs; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.mapred.FsInput; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; public class AvroFileTest { public void run(Configuration conf) throws IOException { // HDFS FileSystem dfs = FileSystem.get(conf); // 加载avro记录描述 Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(getClass().getClassLoader().getResourceAsStream("user.avsc")); // 文件路径 Path path = new Path("/avro.txt"); // 打开文件 FSDataOutputStream out = dfs.create(path); // avro对象序列化 DatumWriter ser = new GenericDatumWriter<GenericRecord>(); // avro文件流编码 DataFileWriter codecOut = new DataFileWriter<GenericRecord>(ser); codecOut.setCodec(CodecFactory.snappyCodec()); // 开启snappy压缩 codecOut.setSyncInterval(10 * 1024 * 1024); // 10MB打一个sync marker,方便MR分片 // 初始化avro文件 codecOut.create(schema, out); // 写入1000行记录 for (int i = 0; i < 1000; i++) { GenericData.Record record = new GenericData.Record(schema); record.put("id", i); record.put("name", String.format("name-%d", i)); codecOut.append(record); } codecOut.close(); //////////////// // 读打开, 必须用avro这个api FsInput in = new FsInput(path, conf); // avro对象反序列化 DatumReader de = new GenericDatumReader<GenericRecord>(); // avro文件流解码 DataFileReader codecIn = new DataFileReader<GenericRecord>(in, de); // 读取记录 while (codecIn.hasNext()) { GenericRecord record = (GenericRecord) codecIn.next(); System.out.printf("[Avro] id=%d name=%s\n", (Integer) record.get("id"), record.get("name")); } codecIn.close(); } } |
首先是加载schema,然后create一个输出文件流。
创建一个DatumWriter序列化对象,它会将GenericRecord通用结构体序列化为Avro二进制。
把上述对象序列化对象再交给DataFileWriter文件格式化对象,后续直接向该writer写入GenericRecord对象,将会自动的序列化为Avro并按Avro文件格式写入到磁盘文件。因为DataFileWriter负责Avro文件格式的处理,因此给它设置snappy压缩以及sync marker的写入间隔。
通过codecOut.create向打开的文件写入avro文件meta信息(比如schema、压缩、syncmaker间隔等),然后循环创建schema格式的Record对象,设置其id和name属性,通过append方法追加到avro文件。
读取类似,利用avro的FsInput打开文件,它会自动解析avro文件的meta信息,我们按照逆向流程配置GenericRecord类型的反序列化reader,然后再交给DataFileReader来解析avro文件。
最后循环迭代文件中的GenericRecord对象即可。
运行hdfs程序
执行方法与之前一样,我们看一下磁盘文件:
1 2 3 4 5 |
root@ubuntu:~/bigdata/demo# hdfs dfs -cat /avro.txt Objavro.schema�{"type":"record","name":"user","doc":"user account","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}avro.codec snappy{:�03��Ԉ"T&���P�T name-0 1 2 4 5 |
是乱码的,但是也可以看出文件头部的meta信息,包括schema和codec是snappy。
尝试用-text解析显示它:
1 2 3 4 5 6 7 8 9 10 11 |
root@ubuntu:~/bigdata/demo# hdfs dfs -text /avro.txt {"id":0,"name":"name-0"} {"id":1,"name":"name-1"} {"id":2,"name":"name-2"} {"id":3,"name":"name-3"} {"id":4,"name":"name-4"} {"id":5,"name":"name-5"} {"id":6,"name":"name-6"} {"id":7,"name":"name-7"} {"id":8,"name":"name-8"} {"id":9,"name":"name-9"} |
我们发现hadoop服务器默认就能解码avro格式的文件,看样是亲儿子。
同时我们注意到avro序列化协议与JSON是兼容的。
Mapreduce程序
该MR采用avro文件格式作为Mapper输入,中间K和V采用Avro对象序列化,最终Reducer输出继续采用Avro文件格式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package cc.yuerblog.mr; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.avro.mapreduce.AvroKeyOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; public class AvroMR { private static class Mapper extends org.apache.hadoop.mapreduce.Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, AvroValue<Long>> { @Override protected void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException { Integer id = (Integer)key.datum().get("id"); String name = (String)key.datum().get("name"); System.out.printf("%d -> %s", id, name); Long val = new Long(1); context.write(key, new AvroValue<Long>(val)); // avro record -> 1次 } } private static class Reducer extends org.apache.hadoop.mapreduce.Reducer<AvroKey<GenericData.Record>, AvroValue<Long>, AvroKey<GenericData.Record>, NullWritable> { @Override protected void reduce(AvroKey<GenericData.Record> key, Iterable<AvroValue<Long>> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); // 原样输出avro record } } |
观察Mapper的输入K-V类型,会发现Avro InputFormat的RecordReader为Mapper喂入的只有K(就是avro对象),而V部分为NullWritable类型没实际作用。
注意GenericRecord类型不能直接用于MR的K而是要通过AvroKey包装一下变成Writable接口的实现才能与MR框架合作。
Mapper的输出K继续采用Avro序列化GenericRecord对象,V采用了Avro序列化的Long,其实V可以直接用LongWritable内置序列化而不必用Avro序列化。
Reducer的输出K继续采用Avro序列化的GenericRecord对象,V则NullWritable,这是Avro MR实现的约定,这样reducer输出的GenericReocrd对象就会被顺序写入Avro格式的文件中了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
public void run(Configuration conf) throws Exception { Path input = new Path("/avro.txt"); Path output = new Path("/avro-mr"); // 删除之前的结果 FileSystem.get(conf).delete(output, true); // 加载avro记录描述 Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(getClass().getClassLoader().getResourceAsStream("user.avsc")); // 创建Job Job job = Job.getInstance(conf, "AvroMR"); job.setJarByClass(AvroMR.class); // mapper/reducer实现在该类内部,需要设置这个 /// 输入 AvroKeyInputFormat.addInputPath(job, input); // 文件路径 job.setInputFormatClass(AvroKeyInputFormat.class); // 文件格式 job.setMapperClass(AvroMR.Mapper.class); // mapper实现 AvroJob.setMapOutputKeySchema(job, schema); // 中间输出key依旧是user结构 AvroJob.setMapOutputValueSchema(job, Schema.create(Schema.Type.LONG)); // 中间输出value是avro序列化的long // 输出 AvroKeyOutputFormat.setOutputPath(job, output); // 文件路径 AvroKeyOutputFormat.setCompressOutput(job, true); // 输出压缩 AvroKeyOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); // snappy压缩 AvroJob.setOutputKeySchema(job, schema); // 输出avro key的schema job.setOutputFormatClass(AvroKeyOutputFormat.class); // 文件格式仅输出key job.setReducerClass(AvroMR.Reducer.class); // reducer实现 // 等待任务执行,打印详情 job.waitForCompletion(true); } |
运行MR的时候也需要解析schema。
mapper:
- AvroKeyInputFormat.addInputPath:输入路径
- setInputFormatClass:Mapper输入为AvroKeyInputFormat文件格式,也就是把record反序列化传给Mapper的K。
- setMapperClass:mapper实现。
- setMapOutputKeySchema:mapper输出的K的avro schema,还是genericRecord
- setMapOutputValueSchema:mapper输出的V的avro schema,是Long。
reducer:
- AvroKeyOutputFormat.setOutputPath:输出路径
- AvroKeyOutputFormat.setCompressOutput:输出avro文件开启压缩
- AvroKeyOutputFormat.setOutputCompressorClass:采用snappy压缩
- AvroJob.setOutputKeySchema:输出K的avro schema,仍旧是genericRecord。
- job.setOutputFormatClass:输出格式,reducer的输出K作为avro的行记录。
- setReducerClass:reducer实现。
运行mapreduce
输出文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
root@ubuntu:~/bigdata/demo# hdfs dfs -ls /avro-mr Found 2 items -rw-r--r-- 1 root supergroup 0 2020-10-07 02:41 /avro-mr/_SUCCESS -rw-r--r-- 1 root supergroup 5359 2020-10-07 02:41 /avro-mr/part-r-00000.avro root@ubuntu:~/bigdata/demo# hdfs dfs -text /avro-mr/part-r-00000.avro {"id":0,"name":"name-0"} {"id":1,"name":"name-1"} {"id":2,"name":"name-2"} {"id":3,"name":"name-3"} {"id":4,"name":"name-4"} {"id":5,"name":"name-5"} {"id":6,"name":"name-6"} {"id":7,"name":"name-7"} {"id":8,"name":"name-8"} |
需要理解avro是一种对象序列化协议,同时也自带一种文件格式。
mapper的输入和reducer的输出同时涉及到avro对象序列化和avro文件格式。
mapper的输出K-V在这里仅仅用到了avro对象序列化,至于中间K-V的存储则是MR框架内置格式保存的,不需要我们关心。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
