pyspark简单实践与原理
本文记录pyspark的基础用法和重要理解,希望对大家也很有帮助。
安装
因为我们是学习用途,所以不需要专门下载spark服务端进行搭建。
通过直接为python安装pyspark包即可,它内涵了与spark通讯的SDK以及单机spark模拟环境:
1 |
pip3 install pyspark |
如果你有多个python环境,那么需要注意配置一下环境变量,告知spark服务端(在这里是内置的服务端)使用正确的python版本:
1 |
export PYSPARK_PYTHON=/usr/local/bin/python3 |
基本概念
RDD
代表一个数据集合,我们写spark程序时并不会加载到我们本地,它只是一个逻辑概念。
spark可以从本地磁盘、HDFS等环境中生成一个RDD,我们认为此时RDD对象只是一个标识,并不是真的要去把HDFS文件读到哪里。
2种算子
写spark程序会用到2类函数,称为2种算子:
- transformation:例如map、filter等,调用时并不会直接触发集群计算,此时客户端只是在构建你的计算拓扑关系,形成一个多轮迭代的调用树。
- action:例如save、collect、count、sum等,调用时会触发真正的集群计算,计算的拓扑是此前若干transformation调用绘制而来的。
缓存cache
spark直到action调用时才会进行整个拓扑的调度计算。
spark RDD的核心原理是不可变性,而我们知道RDD作为分布式数据集一般规模很大,所以spark并不是我们想象的那样先计算出第一轮transformation的输出RDD保存起来,然后再作为第二轮transformation的输入,这样会占用大量的内存资源,基本不可能实现。
简单的说,spark是流式计算,RDD的计算在不同的算子之间流式的传输,并不是先算完前一轮再计算下一轮。
因此,如果我们的某个RDD需要在多个计算分支里被重复用到,那么就可以考虑主动要求RDD缓存。一旦被要求缓存,那么当RDD被第一次计算的过程中,就会把RDD内的记录真正的保存到内存/磁盘上,以便其他计算分支复用。
所以,大家注意cache应该在action之前被调用,这样在真正计算时会顺带帮我们把RDD缓存在某个地方。
故障恢复
我感觉cache很容易与spark的故障恢复混淆。
spark在迭代我们的transformation计算拓扑时,如果中途机器异常等原因导致某一轮RDD损坏,那么spark就从最近一个上游的RDD重新计算出被损坏的RDD,从而成恢复工作。
我们使用cache并不是为了帮助spark故障恢复,故障恢复是spark内置的能力,使用cache的唯一理由应该是在多个计算分支里复用结果集。
检查点checkpoint
个人理解是一个更可靠的缓存,依旧是为了复用RDD。
因为cache是通过内存与磁盘混合的模式缓存RDD数据,但是一旦个别机器故障还是得重算。
因此,spark可以调用checkpoint来让RDD直接保存到HDFS上,依靠HFDS保障高可靠性。
checkpoint也是一个transformation,不会调用时立即执行,但是checkpoint默认会重算一次RDD并输出HDFS。
spark官方建议,如果要用checkpoint,建议配合cache使用,这样当action执行的时候,会先将结果写入cache,然后checkpoint会直接再把cache的内容写入HDFS,避免了checkpoint二次计算RDD的损耗。
partition分区
如果RDD加载的是一个HDFS文件,那么计算一个大文件时就需要考虑并行化。
因此,RDD需要打散,也就是分区partition,这样每个partition可以并行计算。
客户端允许我们指定RDD分成几个partition,例如HDFS这种分布式文件系统底层是block分块存储的,本身就能支持数据的切割。
shuffle打散
搞过map-reduce的同学应该了解,shuffle是map与reduce之间的一个数据聚合、排序、传输过程。
对spark的多个transformation算子之间也是一样的,每个partition计算后的结果如果不能继续在partition内完成下一轮计算,那么就需要shuffle传输到其他计算节点做进一步聚合(例如map操作)。
在编写spark的时候,需要特别注意减少shuffle的次数,减少shuffle的传输量,尽量在当前partition内完成多轮计算,尽量将需要shuffle的量级缩小。
实践
测试数据
我有一个本地文件作为最初的RDD输入,其内容如下:
1 2 3 4 |
cat input.txt a b c d a e f g h c b |
初始化框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# -*- coding: utf-8 -*- ## Spark Application - execute with spark-submit ## Imports from pyspark import SparkConf, SparkContext, StorageLevel ## Module Constants APP_NAME = "My Spark Application" ## Closure Functions ## Main functionality def main(sc): pass if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local") sc = SparkContext(conf=conf) # Execute Main functionality main(sc) |
- SparkConf:配置客户端,主要是服务端地址(local表示模拟spark集群模式)和本应用的名字
- SparkContext:传入配置,是我们操作spark的入口对象
我们接下来要写的代码都放在一个叫做main的函数里(不是spark要求的),目前还是一个空函数,也就是没有任何计算任务。
广播变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# -*- coding: utf-8 -*- ## Spark Application - execute with spark-submit ## Imports from pyspark import SparkConf, SparkContext, StorageLevel ## Module Constants APP_NAME = "My Spark Application" ## Closure Functions ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local") sc = SparkContext(conf=conf) # Execute Main functionality main(sc) |
spark支持将一些共享的配置类信息分发到集群中,通过使用broadcast广播变量即可实现。
它的参数是任意python对象,返回值是我们后续访问的操作标识,后面会演示如何在计算代码中访问广播变量。
全局计数变量
1 2 3 4 5 6 7 8 9 10 |
## Closure Functions ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 all_words_count = sc.accumulator(0) |
通过accumulator可以制造一个全局共享计数的变量,可以用于分布式累加计数的用途。
我的目标是统计不同单词的数量,后续我们会看到具体的用法。
创建RDD
我们指定从本地磁盘创建RDD,注意这个操作并不会立即读取文件,仅仅是一个标识而已。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
## Closure Functions ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) |
执行flatMap
我要声明第一个transformation算子,把文件中每一行按空格分词,产生一个(单词,1)的RDD集合,表示单词出现了1次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
## Closure Functions def split(line): words = [] for word in line.split(" "): words.append((word, 1)) return words ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) # 文本行RDD -> 词频RDD words_rdd = raw_rdd.flatMap(split) # 打印一下words_rdd的拓扑关系 print(words_rdd.toDebugString().decode('utf-8')) |
flatMap要求处理方法返回的是数组,也就是从1行变成N个单词。
我们通过rdd的toDebugString可以调试rdd,观察它的拓扑关系:
1 2 3 |
(8) PythonRDD[2] at RDD at PythonRDD.scala:49 [] | /Users/liangdong/Documents/github/spark/py-demo/input.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 [] | /Users/liangdong/Documents/github/spark/py-demo/input.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 [] |
可见,当前的RDD叫做MapPartitionRDD,来自于HadoopRDD,也就是我们上一轮textFile创建的RDD。
缓存RDD
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
## Closure Functions def split(line): words = [] for word in line.split(" "): words.append((word, 1)) return words ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) # 文本行RDD -> 词频RDD words_rdd = raw_rdd.flatMap(split) # 打印一下words_rdd的拓扑关系 print(words_rdd.toDebugString().decode('utf-8')) # 缓存words_rdd,用于后续的2个分支计算复用 words_rdd.cache() # 也可以用persist配置内存+磁盘混合缓存 |
调用cache可以缓存该rdd,因为我后续会有2个计算分支都用到这个集合,为了复用避免可能的重复计算,我显式的指定cache。
cache是纯内存缓存,底层利用persist方法实现,我们可以直接调用persist指定其他的缓存模式,避免内存不够用的情况出现,在此不做演示。
执行count
我们知道count是action算子,立即会发起分布式计算。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
## Module Constants APP_NAME = "My Spark Application" ## Closure Functions def split(line): words = [] for word in line.split(" "): words.append((word, 1)) return words ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) # 文本行RDD -> 词频RDD words_rdd = raw_rdd.flatMap(split) # 打印一下words_rdd的拓扑关系 print(words_rdd.toDebugString().decode('utf-8')) # 缓存words_rdd,用于后续的2个分支计算复用 words_rdd.cache() # 也可以用persist配置内存+磁盘混合缓存 # 计算总共有多少单词 total_words = words_rdd.count() # 打印总单词数量 print("总单词数量:", total_words) |
我们打印出了整个文件中,总共有多少个单词。
1 |
总单词数量: 11 |
reduce聚合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
## Closure Functions def split(line): words = [] for word in line.split(" "): words.append((word, 1)) return words def count(left_val, right_val): return left_val + right_val ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) # 文本行RDD -> 词频RDD words_rdd = raw_rdd.flatMap(split) # 打印一下words_rdd的拓扑关系 print(words_rdd.toDebugString().decode('utf-8')) # 缓存words_rdd,用于后续的2个分支计算复用 words_rdd.cache() # 也可以用persist配置内存+磁盘混合缓存 # 计算总共有多少单词 total_words = words_rdd.count() # 打印总单词数量 print("总单词数量:", total_words) # 在每个partition内做聚合 per_word_count = words_rdd.reduceByKey(count, 4) print("单词统计:", per_word_count.collect()) |
按words_rdd中第一列作为key聚合,并进行reduce累计,可以得到每个单词的出现次数。
1 |
单词统计: [('b', 2), ('c', 2), ('g', 1), ('a', 2), ('e', 1), ('d', 1), ('h', 1), ('f', 1)] |
reduceByKey类似于map-reduce中的map+reduce,spark会在mapper端进行一次combine,在reducer端再进行一次merge,也就是默认会对shuffle做优化。
绝大多数transformation算子,都可以传入一个numParitions的参数,即经过计算后产生的RDD应该使用几个分区,我们应该特别关注这个事情。
另外特别说明,正是因为我们执行了collect算子,reduceByKey的计算才得以进行,否则我们仍旧是在拓扑的描述阶段。
同时,我们复用到了被cache的words_rdd,这对于大数据量计算特别有意义。
访问共享数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
## Closure Functions def split(line): words = [] for word in line.split(" "): words.append((word, 1)) return words def count(left_val, right_val): return left_val + right_val def addUnit(item): all_words_count.add(1) return (item[0], "{}{}".format(item[1], broadcast_vals.value["unit"])) ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 global broadcast_vals broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 global all_words_count all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) # 文本行RDD -> 词频RDD words_rdd = raw_rdd.flatMap(split) # 打印一下words_rdd的拓扑关系 print(words_rdd.toDebugString().decode('utf-8')) # 缓存words_rdd,用于后续的2个分支计算复用 words_rdd.cache() # 也可以用persist配置内存+磁盘混合缓存 # 计算总共有多少单词 total_words = words_rdd.count() # 打印总单词数量 print("总单词数量:", total_words) # 在每个partition内做聚合 per_word_count = words_rdd.reduceByKey(count, 4) print("单词统计:", per_word_count.collect()) # 给统计结果增加"计数单位" detail_word_count = per_word_count.map(addUnit) print("详细统计:", detail_word_count.collect()) print("去重单词数量:", all_words_count.value) |
我们接着对per_word_count进行一次map操作。
map是对RDD中每个元素执行一个计算函数,返回值替换了原有的元素。
我们在addUnit函数中,访问broadcast变量中的Unit,将其追加到单词的出现次数后面。另外,我们基于accumulator变量,累计了不同单词的出现次数。
1 2 3 |
单词统计: [('b', 2), ('c', 2), ('g', 1), ('a', 2), ('e', 1), ('d', 1), ('h', 1), ('f', 1)] 详细统计: [('b', '2次'), ('c', '2次'), ('g', '1次'), ('a', '2次'), ('e', '1次'), ('d', '1次'), ('h', '1次'), ('f', '1次')] 去重单词数量: 8 |
排序并输出到文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
## Closure Functions def split(line): words = [] for word in line.split(" "): words.append((word, 1)) return words def count(left_val, right_val): return left_val + right_val def addUnit(item): all_words_count.add(1) return (item[0], "{}{}".format(item[1], broadcast_vals.value["unit"])) def sort_key(item): return item[1] ## Main functionality def main(sc): # 广播到集群所有节点共享的数据 global broadcast_vals broadcast_vals = sc.broadcast({"unit": "次"}) # 制造一个全局共享统计型变量 global all_words_count all_words_count = sc.accumulator(0) # 从本地文件创建RDD raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6) # 文本行RDD -> 词频RDD words_rdd = raw_rdd.flatMap(split) # 打印一下words_rdd的拓扑关系 print(words_rdd.toDebugString().decode('utf-8')) # 缓存words_rdd,用于后续的2个分支计算复用 words_rdd.cache() # 也可以用persist配置内存+磁盘混合缓存 # 计算总共有多少单词 total_words = words_rdd.count() # 打印总单词数量 print("总单词数量:", total_words) # 在每个partition内做聚合 per_word_count = words_rdd.reduceByKey(count, 4) print("单词统计:", per_word_count.collect()) # 给统计结果增加"计数单位" detail_word_count = per_word_count.map(addUnit) print("详细统计:", detail_word_count.collect()) print("去重单词数量:", all_words_count.value) # 按出现次数排序 sorted_rdd = per_word_count.sortBy(sort_key, False) # 写到文件中 sorted_rdd.saveAsTextFile("/Users/liangdong/Documents/github/spark/py-demo/output") |
现在,我们调用sortBy来对per_word_count做排序,第一个参数返回要排序的字段依据,第二个参数指定降序。
最后我们把集合写到文件中,一般来说是HDFS上的一个文件路径。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
output/ total 16 -rw-r--r-- 1 liangdong staff 0 7 3 13:11 _SUCCESS -rw-r--r-- 1 liangdong staff 0 7 3 13:11 part-00000 -rw-r--r-- 1 liangdong staff 27 7 3 13:11 part-00001 -rw-r--r-- 1 liangdong staff 0 7 3 13:11 part-00002 -rw-r--r-- 1 liangdong staff 45 7 3 13:11 part-00003 liangdongs-MacBook-Pro:py-demo liangdong$ cat output/part-00001 ('b', 2) ('c', 2) ('a', 2) liangdongs-MacBook-Pro:py-demo liangdong$ cat output/part-00003 ('g', 1) ('e', 1) ('d', 1) ('h', 1) ('f', 1) |
总结
上述仅仅是spark的批量计算编程方法。
实际spark还支持streaming流式计算,后续有时间会简单的了解一下。
因为spark编程和开发单机程序没有什么两样,但是实际却是在分布式大数据集上的分布式并行计算,所以spark有很多Machine Leanring的库被开发出来,轻松帮你解决海量数据的机器学习问题。
另外,spark体现的编程思想值得一提,即:计算向存储移动。
因为java和python都支持对象的序列化,所以可以把计算代码序列化后发往存储节点参与分布式运算,这是为什么大多数大数据开源平台都仅支持jvm系语言的重要原因。
参考
- 《PySpark Tutorials》:快速且系统的学习pyspark。
- 《Spark性能优化–和Shuffle搏斗》:了解spark开发需要注意的性能问题。
- 《Spark中的checkpoint作用与用法》:了解一下checkpoint、cache之间的关系。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
