spark系列 – RDD编程使用

这是一个系列,建议从《spark系列 – spark on yarn搭建》开始阅读。

RDD是spark对数据集的抽象,任意大小的数据集都可以通过1个RDD对象来表达,因此spark编程操作大数据集非常简单。

本篇博客的代码见:https://github.com/owenliang/spark-demo

RDD官方教程:https://spark.apache.org/docs/latest/rdd-programming-guide.html

引入maven依赖

RDD属于spark核心库实现,另外我们引入hadoop-client用于操作HDFS用:

另外别忘记配置maven-shade-plugin插件来打包依赖到Jar包。

初始化连接

  • FileSystem:hdfs连接。
  • SparkConf:spark连接配置,设置一下任务名字就好。
  • JavaSparkContext:传入conf,得到连接。

spark提供命令行工具来提交jar到yarn集群,我们的程序会从环境变量拿到所在的yarn集群配置信息等,不需要我们在代码里显式配置,这是hadoop jar提供MR任务是一样的。

例子1:JVM对象转RDD

准备1个Java的List,然后调用JavaSparkContext的parallelize可以将其转换为RDD数据集,这样就可以用Spark的API来操作RDD了。

这里得到JavaRDD<Integer>后,我们直接将这个RDD保存到了HDFS的/Collection2RDD目录下,采用Text纯文本格式。

执行程序只需要在spark机器上执行:

spark-submit –master yarn –deploy-mode cluster –executor-memory 1G –num-executors 10 ./spark-demo-1.0-SNAPSHOT.jar

  • –master yarn:表示提交给yarn集群执行,这是spark on yarn要求写死的。
  • –deploy-mode cluster:表示把spark application Master运行到yarn集群内,前一篇博客中讲解了cluster和client两种模式的区别。
  • –executor-memory:worker容器的内存限制。
  • –num-executors:在yarn集群启动多少个worker容器来消费DAG任务。

运行后,会看到hdfs里文件已经产生:

可以看到输出和MR输出的目录结构一样,只不过现在把计算引擎从MR换成了spark而已,它们其实是互相取代关系。

例子2:Text格式转换为SequenceFile格式

我们采用spark加载上个例子中的text文件,然后将其输出为sequenceFile格式重新写到HDFS。

  • sc.textFile:加载文本文件(和MR一样会自动解压带压缩后缀的文件),返回String类型的RDD。
  • mapToPair:因为SequenceFile的记录是K-V格式,所以我们要把JavaRDD变成一种特殊的JavaPairRDD,在spark RDD里也就这俩类型,这里传入一个Function来将String处理为K-V。
  • MyPairFunction:返回K是NullWritable,V用Text类型存储String即可。
  • Configuration:JavaPairRDD写出到sequenceFile需要指定压缩等配置,这里利用Hadoop的Configuration类来配置各种property。
  • saveAsNewAPIHadoopFile:保存到/Hdfs2RDD目录,传入K和V的类型,文件格式为SequenceFile以及配置conf。

查看HDFS文件:

经过hdfs dfs -text解码sequenceFile,我们看到每一行记录的K是null,V是原内容,这就是sequenceFile的记录存储格式。

那么JavaRDD和JavaPairRDD有啥其他区别么?有,最关键的就是JavaPairRDD是K-V的,所以支持一些按key做group by之类的操作,而JavaRDD则仅支持一些map/filter之类的线性操作。

例子3:把sequenceFile加载为RDD

接上个例子,我们再把这种K-V的sequenceFile加载回RDD,此时当然会得到JavaPairRDD:

加载过程和写入的传参是差不多的,它要求最后一个参数是hadoop的configuration,我们可以直接从spark的context里拿到。

现在我们只关心这个JavaPairRDD中的V,所以我们对其进行一轮map,传入的Function将被回调得到Tuple2对象,我们_2()就是Value部分,将其返回String类型,最后将得到一个JavaRDD<String>。

最后我们对JavaRDD做collect(),将其数据拿到本地JVM内存里并打印出来,这只适合小数据集的调试,大数据集内存会撑爆(大数据集还是写到HDFS上再去看效果)。

因为最终是拿到JVM内存里,我们部署的时候采用client模式,这样ApplicationMaster(或者叫Driver)会运行在客户端本地,可以直接看到终端输出。(如果采用cluster模式,需要去yarn上看容器日志,在此不作演示)


解释上面出现的一些东西~~~

