tensorflow分布式训练 — tensorflow on spark使用方法
tensorflow(2.x版本)生产训练需要在大规模训练样本下完成,单机已经无法满足训练速度。
tensorflow on spark是yahoo开源的基于spark进行分布式tensorflow训练的开发框架,本文要求读者熟练tensorflow单机使用,最好读一下前一篇博客:《tensorflow2.0 – 端到端的wide&deep模型训练》。
tensorflow自带分布式训练框架
tensorflow官方自带了分布式训练API,但是有一些小缺点:
1、要求我们手动部署训练代码到多台服务器,并且为每个代码配置环境变量:
1 2 3 4 5 6 |
os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "worker": ["host1:port", "host2:port", "host3:port"] }, "task": {"type": "worker", "index": 1} }) |
- cluster告知tensorflow进程集群中有哪些节点
- task告知自己是上述哪个节点,身份是什么?(worker还是cheif)
2、整个训练样本需要手动分发到所有服务器上,tensorflow会自行协调令每个计算节点只训练属于自己的那份数据(虽然他们都有完整的训练集),从而让集群快速训练掉所有的样本。
3、训练过程中如果有某个节点宕机,则训练失败。
tensorflow分布式训练的默认策略MultiWorkerMirroredStrategy是:
- 所有worker的模型参数都是各自随机初始化的。
- 所有worker的模型基于各自不重叠的部分训练集分别训练,这叫做”数据并行”。
- 每一轮所有worker各自训练1个batch,基于各自loss求得各自的梯度,经过互相通讯获知所有worker的本轮梯度,求平均梯度用作梯度下降。
- 重复训练,直到所有worker将整个训练集耗尽。
总结一下,与单机版的不同之处在于:
- 数据集被所有worker”瓜分”训练。
- 所有worker是统一行动的,必须等1个batch完成后,大家再一起进入下一个batch。
那么,到底哪个worker训练的模型是最终的模型呢?
道理很简单,任意一个worker都可以!
因为每一个batch训练后,每个worker都会基于集群所有worker的梯度来调整自己本地的模型参数,所以理论上充分训练后每个worker的模型都是最优的。
所以,我们手动指定某个节点为cheif身份(也就是master),只取cheif节点导出的model文件即可,包括tensorboard采样数据我们也只需要看cheif节点的即可。
关于上述分布式训练原理,大家可以读一下官方文档:https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#top_of_page。
简单看了一下,每一轮batch训练完成后,tensorflow采用了一种环形广播的机制来让每个worker知道其他worker的本轮梯度,所以不存在单点性能瓶颈:
tensorflow on spark
因为官方方案的上述缺点,yahoo设计了tensorflow on spark框架,下面简称tfos。
原理
它利用spark编程API的方式拉起多个worker到yarn集群中运行,以此解决了计算资源分配问题。
对于每个worker容器,tfos会准备好TF_CONFIG环境变量并唤起我们编写的tensorflow训练函数(python),此后的工作其实还是tensorflow官方分布式训练框架的逻辑,与tfos就没什么关系了。
tfos提供了python库,我们只需要把现有的训练代码封装到一个函数里,然后交给tfos来分发到spark集群中进行部署即可开始训练。
准备工作
首先我们得了解pyspark,因为我们的tensorflow代码和tfos代码都是python的,所以spark程序必须用pyspark开发而不是用java开发。
其次,我们必须提前将训练样本按tfrecords格式写入到HDFS中,这样后续训练时worker可以直接加载HDFS中的训练文件进行训练。tensorflow分布式训练框架能够智能协调”数据并行策略”,如果我们将样本打散成N个HDFS文件,并且指定拉起M个worker,那么:
- 如果N>=M,那么每个文件将被某个worker独占,这就是”文件级”的并行。
- 如果N<M,那么就会有多个worker共享同1个文件,但是消费不同的offset,这就是”记录级”的并行。
从生产环境来讲,我们应该是开发(py)spark程序直接读hive表,将其map转化为tfrecords格式写入到hdfs中,不过就本篇博客来说我们将把CSV训练文件加载到spark中而不是从hive加载,区别仅限于此。
代码
首先是如何正确使用pyspark,我准备了一个说明项目:https://github.com/owenliang/pyspark-demo,通过它你可以学会:
- 为不同的项目隔离python环境
- 使用pyspark编程并且提交到yarn集群
其次是tensorflow on spark的说明项目:https://github.com/owenliang/tf2-onspark,你将学会:
- 如何用spark把训练样本转化为tfrecords格式并写入HDFS。
- 如何用tfos完成分布式训练,最终得到model和tensorboard。
步骤1:准备python环境
下面演示整个过程,所有操作发生在hadoop客户机。
安装miniconda
主流python多环境管理工具,自行安装:https://docs.conda.io/projects/conda/en/latest/user-guide/install/。
我们用conda生成独立的python环境,压缩上传到HDFS,后续pyspark提交任务时可以指定spark executor从HDFS下载Python环境使用,这样就可以实现多项目python环境隔离。
创建tensorflow环境
1 2 3 4 |
1,创建名为tf的python3.8环境 conda create -n tf python=3.8 2,切换到该python环境 conda activate tf |
安装pip依赖
正常来说是把依赖写在项目下的requirements.txt里,这里直接pip install -r 即可,就像我提供的项目一样:https://github.com/owenliang/tf2-onspark/blob/main/requirements.txt,为了演示我手动安装一遍。
因为程序基于tensorflow、pandas(为了加载csv)、pyspark、tensorflow on spark开发,所以安装一下它们:
1 2 3 4 |
pip install tensorflow -i https://pypi.tuna.tsinghua.edu.cn/simple pip install pandas -i https://pypi.tuna.tsinghua.edu.cn/simple pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple pip install tensorflowonspark -i https://pypi.tuna.tsinghua.edu.cn/simple |
打包python到HDFS
先找到该python的目录:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
(tf) [hadoop@10 liangdong]$ which python ~/workspace/service/anaconda3/envs/tf/bin/python (tf) [hadoop@10 liangdong]$ cd ~/workspace/service/anaconda3/envs/tf (tf) [hadoop@10 tf]$ ll total 20 drwxrwxr-x 3 hadoop hadoop 4096 Jan 5 16:11 bin drwxrwxr-x 2 hadoop hadoop 30 Jan 5 16:05 compiler_compat drwxrwxr-x 2 hadoop hadoop 4096 Jan 5 16:05 conda-meta drwxrwxr-x 9 hadoop hadoop 4096 Jan 5 16:05 include drwxrwxr-x 3 hadoop hadoop 25 Jan 5 16:05 info drwxrwxr-x 15 hadoop hadoop 4096 Jan 5 16:05 lib drwxrwxr-x 11 hadoop hadoop 128 Jan 5 16:11 share drwxrwxr-x 3 hadoop hadoop 146 Jan 5 16:05 ssl drwxrwxr-x 3 hadoop hadoop 21 Jan 5 16:05 x86_64-conda_cos6-linux-gnu |
将所有文件打包:
1 |
zip -r Python.zip * |
将Python.zip上传 HDFS待用:
1 2 |
hdfs dfs -mkdir /tf2-onspark hdfs dfs -put Python.zip /tf2-onspark |
步骤2:准备tfrecords
克隆我的示例项目:
git clone https://github.com/owenliang/tf2-onspark
观察csv训练集
data下面是csv格式的泰坦尼克数据集:
(tf) [hadoop@10 tf2-onspark]$ ll data/
total 88
-rw-rw-r– 1 hadoop hadoop 28629 Jan 5 16:23 test.csv
-rw-rw-r– 1 hadoop hadoop 60302 Jan 5 16:23 train.csv
长相如下:
我们的目标是把这个csv文件转成tfrecords格式写到HDFS上,用作训练。
观察spark任务提交命令
我们要编写一段pyspark程序,让它在AM(spark的application master)上把CSV用pandas加载到内存里,将其按行记录拆分后转化成 RDD,然后对该RDD中的每一行执行map序列化为tfrecord格式的记录,最后利用tensorflow官方提供的org.tensorflow.hadoop.io.TFRecordFileOutputFormat输出格式写入到HDFS文件中。
提交命令见submit_gen.sh:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#!/bin/bash cd src && zip -r src.zip * && mv src.zip .. && cd - cd data && zip -r data.zip * && mv data.zip .. && cd - TRAIN_NUM_PARTITIONS=5 TEST_NUM_PARTITIONS=5 TRAIN_OUTPUT=/tf2-onspark/train TEST_OUTPUT=/tf2-onspark/test # 删除旧输出 hdfs dfs -rm -r ${TRAIN_OUTPUT} hdfs dfs -rm -r ${TEST_OUTPUT} # --master yarn :运行到yarn集群,固定写法 # --deploy-mode cluster:AM运行到yarn中,如果改成client则需要确保本地目录有./Python/bin/python3 # --num-executors 1:一个executor容器 # --archives hdfs:///Python.zip#Python:从hdfs集群下载/Python.zip到executor工作目录,并解压到Python目录 # --py-files ./src.zip:项目python源代码,会解压到executor的某目录下并令PYTHONPATH指向该目录 # --conf spark.pyspark.python=./Python/bin/python3:指定使用自行上传的Python spark-submit \ --master yarn \ --deploy-mode client \ --num-executors 1 \ --executor-memory 1G \ --archives hdfs:///tf2-onspark/Python.zip#Python,data.zip#data \ --py-files ./src.zip \ --conf spark.pyspark.python=./Python/bin/python3 \ --jars hdfs:///tf2-onspark/tensorflow-hadoop-1.10.0.jar \ src/gen_tfrecords.py \ --train_csv ./data/train.csv \ --test_csv ./data/test.csv \ --train_num_partitions ${TRAIN_NUM_PARTITIONS} \ --test_num_partitions ${TEST_NUM_PARTITIONS} \ --train_output ${TRAIN_OUTPUT} \ --test_output ${TEST_OUTPUT} |
我们实际提交任务的入口文件是src/gen_tfrecords.py,它支持–train_csv等一系列传参,这属于应用层。
更关键的是理解spark和pyspark,下面说一下。
–archives指定了要通过HDFS分发到计算节点的文件包括:
- Python.zip:令spark executor直接从HDFS下载,解压到yarn容器内的当前工作目录的Python文件夹,后面–conf spark.pyspark.python可以控制pyspark使用该Python执行代码。
- data.zip:在客户端本地把train.csv和test.csv打包成zip,上传到HDFS临时目录,进而被executor下载并解压到容器内工作目录的data文件夹,我们代码会加载它们用于生成tfrecords。
–jars指定要分发哪些jar包到spark executor内加入到CLASSPATH中:
- hdfs:///tf2-onspark/tensorflow-hadoop-1.10.0.jar:我们最终将tfrecords写入HDFS需要用到这个jar包内的org.tensorflow.hadoop.io.TFRecordFileOutputFormat类。(PS:下面会说明如何编译这个Jar包)
–py-files:
- 进入src目录,把所有源代码打包zip,传给–py-files分发到所有executor,这样pyspark会自动将zip解压到某个目录然后把它加入PYTHONPATH,这样我们的spark程序才能顺利找到import的python模块。
我们写pyspark程序时,可能对某个RDD做一个map操作,传入一个function,其实pyspark框架会将function以及其依赖的模块关系(注意不包括依赖的模块代码)全部序列化成字节码(pyspark用的cloudpickle项目完成序列化),通过网络传输到executor再反序列化,此时会去动态import依赖的module,所以py-files就是要保证代码在executor可以查找到的地方,这和java把jar包分发出去是一个道理。
编译tensorflow-hadoop-1.10.0.jar
根据tensorflow on spark安装指导,我们需要从tensorflow官方子项目编译得到这个jar包,用到的项目是:https://github.com/tensorflow/ecosystem/tree/master/hadoop。
步骤是:
1 2 3 4 5 6 7 |
git clone https://github.com/tensorflow/ecosystem.git cd ecosystem/hadoop # 编译jar(最好配置一下settings.xml走镜像加速,否则太慢) mvn clean package # 上传 hdfs dfs -put target/tensorflow-hadoop-1.10.0.jar /tf2-onspark/ |
分析src/gen_tfrecords.py代码
首先是入口文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from pyspark.sql import SparkSession from dataset import dataframe_to_tfrecords import pandas as pd import argparse parser = argparse.ArgumentParser() parser.add_argument('--train_csv', help='train csv文件名') parser.add_argument('--test_csv', help='test csv文件名') parser.add_argument('--train_num_partitions', help='train tfrecords文件分片个数', type=int) parser.add_argument('--test_num_partitions', help='test tfrecords文件分片个数', type=int) parser.add_argument('--train_output', help='保存train tfrecords的目录') parser.add_argument('--test_output', help='保存test tfrecords的目录') args = parser.parse_args() sess = SparkSession.builder.appName('gen_tfrecords').enableHiveSupport().getOrCreate() dataframe_to_tfrecords(sess, pd.read_csv(args.train_csv), args.train_num_partitions, args.train_output, include_outputs=True) dataframe_to_tfrecords(sess, pd.read_csv(args.test_csv), args.test_num_partitions, args.test_output, include_outputs=False) |
ArgumentParser支持了一些传参,主要是指定tfrecords文件写到HDFS什么路径下,以及切分成几份(训练”数据并行”目的)。
创建spark session,用pandas把csv加载到内存,传给自己写的dataframe_to_tfrecords函数进行处理。
处理数据的逻辑就是逐行的把样本转成tf.train.Example对象,然后序列化成二进制,最终利用jar包写入到HDFS中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
def dataframe_to_tfrecords(sess, df, num_partitions, output_dir, include_outputs=False): all_feature_spec = deepcopy(model_spec['inputs']) if include_outputs: all_feature_spec.update(model_spec['outputs']) rows = [] for _, row in df.iterrows(): rows.append(row) def to_example(row): feature_dict = {} for feature_name, feature_spec in all_feature_spec.items(): feature_value = row[feature_name] if feature_value is None or pd.isna(feature_value): feature_value = feature_spec['default'] if feature_spec['type'] == 'int': feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[int(feature_value)])) elif feature_spec['type'] == 'float': feature = tf.train.Feature(float_list=tf.train.FloatList(value=[float(feature_value)])) elif feature_spec['type'] == 'str': feature = tf.train.Feature( bytes_list=tf.train.BytesList(value=[str(feature_value).encode('utf-8')])) feature_dict[feature_name] = feature example = tf.train.Example(features=tf.train.Features(feature=feature_dict)) return example.SerializeToString(), None train_rdd = sess.sparkContext.parallelize(rows, num_partitions).map(to_example) train_rdd.saveAsNewAPIHadoopFile(output_dir, 'org.tensorflow.hadoop.io.TFRecordFileOutputFormat', keyClass="org.apache.hadoop.io.BytesWritable", valueClass="org.apache.hadoop.io.NullWritable") |
to_example函数可以将一行样本将变为一个Example对象并序列化,不了解需要看一下文章顶部的前一篇博客。
我们将pandas的行数组丢给spark的parallelize变为RDD,同时指定分片num_partitions个,每个分片最终会落地为一个HDFS文件,就达到了训练集拆分文件的目的。
RDD经过to_example分布式计算转换后就可以保存到HDFS目录中去了。
正式执行submit_gen.sh
注意submit_gen.sh中,我们采用了:
spark-submit \ –master yarn \ –deploy-mode client \
为了方便调试我们让AM运行在client本地(–deploy-mode client),那么因为–conf spark.pyspark.python=./Python/bin/python3 的影响,它就会在本地找这个python,然而我们本意是让任务跑到yarn容器里并从HDFS拉python环境下来。
为了能够使用client模式,我们把conda Python软链到当前目录下即可:
1 2 3 4 5 6 7 8 9 10 11 12 |
(tf) [hadoop@10 tf2-onspark]$ ln -s ~/workspace/service/anaconda3/envs/tf ./Python (tf) [hadoop@10 tf2-onspark]$ ll total 60 drwxrwxr-x 2 hadoop hadoop 39 Jan 5 17:25 data -rw-rw-r-- 1 hadoop hadoop 33204 Jan 5 17:25 data.zip lrwxrwxrwx 1 hadoop hadoop 48 Jan 5 17:29 Python -> /home/hadoop/workspace/service/anaconda3/envs/tf -rw-rw-r-- 1 hadoop hadoop 55 Jan 5 16:23 README.md -rw-rw-r-- 1 hadoop hadoop 751 Jan 5 16:23 requirements.txt drwxrwxr-x 2 hadoop hadoop 97 Jan 5 17:25 src -rw-rw-r-- 1 hadoop hadoop 4437 Jan 5 17:25 src.zip -rw-rw-r-- 1 hadoop hadoop 1414 Jan 5 16:23 submit_gen.sh -rw-rw-r-- 1 hadoop hadoop 1500 Jan 5 16:23 submit_train.sh |
然后运行命令:
sh submit_gen.sh
查看HDFS中的tfrecords文件:
train和test两个csv各自被转成了tfrecords格式,并且都切成了5份。
步骤3:分布式训练
现在就需要用到tfos这个python包来实现spark分布式训练了。
观察submit_train.sh
该脚本提交训练任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#!/bin/bash cd src && zip -r src.zip * && mv src.zip .. && cd - LIB_JVM=/usr/local/jdk/jre/lib/amd64/server/ # --master yarn :运行到yarn集群,固定写法 # --deploy-mode cluster:AM运行到yarn中,如果改成client则需要确保本地目录有./Python/bin/python3 # --num-executors 1:一个executor容器 # --archives hdfs:///Python.zip#Python:从hdfs集群下载/Python.zip到executor工作目录,并解压到Python目录 # --py-files ./src.zip:项目python源代码,会解压到executor的某目录下并令PYTHONPATH指向该目录 # --conf spark.pyspark.python=./Python/bin/python3:指定使用自行上传的Python # --conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM}:依赖libjvm.so # --conf spark.dynamicAllocation.enabled=false 禁止spark自动扩容executor数量 # --conf spark.yarn.maxAppAttempts=1 失败重试1次 spark-submit \ --master yarn \ --deploy-mode client \ --num-executors 5 \ --executor-cores 2 \ --conf spark.task.cpus=2 \ --executor-memory 8G \ --archives hdfs:///tf2-onspark/Python.zip#Python \ --py-files ./src.zip \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM} \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.yarn.maxAppAttempts=1 \ src/train.py \ --batch_size 32 \ --shuffle_size 32 \ --worker_size 5 \ --epochs 1000 \ --train_dir hdfs:///tf2-onspark/train \ --model_dir hdfs:///tf2-onspark/model \ --tensorboard_dir hdfs:///tf2-onspark/tensorboard |
Python.zip和src.zip仍旧要分发,spark.pyspark.python配置当然也不变。
多了几个–conf非常关键:
- –conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM}:是tensorflowonspark要求指定libjvm.so,根据自己的jvm安装目录做一下指向即可,没有它无法运行成功。
- –conf spark.dynamicAllocation.enabled=false:禁止动态分配executor数量,默认spark有一个行为就是如果集群有闲置资源的话可能会创建比–num-executors更多的容器,这个对我们来说不可接受,因为tensorflow分布式训练的节点数量必须固定(还记得TF_CONFIG吗)。
- –conf spark.yarn.maxAppAttempts=1:限制该任务只能尝试submit一次,应该没啥用。
- –executor-cores 2并且–conf spark.task.cpus=2:tfos要求一个executor中只能跑1个task,我们可以令executor-core为2核同时指定单个task也用2核,这样就可以让1个executor中只跑1个task(spark的executor与task关系参考:https://blog.csdn.net/xuejianbest/article/details/85994504)
后续就是程序主入口src/train.py,以及它需要的各种应用层参数,都是我们自定义的东西,这里一定要注意worker_size和spark的–num-executors要一致,启动几个executor就对应几个tensorflow worker节点,还记得tensorflowonspark原理吗?忘记请回看上面内容。
另外注意:
–model_dir hdfs:///tf2-onspark/model \
–tensorboard_dir hdfs:///tf2-onspark/tensorboard
这2个hdfs输出目录只有cheif节点会写入(tensorflow on spark会指定其中1个executor作为cheif身份),而其他worker节点则只会将model和tensorflow保存到yarn容器本地盘,随着任务结束就释放了,还记得tensorflowonspark原理吗?忘记请回看上面内容。
分析src/train.py
该文件为训练主入口代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
import argparse from pyspark.context import SparkContext from pyspark.conf import SparkConf from tensorflowonspark import TFCluster import tensorflow as tf from model import build_model from dataset import dataset_from_tfrecords import time MODEL_VERSION = 0 def main_fun(args, ctx): strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): wide_deep_model = build_model() dataset = dataset_from_tfrecords(args.train_dir + '/part*', include_outputs=True) dataset = dataset.batch(args.batch_size * args.worker_size).shuffle(args.shuffle_size) tensorboard_dir = args.tensorboard_dir if ctx.job_name == 'chief' else './tensorboard' tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='{}/{}'.format(tensorboard_dir, MODEL_VERSION), histogram_freq=1) wide_deep_model.fit(dataset, epochs=args.epochs, callbacks=[tensorboard_callback]) model_dir = args.model_dir if ctx.job_name == 'chief' else './model' wide_deep_model.save('{}/{}'.format(model_dir, MODEL_VERSION), save_format='tf', include_optimizer=False) parser = argparse.ArgumentParser() parser.add_argument("--batch_size", help="number of records per batch", type=int) parser.add_argument("--shuffle_size", help="size of shuffle buffer", type=int) parser.add_argument("--worker_size", help="number of nodes in the cluster", type=int) parser.add_argument("--epochs", help="number of epochs", type=int) parser.add_argument("--train_dir", help="HDFS path to training tfrecords files in parallelized format") parser.add_argument("--model_dir", help="hdfs path to save model") parser.add_argument("--tensorboard_dir", help="hdfs path to tensorboard logs") args = parser.parse_args() print("args:", args) MODEL_VERSION = int(time.time()) conf = SparkConf().setAppName("tf2-onspark-training") sc = SparkContext(conf=conf) cluster = TFCluster.run(sc, main_fun, args, args.worker_size, num_ps=0, tensorboard=False, input_mode=TFCluster.InputMode.TENSORFLOW, master_node='chief') cluster.shutdown() print('model version={}'.format(MODEL_VERSION)) |
和直接使用tensorflow分布式训练框架的区别在于,我们是运行在spark环境下的。
准备好SparkContext后,把它交给tfos的TFCluster.run函数,传入我们的训练函数main_fun,该函数会被pyspark AM序列化后传到所有executor上执行。
args参数其实可以传任意东西,会被回调给main_fun,我们把命令行参数传入使用。
我们需要告诉tfos我们要启动几个worker的训练集群,所以args.worker_size是我们的第4个传参。
num_ps永远传0,因为我们不是采用的PS分布式训练架构,而是tensorflow的同步训练策略MultiWorkerMirroredStrategy(本文开始介绍过它的工作原理)。
tensorboard控制tfos是否给我们开启tensorboard的web界面,我们在集群里训练不需要它帮我们开启,后续我们直接用tensorboard命令行打开hdfs上的hdfs:///tf2-onspark/tensorboard即可显示界面。
input_mode=TFCluster.InputMode.TENSORFLOW是说我们自己给模型喂HDFS上的tfreocrds训练数据,其实tfos支持另外一种直接将RDD喂给tensorflow模型训练的方式,这种方式不太offical,所以我们压根不用去了解。
最后master_node=’cheif’是让tfos给某一个executor下发 TF_CONFIG时候指定其task type叫做cheif,而其余executor则叫做worker,这样我们的main_fun就可以通过第二个参数ctx的ctx.job_name获取到这个名字,决定当前训练节点是将model和tensorboard写入到HDFS还是写入到容器的临时目录。
根据对上述参数的了解,你会发现main_fun里根据是否为cheif身份,对tensorboard和save目录做了区分,cheif写入HDFS而非cheif写入本地被丢弃即可。
tensorflow分布式训练框架用法不变,在分布式策略的scope下进行keras模型的网络结构建设并compile,最后fit并且save模型即可,这里build_model和单机一模一样,大家简单一看即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
def build_model(): linear_features, dnn_features = _build_feature_columns() # wide linear_input_layer = _build_input_layer() linear_feature_layer = tf.keras.layers.DenseFeatures(linear_features) linear_dense_layer1 = tf.keras.layers.Dense(units=1) output = linear_feature_layer(linear_input_layer) output = linear_dense_layer1(output) linear_model = tf.keras.Model(inputs=list(linear_input_layer.values()), outputs=[output]) linear_optimizer = tf.keras.optimizers.Ftrl(l1_regularization_strength=0.5) # deep dnn_feature_layer = tf.keras.layers.DenseFeatures(dnn_features) dnn_norm_layer = tf.keras.layers.BatchNormalization() # important for deep dnn_dense_layer1 = tf.keras.layers.Dense(units=128, activation='relu') dnn_dense_layer2 = tf.keras.layers.Dense(units=1) dnn_input_layer = _build_input_layer() output = dnn_feature_layer(dnn_input_layer) output = dnn_norm_layer(output) # this will break the tensorboard graph because of unfixed bug output = dnn_dense_layer1(output) output = dnn_dense_layer2(output) dnn_model = tf.keras.Model(inputs=list(dnn_input_layer.values()), outputs=[output]) dnn_optimizer = tf.keras.optimizers.Adagrad() # wide&deep wide_deep_model = tf.keras.experimental.WideDeepModel(linear_model, dnn_model, activation='sigmoid') wide_deep_model.compile(optimizer=[linear_optimizer,dnn_optimizer], loss=tf.keras.losses.BinaryCrossentropy(), metrics=tf.keras.metrics.BinaryAccuracy()) return wide_deep_model |
这里需要注意一个关键:
dataset = dataset.batch(args.batch_size * args.worker_size).shuffle(args.shuffle_size)
分布式训练要求:如果我们希望单个worker采用batch_size训练,那么就得让dataset采用batch_size*worker_size的大小,其中worker_size是计算节点个数。
这个牵扯到”数据并行”的底层实现,我们只需要按tensorflow要求做即可。
至于从HDFS上加载若干tfrecords文件得到Dataset对象,这个和单机版没有任何区别,只是文件schema从file://变成了HDFS,因为tensorflow默认支持HDFS访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
def dataset_from_tfrecords(tfrecords_pattern, include_outputs=False): all_feature_spec = deepcopy(model_spec['inputs']) if include_outputs: all_feature_spec.update(model_spec['outputs']) feature_dict = {} for feature_name, feature_spec in all_feature_spec.items(): if feature_spec['type'] == 'int': feature = tf.io.FixedLenFeature((), tf.int64) elif feature_spec['type'] == 'float': feature = tf.io.FixedLenFeature((), tf.float32) elif feature_spec['type'] == 'str': feature = tf.io.FixedLenFeature((), tf.string) feature_dict[feature_name] = feature def parse_func(s): inputs = tf.io.parse_single_example(s, feature_dict) outputs = [] if include_outputs: for output_name in model_spec['outputs']: outputs.append(inputs[output_name]) inputs.pop(output_name) if include_outputs: return inputs, outputs return inputs dataset = tf.data.Dataset.list_files(tfrecords_pattern).interleave( lambda filename: tf.data.TFRecordDataset(filename).map(parse_func), num_parallel_calls=tf.data.AUTOTUNE, ) return dataset |
我们传入的实际是一个通配符字符串:hdfs:///tf2-onspark/train/part-*,那么经过tf Dataset的list_files可以从HDFS上枚举出所有训练文件,经过interleave函数可以将每个文件名转换成一个子Dataset,每个子Dataset需要经过parse_func反序列化tfrecord记录为Example对象,并返回样本元祖(x, y),其中x是特征,y是标签。
interleave通过num_parallel_calls传参可以开启多线程数据预加载等特性,可以具体看一下文档。
总之,这样的父子结构dataset交给分布式scope下compile的模型后,在fit训练中会自动在多个worker之间均衡数据(文章开始讲过”数据划分”策略),这样就算有很大的数据集也可以通过增加更多的worker来加快训练速度。
正式执行submit_gen.sh
现在我们开始进行分布式训练:
sh submit_train.sh
然后我遇到了一个错误:
Traceback (most recent call last):
File “/data/workspace/users/liangdong/tf2-onspark/src/train.py”, line 4, in <module>
from tensorflowonspark import TFCluster
File “/data/workspace/users/liangdong/tf2-onspark/Python/lib/python3.8/site-packages/tensorflowonspark/TFCluster.py”, line 35, in <module>
from . import TFSparkNode
File “/data/workspace/users/liangdong/tf2-onspark/Python/lib/python3.8/site-packages/tensorflowonspark/TFSparkNode.py”, line 23, in <module>
from packaging import version
ModuleNotFoundError: No module named ‘packaging’
看样是tensorflowonspark依赖了一个packaing模块,可能它做pip包的时候没有写好依赖,导致我们必须手动安装一下了。
我们必须回头给conda tf环境安装一下这个包:
pip install packaging -i https://pypi.tuna.tsinghua.edu.cn/simple
然后重新把python打包上传一下HDFS,在此不做演示。
启动后可以看出tfos的原理:
1 2 3 4 5 6 7 8 9 |
2021-01-05 18:44:44,232 INFO (MainThread-296579) All TFSparkNodes started 2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 1, 'host': '10.45.6.105', 'job_name': 'worker', 'task_index': 0, 'port': 34737, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-8hh75zfg/listener-8s6tckcf', 'authkey': b'\x0b\x9a`\x9c\xaauL|\xaf\xdf\x99\xa9\xf6\xb62d'} 2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 2, 'host': '10.45.8.219', 'job_name': 'worker', 'task_index': 1, 'port': 37954, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-8u4sx686/listener-iokneywe', 'authkey': b'W\xdb\x9e\xbe\xa4\xf3O\xf1\x98\x91u"\xd2\x80\xb1b'} 2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 0, 'host': '10.45.7.163', 'job_name': 'chief', 'task_index': 0, 'port': 41245, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-rrj7a0ix/listener-zd7o8yhg', 'authkey': b'\x7f\xde\xa3l\x89\x93O\x1b\xa8\x92.z!\xddd\xe6'} 2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 3, 'host': '10.45.7.163', 'job_name': 'worker', 'task_index': 2, 'port': 42065, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-egfzxbbk/listener-ofybltd4', 'authkey': b'}\xf6\xd53r\x19M-\xa5\r\x92B\xc7\xfdi\x06'} 2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 4, 'host': '10.45.8.219', 'job_name': 'worker', 'task_index': 3, 'port': 41258, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-tq93wnd3/listener-si44r__v', 'authkey': b'\xd7^\x82q8KA/\x83\x89\n\xd7g[N\x18'} 2021-01-05 18:44:44,232 INFO (MainThread-296579) Waiting for TensorFlow nodes to complete... 2021-01-05 18:48:44,690 INFO (MainThread-296579) Shutting down cluster model version=1609843466 |
训练完成后,HDFS上的model和tensorboard文件:
1 2 3 4 5 6 7 8 9 |
(tf) [hadoop@10 tf2-onspark]$ hdfs dfs -ls /tf2-onspark/model/1609843466 Found 3 items drwxr-xr-x - hadoop supergroup 0 2021-01-05 18:48 /tf2-onspark/model/1609843466/assets -rw-r--r-- 3 hadoop supergroup 2849331 2021-01-05 18:48 /tf2-onspark/model/1609843466/saved_model.pb drwxr-xr-x - hadoop supergroup 0 2021-01-05 18:48 /tf2-onspark/model/1609843466/variables (tf) [hadoop@10 tf2-onspark]$ hdfs dfs -ls hdfs dfs -ls /tf2-onspark/tensorboard/1609843466 Found 1 items drwxr-xr-x - hadoop supergroup 0 2021-01-05 18:44 /tf2-onspark/tensorboard/1609843466/train |
这些都是cheif身份的节点写入的,cheif是worker中的幸运者,被我们选中采用它的训练结果和训练过程,其实它和普通worker没啥区别,我们选哪个worker作为cheif得到的结果都差不多,实际我们也不需要操心到底cheif是哪个,让tfos随便给我们指定好即可,我们根据ctx.job_name做不同动作即可。
如果训练中想实时观察tensorboard,那么直接命令行启动tensorboard指向hdfs即可:
(tf) [hadoop@10 tf2-onspark]$ tensorboard –logdir=hdfs:///tf2-onspark/model/
最后:分布式predict(inference)
得到模型之后,如果你有需求对大规模数据进行预测,那么把model加载到tf-serving然后通过网络调用来预测不是一个好选择。
更好的方法是直接用tfos启动spark,用多个worker加载model,对HDFS数据集进行sharding后直接全内存运算打分,最后结果写回HDFS即可。
这个过程非常简单,大家自己看看tfos的demo即可:https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/keras/mnist_inference.py
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

1