tensorflow分布式训练 — tensorflow on spark使用方法

tensorflow(2.x版本)生产训练需要在大规模训练样本下完成,单机已经无法满足训练速度。

tensorflow on spark是yahoo开源的基于spark进行分布式tensorflow训练的开发框架,本文要求读者熟练tensorflow单机使用,最好读一下前一篇博客:《tensorflow2.0 – 端到端的wide&deep模型训练》

tensorflow自带分布式训练框架

tensorflow官方自带了分布式训练API,但是有一些小缺点:

1、要求我们手动部署训练代码到多台服务器,并且为每个代码配置环境变量:

  • cluster告知tensorflow进程集群中有哪些节点
  • task告知自己是上述哪个节点,身份是什么?(worker还是cheif)

2、整个训练样本需要手动分发到所有服务器上,tensorflow会自行协调令每个计算节点只训练属于自己的那份数据(虽然他们都有完整的训练集),从而让集群快速训练掉所有的样本。

3、训练过程中如果有某个节点宕机,则训练失败。


tensorflow分布式训练的默认策略MultiWorkerMirroredStrategy是:

  • 所有worker的模型参数都是各自随机初始化的。
  • 所有worker的模型基于各自不重叠的部分训练集分别训练,这叫做”数据并行”。
  • 每一轮所有worker各自训练1个batch,基于各自loss求得各自的梯度,经过互相通讯获知所有worker的本轮梯度,求平均梯度用作梯度下降。
  • 重复训练,直到所有worker将整个训练集耗尽。

总结一下,与单机版的不同之处在于:

  • 数据集被所有worker”瓜分”训练。
  • 所有worker是统一行动的,必须等1个batch完成后,大家再一起进入下一个batch。

那么,到底哪个worker训练的模型是最终的模型呢?

道理很简单,任意一个worker都可以!

因为每一个batch训练后,每个worker都会基于集群所有worker的梯度来调整自己本地的模型参数,所以理论上充分训练后每个worker的模型都是最优的。

所以,我们手动指定某个节点为cheif身份(也就是master),只取cheif节点导出的model文件即可,包括tensorboard采样数据我们也只需要看cheif节点的即可。

关于上述分布式训练原理,大家可以读一下官方文档:https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#top_of_page

简单看了一下,每一轮batch训练完成后,tensorflow采用了一种环形广播的机制来让每个worker知道其他worker的本轮梯度,所以不存在单点性能瓶颈:

tensorflow on spark

因为官方方案的上述缺点,yahoo设计了tensorflow on spark框架,下面简称tfos。

原理

它利用spark编程API的方式拉起多个worker到yarn集群中运行,以此解决了计算资源分配问题。

对于每个worker容器,tfos会准备好TF_CONFIG环境变量并唤起我们编写的tensorflow训练函数(python),此后的工作其实还是tensorflow官方分布式训练框架的逻辑,与tfos就没什么关系了。

tfos提供了python库,我们只需要把现有的训练代码封装到一个函数里,然后交给tfos来分发到spark集群中进行部署即可开始训练。

准备工作

首先我们得了解pyspark,因为我们的tensorflow代码和tfos代码都是python的,所以spark程序必须用pyspark开发而不是用java开发。

其次,我们必须提前将训练样本按tfrecords格式写入到HDFS中,这样后续训练时worker可以直接加载HDFS中的训练文件进行训练。tensorflow分布式训练框架能够智能协调”数据并行策略”,如果我们将样本打散成N个HDFS文件,并且指定拉起M个worker,那么:

  • 如果N>=M,那么每个文件将被某个worker独占,这就是”文件级”的并行。
  • 如果N<M,那么就会有多个worker共享同1个文件,但是消费不同的offset,这就是”记录级”的并行。

从生产环境来讲,我们应该是开发(py)spark程序直接读hive表,将其map转化为tfrecords格式写入到hdfs中,不过就本篇博客来说我们将把CSV训练文件加载到spark中而不是从hive加载,区别仅限于此。

代码

首先是如何正确使用pyspark,我准备了一个说明项目:https://github.com/owenliang/pyspark-demo,通过它你可以学会:

  • 为不同的项目隔离python环境
  • 使用pyspark编程并且提交到yarn集群

