hdfs系列 – SequenceFile格式与mapreduce
这是系列博客,你应该从《hdfs系列 – Text格式与mapreduce》开始阅读。
HDFS程序
SequenceFile是Hadoop内置的二进制文件格式,它支持压缩的同时支持Mapper分片处理,比Text格式更加适合作为数仓存储格式。
SequenceFile文件格式按K-V存储行数据,K是WritableComparable可序列化可比较即可,V是Writable可序列化的即可,具体K与V的含义则由写入者自行解释。
SequenceFile支持不压缩、行压缩、块压缩三种压缩方式,压缩算法则支持类似常见的Snappy、Gzip等,我们一般选择块压缩来带来较高的压缩率,所谓块压缩就是将n行数据整体压缩一次追加到文件中。
代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package cc.yuerblog.fs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import java.io.IOException; public class SequenceFileTest { public void run(Configuration conf) throws IOException { // 路径 Path path = new Path("/sequence.txt"); // GZIP编码器 CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName("gzip"); // 写打开 SequenceFile.Writer out = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(Text.class), // key类型 SequenceFile.Writer.valueClass(Text.class), // value类型 SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, codec), // 块级gzip压缩 SequenceFile.Writer.syncInterval(10 * 1024 * 1024)); // 10MB打一个sync marker,方便MR分片 // 写入数据 for (int i = 0; i < 1000; i++) { out.append(new Text(String.format("key-%d", i)), new Text(String.format("value-%d", i))); } out.close(); // 读打开 SequenceFile.Reader in = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path)); // 循环读取数据 Text key = new Text(); Text value = new Text(); while (in.next(key, value)) { System.out.printf("[Sequence] %s -> %s\n", key.toString(), value.toString()); } in.close(); } } |
利用SequenceFile的createWriter方法打开文件进行写入:
- SequenceFile.Writer.file:写入路径
- SequenceFile.Writer.keyClass:Key的类型
- SequenceFile.Writer.valueClass:Value的类型
- SequenceFile.Writer.compression:压缩方式,这里为块级GZIP压缩
- SequenceFile.Writer.syncInterval:sync marker的写入间隔,用于文件分片。
sync marker是一段16字节的固定magic字节数组,SequenceFile会在文件中每间隔一定的行就写入一个sync mark,这样在hdfs block内只要找到sync marker就可以切出一个InputSplit分片了(拥有2^(16*8)种组合,很难和数据部分冲突),这样就可以给多个mapper并行输入来提高并发度了,因此sync marker是二进制文件格式的一个主要的分片手段,被多种高级文件格式使用。
我们循环向文件写入Key和Value作为一条记录,一共写入1000个记录。
接着利用SequenceFile.Reader读打开这个文件,读的时候不需要再声明文件的压缩类型、K/V的数据类型,因为这些元信息都被存储在SequenceFile文件头部,我们直接开始循环读取记录即可。
运行HDFS程序
没什么特殊的:
1 |
hadoop jar hdfs-demo-1.0-SNAPSHOT.jar |
查看文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
root@ubuntu:~/bigdata/demo# hdfs dfs -text /sequence.txt 2020-10-07 05:32:11,097 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 2020-10-07 05:32:11,098 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-07 05:32:11,104 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-07 05:32:11,104 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-07 05:32:11,104 INFO compress.CodecPool: Got brand-new decompressor [.gz] key-0 value-0 key-1 value-1 key-2 value-2 key-3 value-3 key-4 value-4 key-5 value-5 key-6 value-6 key-7 value-7 key-8 value-8 key-9 value-9 |
mapreduce程序
根据我们上述写入的SequenceFile文件,每一条记录的K与V都是Text类型,可以直接被SequenceFileInputFormat的RecordReader解析到Mapper的Key和Value上。
因此,我们可以实现如下Mapper和Reducer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
package cc.yuerblog.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import java.io.IOException; public class SequenceMR { private static class Mapper extends org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, LongWritable> { @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { Text row = new Text(String.format("%s=%s", key.toString(), value.toString())); context.write(row, new LongWritable(1)); // (内容,1次) } } private static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable v : values) { count += v.get(); } context.write(key, new LongWritable(count)); } } |
Mapper把输入的K和V连接起来,然后输出<K=V, 1>。Reducer则统计一下每个K=V的出现次数,程序没什么意义,仅仅为了演示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
public void run(Configuration conf) throws Exception { Path input = new Path("/sequence.txt"); Path output = new Path("/sequence-mr"); // 删除之前的结果 FileSystem.get(conf).delete(output, true); // 创建Job Job job = Job.getInstance(conf, "SequenceMR"); job.setJarByClass(SequenceMR.class); // mapper/reducer实现在该类内部,需要设置这个 /// 输入 SequenceFileInputFormat.addInputPath(job, input); // 文件路径 job.setInputFormatClass(SequenceFileInputFormat.class); // 文件格式 job.setMapperClass(SequenceMR.Mapper.class); // mapper实现 job.setMapOutputKeyClass(Text.class); // 中间key job.setMapOutputValueClass(LongWritable.class); // 中间value // 输出 SequenceFileOutputFormat.setOutputPath(job, output); // 文件路径 SequenceFileOutputFormat.setCompressOutput(job, true); // 开启压缩 SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // gzip压缩 SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK); // 块级压缩 job.setOutputFormatClass(SequenceFileOutputFormat.class); // 文件格式 job.setReducerClass(SequenceMR.Reducer.class); // reducer实现 job.setOutputKeyClass(Text.class); // 结果key job.setOutputValueClass(LongWritable.class); // 结果value // 等待任务执行,打印详情 job.waitForCompletion(true); } |
Mapper输入包括几个配置项:
- SequenceFileOutputFormat.addInputPath:设置输入路径
- setInputFormatClass:设置SequenceFileInputFormat的文件格式作为输入。
- setMapperClass:mapper实现。
- setMapOutputKeyClass:mapper输出的key类型。
- setMapOutputValueClass:mapper输出的value类型。
Reducer输出包括几个配置:
- SequenceFileOutputFormat.setOutputPath:输出路径
- SequenceFileOutputFormat.setCompressOutput:输出结果压缩
- SequenceFileOutputFormat.setOutputCompressorClass:输出结果采用GZIP压缩
- SequenceFileOutputFormat.setOutputCompressionType:输出结果采用块级压缩
- setOutputFormatClass:输出也是SequenceFile格式
- setReducerClass:reducer实现
- setOutputKeyClass:reducer输出key类型
- setOutputValueClass:reducer输出value类型
这里我们让reducer输出的Key和Value直接进入到SequenceFile的K-V记录,所以大家要注意理解MR的KV和SequenceFile的KV完全是两码事,MR的KV交给具体某种OutputFormat之后怎么存储是OutputFormat的事情。
运行Mapreduce
运行没什么特殊的:
1 |
hadoop jar mr-demo-1.0-SNAPSHOT.jar |
这个MR的输入是sequenceFile,输出也是seqenceFile,当然MR并不要求输入和输出一定要采用同一种文件格式,但是sequenceFile是个很实用的文件格式,能在压缩的同时支持InputSplit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
root@ubuntu:~/bigdata/demo# hdfs dfs -ls /sequence-mr Found 2 items -rw-r--r-- 1 root supergroup 0 2020-10-07 02:40 /sequence-mr/_SUCCESS -rw-r--r-- 1 root supergroup 4366 2020-10-07 02:40 /sequence-mr/part-r-00000 root@ubuntu:~/bigdata/demo# hdfs dfs -text /sequence-mr/part-r-00000 2020-10-07 05:28:35,086 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 2020-10-07 05:28:35,086 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-07 05:28:35,092 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-07 05:28:35,092 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-07 05:28:35,092 INFO compress.CodecPool: Got brand-new decompressor [.gz] key-0=value-0 1 key-100=value-100 1 key-101=value-101 1 key-102=value-102 1 key-103=value-103 1 key-104=value-104 1 key-105=value-105 1 |
经过前一篇博客的Text格式和本篇SequenceFile格式,我们基本已经熟悉了HDFS和MR的工作原理。
这两种格式都是Hadoop内置的文件格式,并且都是基于行的存储。
接下来我们会再接触2个非Hadoop自带的文件格式,它们一个是行存储,一个是列存储,而列存储是Hive数据仓库主要使用的文件格式,因为它的数据压缩率和IO效率都要高于行存储。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