在上面对rdd进行map这种操作叫做transformations,也就是对1个RDD进行计算和转换得到另外1个RDD。

但是我们要注意,spark并不是立即执行你的transformation,他只是在本地记录你构建的DAG计算图,比如你先map再map再filter,那么它只是在代码中记录你的计算拓扑,并不是立即执行的。这才是spark为什么快于MR框架,因为它会分析你的连续算子迭代,并尽量把连续的算子作为一个单元分配给1个进程连续计算,这样才能避免中间数据的跨网络传输。

那么什么时候DAG图开始提交计算呢?当我们执行actions类型的操作时,比如save文件到HDFS或者collect到本地内存,这时候spark就会进行DAG任务拆解开始真正的计算了。

例子4:RDD复杂计算

接下来我们单纯的演示一下RDD是如何进行复杂数据计算的。

  • words是一个Tuple2列表,Tuple2的K是英文、V是中文,这相当于一个英汉字典。
  • 我们利用parallelizePairs把words列表变成一个JavaPairRDD<String,String>。(因为words里面都是Tuple2)
  • 假定这个RDD后续我们会在计算中反复使用,因此主动把这个RDD cache到spark集群的内存里(只是尽量内存),即便很大的RDD也可以这么做,但是性能不会太好(因为内存有限,涉及到数据换入换出到磁盘或者临时计算)。
  • 接着我随机生成1000个Tuple2,它们的K是随机单词,V是数字1,我们接下来要统计这里面每个单词的出现次数。
  • aggregateByKey是我可以选择的1个比较繁琐的方法,我们用它对K做groupby,然后统计单词的出现次数。
  • SeqFunction:同1个K的次数累加,就是reduce操作。
  • CombFunction:因为RDD是分区并发的,所以最后会对所有分区的K计数进行一次合并,得到每个K的总次数,保存到aggRDD里,里面每条记录就是<单词,总次数>。
  • leftOuterJoin:把上述统计结果和最初的词典RDD基于K(也就是单词)做left join,返回的JavaPairRDD的K是单词,V则是join两侧的V组成的Tuple2,因为是Left join所以Tuple2的右值是可选的。
  • 最后collect打印结果。

打印结果如下:

每个单词的出现总次数和中文翻译被join到一起。

spark什么时候该主动cache一个rdd到集群呢?其实首先要理解RDD其实是一个虚拟概念,spark最终绘制的只是DAG图,因此如果我们不主动把RDD计算结果缓存下来,那么DAG图其实是不会把中间某个数据集具象化的,因此当反复用到某个RDD时,spark可能会反复计算来生成相关的数据流,而主动缓存则是真的来缓存中间结果来实现高效复用。

根据网上经验,如果1个RDD被多个RDD依赖,则可以考虑将这个RDD缓存到集群中,另外RDD不适合太大,否则内存缓存不下其实会涉及到临时计算或者磁盘I/O(可以配置基于内存+磁盘混合cache),性能反而更差。

例子5:变量广播和计数器

属于spark的附加小特性,如果有场景可以用,下面给个例子。

  • broadcast:可以将某个值广播到集群里,这样各个算子可以直接访问Broadcast变量,就像它在本地一样。
  • longAccumulator:创建一个Long类型的计数器,所有算子都可以对其原子+1,可以实现分布式计数。
  • filter:对JavaRDD里的数字做一波过滤,如果遇见大于广播变量数字(也就是>=3),那么计数器+1,并且保留该数字到结果RDD,否则丢弃。
  • filtered.collect():打印过滤后的RDD。
  • accum.value():打印计数器,也就是满足条件的数字有几个。

注意,RDD是分片的,被分布式处理,每个分片的算子进行计数器操作,最终会汇总起来,这些并发问题不需要我们考虑。

执行后打印结果:

过滤后保留了3,4,5三个数字,计数器也是3。

总结

RDD只是编程抽象,实际Spark是分析计算拓扑图,进行算子分组,减少网络传输来实现计算加速,这就是所谓的spark是基于内存的计算模型,少走磁盘少走网络那就是内存了嘛。

spark和MR的关系是平行的,但是spark因为基于DAG设计进行连续计算比MR要快不止百倍。

Hive作为一个SQL引擎,其实同时支持Hive on MR和Hive on spark,也就是说Hive可以将SQL翻译为spark的DAG或者MR的多轮迭代,可见spark和mr的地位都是作为计算引擎而存在的。(Hive底层也支持tez作为计算引擎,仅仅是另外一种选项)。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~