其次是tensorflow on spark的说明项目:https://github.com/owenliang/tf2-onspark,你将学会:

  • 如何用spark把训练样本转化为tfrecords格式并写入HDFS。
  • 如何用tfos完成分布式训练,最终得到model和tensorboard。

步骤1:准备python环境

下面演示整个过程,所有操作发生在hadoop客户机。

安装miniconda

主流python多环境管理工具,自行安装:https://docs.conda.io/projects/conda/en/latest/user-guide/install/

我们用conda生成独立的python环境,压缩上传到HDFS,后续pyspark提交任务时可以指定spark executor从HDFS下载Python环境使用,这样就可以实现多项目python环境隔离。

创建tensorflow环境

安装pip依赖

正常来说是把依赖写在项目下的requirements.txt里,这里直接pip install -r 即可,就像我提供的项目一样:https://github.com/owenliang/tf2-onspark/blob/main/requirements.txt,为了演示我手动安装一遍。

因为程序基于tensorflow、pandas(为了加载csv)、pyspark、tensorflow on spark开发,所以安装一下它们:

打包python到HDFS

先找到该python的目录:

将所有文件打包:

将Python.zip上传 HDFS待用:

步骤2:准备tfrecords

克隆我的示例项目:

git clone https://github.com/owenliang/tf2-onspark

观察csv训练集

data下面是csv格式的泰坦尼克数据集:

(tf) [hadoop@10 tf2-onspark]$ ll data/
total 88
-rw-rw-r– 1 hadoop hadoop 28629 Jan 5 16:23 test.csv
-rw-rw-r– 1 hadoop hadoop 60302 Jan 5 16:23 train.csv

长相如下:

我们的目标是把这个csv文件转成tfrecords格式写到HDFS上,用作训练。

观察spark任务提交命令

我们要编写一段pyspark程序,让它在AM(spark的application master)上把CSV用pandas加载到内存里,将其按行记录拆分后转化成 RDD,然后对该RDD中的每一行执行map序列化为tfrecord格式的记录,最后利用tensorflow官方提供的org.tensorflow.hadoop.io.TFRecordFileOutputFormat输出格式写入到HDFS文件中。

提交命令见submit_gen.sh:

我们实际提交任务的入口文件是src/gen_tfrecords.py,它支持–train_csv等一系列传参,这属于应用层。

更关键的是理解spark和pyspark,下面说一下。

–archives指定了要通过HDFS分发到计算节点的文件包括:

  • Python.zip:令spark executor直接从HDFS下载,解压到yarn容器内的当前工作目录的Python文件夹,后面–conf spark.pyspark.python可以控制pyspark使用该Python执行代码。
  • data.zip:在客户端本地把train.csv和test.csv打包成zip,上传到HDFS临时目录,进而被executor下载并解压到容器内工作目录的data文件夹,我们代码会加载它们用于生成tfrecords。

–jars指定要分发哪些jar包到spark executor内加入到CLASSPATH中:

  • hdfs:///tf2-onspark/tensorflow-hadoop-1.10.0.jar:我们最终将tfrecords写入HDFS需要用到这个jar包内的org.tensorflow.hadoop.io.TFRecordFileOutputFormat类。(PS:下面会说明如何编译这个Jar包)

–py-files:

  • 进入src目录,把所有源代码打包zip,传给–py-files分发到所有executor,这样pyspark会自动将zip解压到某个目录然后把它加入PYTHONPATH,这样我们的spark程序才能顺利找到import的python模块。

我们写pyspark程序时,可能对某个RDD做一个map操作,传入一个function,其实pyspark框架会将function以及其依赖的模块关系(注意不包括依赖的模块代码)全部序列化成字节码(pyspark用的cloudpickle项目完成序列化),通过网络传输到executor再反序列化,此时会去动态import依赖的module,所以py-files就是要保证代码在executor可以查找到的地方,这和java把jar包分发出去是一个道理。

编译tensorflow-hadoop-1.10.0.jar

根据tensorflow on spark安装指导,我们需要从tensorflow官方子项目编译得到这个jar包,用到的项目是:https://github.com/tensorflow/ecosystem/tree/master/hadoop

步骤是:

分析src/gen_tfrecords.py代码

首先是入口文件:

ArgumentParser支持了一些传参,主要是指定tfrecords文件写到HDFS什么路径下,以及切分成几份(训练”数据并行”目的)。

