pyspark简单实践与原理

本文记录pyspark的基础用法和重要理解,希望对大家也很有帮助。

安装

因为我们是学习用途,所以不需要专门下载spark服务端进行搭建。

通过直接为python安装pyspark包即可,它内涵了与spark通讯的SDK以及单机spark模拟环境:

如果你有多个python环境,那么需要注意配置一下环境变量,告知spark服务端(在这里是内置的服务端)使用正确的python版本:

基本概念

RDD

代表一个数据集合,我们写spark程序时并不会加载到我们本地,它只是一个逻辑概念。

spark可以从本地磁盘、HDFS等环境中生成一个RDD,我们认为此时RDD对象只是一个标识,并不是真的要去把HDFS文件读到哪里。

2种算子

写spark程序会用到2类函数,称为2种算子:

  • transformation:例如map、filter等,调用时并不会直接触发集群计算,此时客户端只是在构建你的计算拓扑关系,形成一个多轮迭代的调用树。
  • action:例如save、collect、count、sum等,调用时会触发真正的集群计算,计算的拓扑是此前若干transformation调用绘制而来的。

缓存cache

spark直到action调用时才会进行整个拓扑的调度计算。

spark RDD的核心原理是不可变性,而我们知道RDD作为分布式数据集一般规模很大,所以spark并不是我们想象的那样先计算出第一轮transformation的输出RDD保存起来,然后再作为第二轮transformation的输入,这样会占用大量的内存资源,基本不可能实现。

简单的说,spark是流式计算,RDD的计算在不同的算子之间流式的传输,并不是先算完前一轮再计算下一轮。

因此,如果我们的某个RDD需要在多个计算分支里被重复用到,那么就可以考虑主动要求RDD缓存。一旦被要求缓存,那么当RDD被第一次计算的过程中,就会把RDD内的记录真正的保存到内存/磁盘上,以便其他计算分支复用。

所以,大家注意cache应该在action之前被调用,这样在真正计算时会顺带帮我们把RDD缓存在某个地方。

故障恢复

我感觉cache很容易与spark的故障恢复混淆。

spark在迭代我们的transformation计算拓扑时,如果中途机器异常等原因导致某一轮RDD损坏,那么spark就从最近一个上游的RDD重新计算出被损坏的RDD,从而成恢复工作。

我们使用cache并不是为了帮助spark故障恢复,故障恢复是spark内置的能力,使用cache的唯一理由应该是在多个计算分支里复用结果集。

检查点checkpoint

个人理解是一个更可靠的缓存,依旧是为了复用RDD。

因为cache是通过内存与磁盘混合的模式缓存RDD数据,但是一旦个别机器故障还是得重算。

因此,spark可以调用checkpoint来让RDD直接保存到HDFS上,依靠HFDS保障高可靠性。

checkpoint也是一个transformation,不会调用时立即执行,但是checkpoint默认会重算一次RDD并输出HDFS。

spark官方建议,如果要用checkpoint,建议配合cache使用,这样当action执行的时候,会先将结果写入cache,然后checkpoint会直接再把cache的内容写入HDFS,避免了checkpoint二次计算RDD的损耗。

partition分区

如果RDD加载的是一个HDFS文件,那么计算一个大文件时就需要考虑并行化。

因此,RDD需要打散,也就是分区partition,这样每个partition可以并行计算。

客户端允许我们指定RDD分成几个partition,例如HDFS这种分布式文件系统底层是block分块存储的,本身就能支持数据的切割。

shuffle打散

搞过map-reduce的同学应该了解,shuffle是map与reduce之间的一个数据聚合、排序、传输过程。

对spark的多个transformation算子之间也是一样的,每个partition计算后的结果如果不能继续在partition内完成下一轮计算,那么就需要shuffle传输到其他计算节点做进一步聚合(例如map操作)。

