这是一个系列,建议从《spark系列 – spark on yarn搭建》开始阅读。
RDD是spark对数据集的抽象,任意大小的数据集都可以通过1个RDD对象来表达,因此spark编程操作大数据集非常简单。
本篇博客的代码见:https://github.com/owenliang/spark-demo
RDD官方教程:https://spark.apache.org/docs/latest/rdd-programming-guide.html
引入maven依赖
RDD属于spark核心库实现,另外我们引入hadoop-client用于操作HDFS用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.0</version> </dependency> </dependencies> |
另外别忘记配置maven-shade-plugin插件来打包依赖到Jar包。
初始化连接
1 2 3 4 5 6 |
public class Main { public static void main(String []args) { try { FileSystem dfs = FileSystem.get(new Configuration()); // hdfs连接 SparkConf conf = new SparkConf().setAppName("spark-demo"); // 配置 JavaSparkContext sc = new JavaSparkContext(conf); // Spark连接 |
- FileSystem:hdfs连接。
- SparkConf:spark连接配置,设置一下任务名字就好。
- JavaSparkContext:传入conf,得到连接。
spark提供命令行工具来提交jar到yarn集群,我们的程序会从环境变量拿到所在的yarn集群配置信息等,不需要我们在代码里显式配置,这是hadoop jar提供MR任务是一样的。
例子1:JVM对象转RDD
准备1个Java的List,然后调用JavaSparkContext的parallelize可以将其转换为RDD数据集,这样就可以用Spark的API来操作RDD了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package cc.yuerblog.dag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.util.Arrays; import java.util.List; // 本地数据 转 RDD public class Collection2Text { public void run(FileSystem dfs, JavaSparkContext sc) throws IOException { String path = "/Collection2RDD"; // 删除输出目录 dfs.delete(new Path(path), true); // JAVA集合转成多分片RDD List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data); // 保存到hdfs的对应目录下 distData.saveAsTextFile(path); } } |
这里得到JavaRDD<Integer>后,我们直接将这个RDD保存到了HDFS的/Collection2RDD目录下,采用Text纯文本格式。
执行程序只需要在spark机器上执行:
spark-submit –master yarn –deploy-mode cluster –executor-memory 1G –num-executors 10 ./spark-demo-1.0-SNAPSHOT.jar
- –master yarn:表示提交给yarn集群执行,这是spark on yarn要求写死的。
- –deploy-mode cluster:表示把spark application Master运行到yarn集群内,前一篇博客中讲解了cluster和client两种模式的区别。
- –executor-memory:worker容器的内存限制。
- –num-executors:在yarn集群启动多少个worker容器来消费DAG任务。
运行后,会看到hdfs里文件已经产生:
1 2 3 4 5 6 7 8 9 10 11 12 |
hdfs dfs -ls /Collection2RDD Found 4 items -rw-r--r-- 1 root supergroup 0 2020-10-10 05:17 /Collection2RDD/_SUCCESS -rw-r--r-- 1 root supergroup 2 2020-10-10 05:17 /Collection2RDD/part-00000 -rw-r--r-- 1 root supergroup 4 2020-10-10 05:17 /Collection2RDD/part-00001 -rw-r--r-- 1 root supergroup 4 2020-10-10 05:17 /Collection2RDD/part-00002 root@ubuntu:~# hdfs dfs -cat /Collection2RDD/* 1 2 3 4 5 |
可以看到输出和MR输出的目录结构一样,只不过现在把计算引擎从MR换成了spark而已,它们其实是互相取代关系。
例子2:Text格式转换为SequenceFile格式
我们采用spark加载上个例子中的text文件,然后将其输出为sequenceFile格式重新写到HDFS。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package cc.yuerblog.dag; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.io.IOException; public class Text2Seq { public void run(FileSystem dfs, JavaSparkContext sc) throws IOException { String path = "/Hdfs2RDD"; // 删除输出目录 dfs.delete(new Path(path), true); // 将Collection2RDD的text输出作为输入 JavaRDD<String> distData = sc.textFile("/Collection2RDD"); // 处理成K-V格式的RDD JavaPairRDD pairRDD = distData.mapToPair(new MyPairFunction()); // 该transformer对象会被序列化传到executor机器上执行 // 按sequenceFile保存到hdfs的对应目录下,采用Gzip压缩 // // 需要准备一个hadoop的configuration来定义输出格式的配置 Configuration conf = new Configuration(); conf.setBoolean(FileOutputFormat.COMPRESS, true); // 开启压缩 conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class); // 压缩类型GZIP,父类是CompressionCodec conf.set(FileOutputFormat.COMPRESS_TYPE, SequenceFile.CompressionType.BLOCK.toString()); // 设置块级压缩 pairRDD.saveAsNewAPIHadoopFile(path, NullWritable.class, Text.class, SequenceFileOutputFormat.class, conf); // sequenceFile的K是NullWritable,V是text } // 静态内部类 private static class MyPairFunction implements PairFunction<String, NullWritable, Text> { public Tuple2<NullWritable, Text> call(String s) throws Exception { return new Tuple2<NullWritable, Text>(NullWritable.get(), new Text(s)); } } } |
- sc.textFile:加载文本文件(和MR一样会自动解压带压缩后缀的文件),返回String类型的RDD。
- mapToPair:因为SequenceFile的记录是K-V格式,所以我们要把JavaRDD变成一种特殊的JavaPairRDD,在spark RDD里也就这俩类型,这里传入一个Function来将String处理为K-V。
- MyPairFunction:返回K是NullWritable,V用Text类型存储String即可。
- Configuration:JavaPairRDD写出到sequenceFile需要指定压缩等配置,这里利用Hadoop的Configuration类来配置各种property。
- saveAsNewAPIHadoopFile:保存到/Hdfs2RDD目录,传入K和V的类型,文件格式为SequenceFile以及配置conf。
查看HDFS文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
root@ubuntu:~# hdfs dfs -ls /Hdfs2RDD Found 4 items -rw-r--r-- 1 root supergroup 0 2020-10-10 05:17 /Hdfs2RDD/_SUCCESS -rw-r--r-- 1 root supergroup 235 2020-10-10 05:17 /Hdfs2RDD/part-r-00000 -rw-r--r-- 1 root supergroup 239 2020-10-10 05:17 /Hdfs2RDD/part-r-00001 -rw-r--r-- 1 root supergroup 239 2020-10-10 05:17 /Hdfs2RDD/part-r-00002 root@ubuntu:~# hdfs dfs -text /Hdfs2RDD/* 2020-10-11 12:51:23,648 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 2020-10-11 12:51:23,649 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-11 12:51:23,677 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-11 12:51:23,677 INFO compress.CodecPool: Got brand-new decompressor [.gz] 2020-10-11 12:51:23,677 INFO compress.CodecPool: Got brand-new decompressor [.gz] (null) 1 (null) 2 (null) 3 (null) 4 (null) 5 |
经过hdfs dfs -text解码sequenceFile,我们看到每一行记录的K是null,V是原内容,这就是sequenceFile的记录存储格式。
那么JavaRDD和JavaPairRDD有啥其他区别么?有,最关键的就是JavaPairRDD是K-V的,所以支持一些按key做group by之类的操作,而JavaRDD则仅支持一些map/filter之类的线性操作。
例子3:把sequenceFile加载为RDD
接上个例子,我们再把这种K-V的sequenceFile加载回RDD,此时当然会得到JavaPairRDD:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
package cc.yuerblog.dag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import scala.Tuple2; import java.io.IOException; public class Seq2RDD { private static class ExtractValueFromPair implements Function<Tuple2<NullWritable, Text>, String> { public String call(Tuple2<NullWritable, Text> t) throws Exception { return t._2().toString(); } } public void run(FileSystem dfs, JavaSparkContext sc) throws IOException { // 加载sequenceFile,得到K-V RDD JavaPairRDD rdd = sc.newAPIHadoopFile("/Hdfs2RDD", SequenceFileInputFormat.class, NullWritable.class, Text.class, sc.hadoopConfiguration()); // 取出所有的V JavaRDD rows = rdd.map(new ExtractValueFromPair()); // 可以令--deploy-mode client,这样spark AM将跑在本地,collect的数据将直接打印在本地终端,该方式用于调试 System.out.println(rows.collect()); } } |
加载过程和写入的传参是差不多的,它要求最后一个参数是hadoop的configuration,我们可以直接从spark的context里拿到。
现在我们只关心这个JavaPairRDD中的V,所以我们对其进行一轮map,传入的Function将被回调得到Tuple2对象,我们_2()就是Value部分,将其返回String类型,最后将得到一个JavaRDD<String>。
最后我们对JavaRDD做collect(),将其数据拿到本地JVM内存里并打印出来,这只适合小数据集的调试,大数据集内存会撑爆(大数据集还是写到HDFS上再去看效果)。
因为最终是拿到JVM内存里,我们部署的时候采用client模式,这样ApplicationMaster(或者叫Driver)会运行在客户端本地,可以直接看到终端输出。(如果采用cluster模式,需要去yarn上看容器日志,在此不作演示)
解释上面出现的一些东西~~~
在上面对rdd进行map这种操作叫做transformations,也就是对1个RDD进行计算和转换得到另外1个RDD。
但是我们要注意,spark并不是立即执行你的transformation,他只是在本地记录你构建的DAG计算图,比如你先map再map再filter,那么它只是在代码中记录你的计算拓扑,并不是立即执行的。这才是spark为什么快于MR框架,因为它会分析你的连续算子迭代,并尽量把连续的算子作为一个单元分配给1个进程连续计算,这样才能避免中间数据的跨网络传输。
那么什么时候DAG图开始提交计算呢?当我们执行actions类型的操作时,比如save文件到HDFS或者collect到本地内存,这时候spark就会进行DAG任务拆解开始真正的计算了。
例子4:RDD复杂计算
接下来我们单纯的演示一下RDD是如何进行复杂数据计算的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
public class RDDTrans { // RDD单分区聚合函数 private static class SeqFunction implements Function2<Integer, Integer, Integer> { public Integer call(Integer a, Integer b) throws Exception { return a+b; } } // RDD多分区合并函数 private static class CombFunction implements Function2<Integer, Integer, Integer> { public Integer call(Integer a, Integer b) throws Exception { return a+b; } } public void run(FileSystem dfs, JavaSparkContext sc) throws IOException { List<Tuple2<String, String>> words = new ArrayList<Tuple2<String, String>>(); words.add(new Tuple2<String, String>("hello", "你好")); words.add(new Tuple2<String, String>("bye", "再见")); words.add(new Tuple2<String, String>("world", "世界")); // 词典RDD,用于JOIN JavaPairRDD dictRDD = sc.parallelizePairs(words); // 把字典cache起来,后续用来join更快 dictRDD.cache(); // 随机单词RDD: <单词, 1> List<Tuple2<String, Integer>> l = new ArrayList<Tuple2<String, Integer>>(); for (int i = 0; i < 1000; i++) { int index = new Random().nextInt(words.size()); l.add(new Tuple2<String, Integer>(words.get(index)._1(), 1)); } JavaPairRDD wordsRDD = sc.parallelizePairs(l); // 单词次数统计,采用比较底层也比较灵活的一种API JavaPairRDD aggRDD = wordsRDD.aggregateByKey(Integer.valueOf(0), new SeqFunction(), new CombFunction()); // 把统计好的单词和词典做left join // JOIN返回值的K是2个RDD的K,而V则是2个RDD的V组成的Tuple,因为是左连接所以右侧RDD的V可能没有,因此是Optional的 JavaPairRDD<String, Tuple2<Integer, Optional<String>>> joinedRDD = aggRDD.leftOuterJoin(dictRDD); // 打印JOIN好的数据 System.out.println(joinedRDD.collect()); } } |
- words是一个Tuple2列表,Tuple2的K是英文、V是中文,这相当于一个英汉字典。
- 我们利用parallelizePairs把words列表变成一个JavaPairRDD<String,String>。(因为words里面都是Tuple2)
- 假定这个RDD后续我们会在计算中反复使用,因此主动把这个RDD cache到spark集群的内存里(只是尽量内存),即便很大的RDD也可以这么做,但是性能不会太好(因为内存有限,涉及到数据换入换出到磁盘或者临时计算)。
- 接着我随机生成1000个Tuple2,它们的K是随机单词,V是数字1,我们接下来要统计这里面每个单词的出现次数。
- aggregateByKey是我可以选择的1个比较繁琐的方法,我们用它对K做groupby,然后统计单词的出现次数。
- SeqFunction:同1个K的次数累加,就是reduce操作。
- CombFunction:因为RDD是分区并发的,所以最后会对所有分区的K计数进行一次合并,得到每个K的总次数,保存到aggRDD里,里面每条记录就是<单词,总次数>。
- leftOuterJoin:把上述统计结果和最初的词典RDD基于K(也就是单词)做left join,返回的JavaPairRDD的K是单词,V则是join两侧的V组成的Tuple2,因为是Left join所以Tuple2的右值是可选的。
- 最后collect打印结果。
打印结果如下:
1 |
[(world,(317,Optional[世界])), (hello,(326,Optional[你好])), (bye,(357,Optional[再见]))] |
每个单词的出现总次数和中文翻译被join到一起。
spark什么时候该主动cache一个rdd到集群呢?其实首先要理解RDD其实是一个虚拟概念,spark最终绘制的只是DAG图,因此如果我们不主动把RDD计算结果缓存下来,那么DAG图其实是不会把中间某个数据集具象化的,因此当反复用到某个RDD时,spark可能会反复计算来生成相关的数据流,而主动缓存则是真的来缓存中间结果来实现高效复用。
根据网上经验,如果1个RDD被多个RDD依赖,则可以考虑将这个RDD缓存到集群中,另外RDD不适合太大,否则内存缓存不下其实会涉及到临时计算或者磁盘I/O(可以配置基于内存+磁盘混合cache),性能反而更差。
例子5:变量广播和计数器
属于spark的附加小特性,如果有场景可以用,下面给个例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package cc.yuerblog.dag; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.util.LongAccumulator; import java.io.IOException; import java.util.Arrays; public class BrodAndAccu { private static class FilterFunc implements Function<Integer, Boolean> { private Broadcast<Long> v; private LongAccumulator accum; public FilterFunc(Broadcast<Long> v, LongAccumulator accum) { this.v = v; this.accum = accum; } public Boolean call(Integer integer) throws Exception { if (Long.valueOf(integer) >= v.value()) {// 保留大于v的数字 accum.add(1); return true; } return false ; } } public void run(FileSystem dfs, JavaSparkContext sc) throws IOException { // 创建广播变量 Broadcast<Long> v = sc.broadcast(Long.valueOf(3)); // 创建计数器,用于统计大于v的数字个数 LongAccumulator accum = sc.sc().longAccumulator("validCount"); // 生成RDD JavaRDD rdd = sc.parallelize(Arrays.asList(1,2,3,4,5)); // 过滤RDD JavaRDD filtered = rdd.filter(new FilterFunc(v, accum)); // 在driver端收集结果 System.out.println(filtered.collect()); System.out.println(accum.value()); } } |
- broadcast:可以将某个值广播到集群里,这样各个算子可以直接访问Broadcast变量,就像它在本地一样。
- longAccumulator:创建一个Long类型的计数器,所有算子都可以对其原子+1,可以实现分布式计数。
- filter:对JavaRDD里的数字做一波过滤,如果遇见大于广播变量数字(也就是>=3),那么计数器+1,并且保留该数字到结果RDD,否则丢弃。
- filtered.collect():打印过滤后的RDD。
- accum.value():打印计数器,也就是满足条件的数字有几个。
注意,RDD是分片的,被分布式处理,每个分片的算子进行计数器操作,最终会汇总起来,这些并发问题不需要我们考虑。
执行后打印结果:
1 2 |
[3, 4, 5] 3 |
过滤后保留了3,4,5三个数字,计数器也是3。
总结
RDD只是编程抽象,实际Spark是分析计算拓扑图,进行算子分组,减少网络传输来实现计算加速,这就是所谓的spark是基于内存的计算模型,少走磁盘少走网络那就是内存了嘛。
spark和MR的关系是平行的,但是spark因为基于DAG设计进行连续计算比MR要快不止百倍。
Hive作为一个SQL引擎,其实同时支持Hive on MR和Hive on spark,也就是说Hive可以将SQL翻译为spark的DAG或者MR的多轮迭代,可见spark和mr的地位都是作为计算引擎而存在的。(Hive底层也支持tez作为计算引擎,仅仅是另外一种选项)。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