创建spark session,用pandas把csv加载到内存,传给自己写的dataframe_to_tfrecords函数进行处理。

处理数据的逻辑就是逐行的把样本转成tf.train.Example对象,然后序列化成二进制,最终利用jar包写入到HDFS中:

to_example函数可以将一行样本将变为一个Example对象并序列化,不了解需要看一下文章顶部的前一篇博客。

我们将pandas的行数组丢给spark的parallelize变为RDD,同时指定分片num_partitions个,每个分片最终会落地为一个HDFS文件,就达到了训练集拆分文件的目的。

RDD经过to_example分布式计算转换后就可以保存到HDFS目录中去了。

正式执行submit_gen.sh

注意submit_gen.sh中,我们采用了:

spark-submit \ –master yarn \ –deploy-mode client \

为了方便调试我们让AM运行在client本地(–deploy-mode client),那么因为–conf spark.pyspark.python=./Python/bin/python3 的影响,它就会在本地找这个python,然而我们本意是让任务跑到yarn容器里并从HDFS拉python环境下来。

为了能够使用client模式,我们把conda Python软链到当前目录下即可:

然后运行命令:

sh submit_gen.sh

查看HDFS中的tfrecords文件:

train和test两个csv各自被转成了tfrecords格式,并且都切成了5份。

步骤3:分布式训练

现在就需要用到tfos这个python包来实现spark分布式训练了。

观察submit_train.sh

该脚本提交训练任务:

Python.zip和src.zip仍旧要分发,spark.pyspark.python配置当然也不变。