在编写spark的时候,需要特别注意减少shuffle的次数,减少shuffle的传输量,尽量在当前partition内完成多轮计算,尽量将需要shuffle的量级缩小。

实践

测试数据

我有一个本地文件作为最初的RDD输入,其内容如下:

初始化框架

  • SparkConf:配置客户端,主要是服务端地址(local表示模拟spark集群模式)和本应用的名字
  • SparkContext:传入配置,是我们操作spark的入口对象

我们接下来要写的代码都放在一个叫做main的函数里(不是spark要求的),目前还是一个空函数,也就是没有任何计算任务。

广播变量

spark支持将一些共享的配置类信息分发到集群中,通过使用broadcast广播变量即可实现。

它的参数是任意python对象,返回值是我们后续访问的操作标识,后面会演示如何在计算代码中访问广播变量。

全局计数变量

通过accumulator可以制造一个全局共享计数的变量,可以用于分布式累加计数的用途。

我的目标是统计不同单词的数量,后续我们会看到具体的用法。

创建RDD

我们指定从本地磁盘创建RDD,注意这个操作并不会立即读取文件,仅仅是一个标识而已。

执行flatMap

我要声明第一个transformation算子,把文件中每一行按空格分词,产生一个(单词,1)的RDD集合,表示单词出现了1次。

flatMap要求处理方法返回的是数组,也就是从1行变成N个单词。

我们通过rdd的toDebugString可以调试rdd,观察它的拓扑关系:

可见,当前的RDD叫做MapPartitionRDD,来自于HadoopRDD,也就是我们上一轮textFile创建的RDD。

缓存RDD

调用cache可以缓存该rdd,因为我后续会有2个计算分支都用到这个集合,为了复用避免可能的重复计算,我显式的指定cache。

cache是纯内存缓存,底层利用persist方法实现,我们可以直接调用persist指定其他的缓存模式,避免内存不够用的情况出现,在此不做演示。

执行count

我们知道count是action算子,立即会发起分布式计算。

我们打印出了整个文件中,总共有多少个单词。

reduce聚合

按words_rdd中第一列作为key聚合,并进行reduce累计,可以得到每个单词的出现次数。

reduceByKey类似于map-reduce中的map+reduce,spark会在mapper端进行一次combine,在reducer端再进行一次merge,也就是默认会对shuffle做优化。

绝大多数transformation算子,都可以传入一个numParitions的参数,即经过计算后产生的RDD应该使用几个分区,我们应该特别关注这个事情。

另外特别说明,正是因为我们执行了collect算子,reduceByKey的计算才得以进行,否则我们仍旧是在拓扑的描述阶段。

同时,我们复用到了被cache的words_rdd,这对于大数据量计算特别有意义。

访问共享数据

我们接着对per_word_count进行一次map操作。

map是对RDD中每个元素执行一个计算函数,返回值替换了原有的元素。

我们在addUnit函数中,访问broadcast变量中的Unit,将其追加到单词的出现次数后面。另外,我们基于accumulator变量,累计了不同单词的出现次数。

排序并输出到文件

现在,我们调用sortBy来对per_word_count做排序,第一个参数返回要排序的字段依据,第二个参数指定降序。

最后我们把集合写到文件中,一般来说是HDFS上的一个文件路径。

总结

上述仅仅是spark的批量计算编程方法。

实际spark还支持streaming流式计算,后续有时间会简单的了解一下。

因为spark编程和开发单机程序没有什么两样,但是实际却是在分布式大数据集上的分布式并行计算,所以spark有很多Machine Leanring的库被开发出来,轻松帮你解决海量数据的机器学习问题。

另外,spark体现的编程思想值得一提,即:计算向存储移动。

因为java和python都支持对象的序列化,所以可以把计算代码序列化后发往存储节点参与分布式运算,这是为什么大多数大数据开源平台都仅支持jvm系语言的重要原因。

参考

 

发表评论

电子邮件地址不会被公开。