多了几个–conf非常关键:

  • –conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM}:是tensorflowonspark要求指定libjvm.so,根据自己的jvm安装目录做一下指向即可,没有它无法运行成功。
  • –conf spark.dynamicAllocation.enabled=false:禁止动态分配executor数量,默认spark有一个行为就是如果集群有闲置资源的话可能会创建比–num-executors更多的容器,这个对我们来说不可接受,因为tensorflow分布式训练的节点数量必须固定(还记得TF_CONFIG吗)。
  • –conf spark.yarn.maxAppAttempts=1:限制该任务只能尝试submit一次,应该没啥用。
  • –executor-cores 2并且–conf spark.task.cpus=2:tfos要求一个executor中只能跑1个task,我们可以令executor-core为2核同时指定单个task也用2核,这样就可以让1个executor中只跑1个task(spark的executor与task关系参考:https://blog.csdn.net/xuejianbest/article/details/85994504

后续就是程序主入口src/train.py,以及它需要的各种应用层参数,都是我们自定义的东西,这里一定要注意worker_size和spark的–num-executors要一致,启动几个executor就对应几个tensorflow worker节点,还记得tensorflowonspark原理吗?忘记请回看上面内容。

另外注意:

–model_dir hdfs:///tf2-onspark/model \
–tensorboard_dir hdfs:///tf2-onspark/tensorboard

这2个hdfs输出目录只有cheif节点会写入(tensorflow on spark会指定其中1个executor作为cheif身份),而其他worker节点则只会将model和tensorflow保存到yarn容器本地盘,随着任务结束就释放了,还记得tensorflowonspark原理吗?忘记请回看上面内容。

分析src/train.py

该文件为训练主入口代码:

和直接使用tensorflow分布式训练框架的区别在于,我们是运行在spark环境下的。

准备好SparkContext后,把它交给tfos的TFCluster.run函数,传入我们的训练函数main_fun,该函数会被pyspark AM序列化后传到所有executor上执行。

args参数其实可以传任意东西,会被回调给main_fun,我们把命令行参数传入使用。

我们需要告诉tfos我们要启动几个worker的训练集群,所以args.worker_size是我们的第4个传参。

num_ps永远传0,因为我们不是采用的PS分布式训练架构,而是tensorflow的同步训练策略MultiWorkerMirroredStrategy(本文开始介绍过它的工作原理)。

tensorboard控制tfos是否给我们开启tensorboard的web界面,我们在集群里训练不需要它帮我们开启,后续我们直接用tensorboard命令行打开hdfs上的hdfs:///tf2-onspark/tensorboard即可显示界面。

input_mode=TFCluster.InputMode.TENSORFLOW是说我们自己给模型喂HDFS上的tfreocrds训练数据,其实tfos支持另外一种直接将RDD喂给tensorflow模型训练的方式,这种方式不太offical,所以我们压根不用去了解。

最后master_node=’cheif’是让tfos给某一个executor下发 TF_CONFIG时候指定其task type叫做cheif,而其余executor则叫做worker,这样我们的main_fun就可以通过第二个参数ctx的ctx.job_name获取到这个名字,决定当前训练节点是将model和tensorboard写入到HDFS还是写入到容器的临时目录。

根据对上述参数的了解,你会发现main_fun里根据是否为cheif身份,对tensorboard和save目录做了区分,cheif写入HDFS而非cheif写入本地被丢弃即可。

tensorflow分布式训练框架用法不变,在分布式策略的scope下进行keras模型的网络结构建设并compile,最后fit并且save模型即可,这里build_model和单机一模一样,大家简单一看即可:

 

这里需要注意一个关键:

dataset = dataset.batch(args.batch_size * args.worker_size).shuffle(args.shuffle_size)

分布式训练要求:如果我们希望单个worker采用batch_size训练,那么就得让dataset采用batch_size*worker_size的大小,其中worker_size是计算节点个数。

这个牵扯到”数据并行”的底层实现,我们只需要按tensorflow要求做即可。

至于从HDFS上加载若干tfrecords文件得到Dataset对象,这个和单机版没有任何区别,只是文件schema从file://变成了HDFS,因为tensorflow默认支持HDFS访问:

我们传入的实际是一个通配符字符串:hdfs:///tf2-onspark/train/part-*,那么经过tf Dataset的list_files可以从HDFS上枚举出所有训练文件,经过interleave函数可以将每个文件名转换成一个子Dataset,每个子Dataset需要经过parse_func反序列化tfrecord记录为Example对象,并返回样本元祖(x, y),其中x是特征,y是标签。

interleave通过num_parallel_calls传参可以开启多线程数据预加载等特性,可以具体看一下文档。

总之,这样的父子结构dataset交给分布式scope下compile的模型后,在fit训练中会自动在多个worker之间均衡数据(文章开始讲过”数据划分”策略),这样就算有很大的数据集也可以通过增加更多的worker来加快训练速度。

正式执行submit_gen.sh

现在我们开始进行分布式训练:

sh submit_train.sh

然后我遇到了一个错误:

Traceback (most recent call last):
File “/data/workspace/users/liangdong/tf2-onspark/src/train.py”, line 4, in <module>
from tensorflowonspark import TFCluster
File “/data/workspace/users/liangdong/tf2-onspark/Python/lib/python3.8/site-packages/tensorflowonspark/TFCluster.py”, line 35, in <module>
from . import TFSparkNode
File “/data/workspace/users/liangdong/tf2-onspark/Python/lib/python3.8/site-packages/tensorflowonspark/TFSparkNode.py”, line 23, in <module>
from packaging import version
ModuleNotFoundError: No module named ‘packaging’

看样是tensorflowonspark依赖了一个packaing模块,可能它做pip包的时候没有写好依赖,导致我们必须手动安装一下了。

我们必须回头给conda tf环境安装一下这个包:

pip install packaging -i https://pypi.tuna.tsinghua.edu.cn/simple

然后重新把python打包上传一下HDFS,在此不做演示。

启动后可以看出tfos的原理:

训练完成后,HDFS上的model和tensorboard文件:

这些都是cheif身份的节点写入的,cheif是worker中的幸运者,被我们选中采用它的训练结果和训练过程,其实它和普通worker没啥区别,我们选哪个worker作为cheif得到的结果都差不多,实际我们也不需要操心到底cheif是哪个,让tfos随便给我们指定好即可,我们根据ctx.job_name做不同动作即可。

如果训练中想实时观察tensorboard,那么直接命令行启动tensorboard指向hdfs即可:

(tf) [hadoop@10 tf2-onspark]$ tensorboard –logdir=hdfs:///tf2-onspark/model/

最后:分布式predict(inference)

得到模型之后,如果你有需求对大规模数据进行预测,那么把model加载到tf-serving然后通过网络调用来预测不是一个好选择。

更好的方法是直接用tfos启动spark,用多个worker加载model,对HDFS数据集进行sharding后直接全内存运算打分,最后结果写回HDFS即可。

这个过程非常简单,大家自己看看tfos的demo即可:https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/keras/mnist_inference.py

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~