个人总结能实现多GPU跑图的方法:
1、使用谷歌框架tf.estimator;
session_config = tf.ConfigProto(device_count={'GPU': 0,'GPU':1,'GPU':2,'GPU':3})
run_config = tf.estimator.RunConfig().replace(session_config=session_config)
estimator = tf.estimator.Estimator(
model_fn=model_fn, model_dir=FLAGS.model_dir, config=run_config, params=params)
2、多机多卡分布式多GPU的Distribute 方式,分布式实现模型的并行计算;
Horovod github_homepage
Horovod示例代码
3、自己实现数据并行再多个GPU上计算。(本文介绍的方式)
数据并行就是采用同一个模型,而多个GPU上分别运行不同batch的数据。
类parallel:实现不同GPU块调用预测方法,传入对应的特征batch
import six
import tensorflow as tfdef _maybe_repeat(x, n):
if isinstance(x, list):
assert len(x) == n
return x
else:
return [x] * n# Data-level parallelism
def data_parallelism( devices, fn,*args, **kwargs):
num_worker = len(devices)
devices = ["gpu:%d" % d for d in devices]
# Replicate args and kwargs
if args:
new_args = [_maybe_repeat(arg, num_worker) for arg in args]
# Transpose
new_args = [list(x) for x in zip(*new_args)]
else:
new_args = [[] for _ in range(num_worker)]new_kwargs = [{} for _ in range(num_worker)]for k, v in six.iteritems(kwargs):
vals = _maybe_repeat(v, num_worker)for i in range(num_worker):
new_kwargs[i][k] = vals[i]fns = _maybe_repeat(fn, num_worker)# Now make the parallel call.
outputs = []for i in range(num_worker):
with tf.variable_scope(tf.get_variable_scope(), reuse=(i != 0)):
with tf.name_scope("parallel_%d" % i):
with tf.device(devices[i]):
outputs.append(fns[i](*new_args[i], **new_kwargs[i])) # 分GPU调用方法return outputs
最终跑VGG预测的代码如下:
import numpy as np
import tensorflow as tfimport vgg16
import data_process.util as util
import utils
import time
import pandas as pd
import os
import parallelbatch_size = 12
vgg_output_dim = 4096
map1 = pd.DataFrame(columns=['img_path', 'index'])
device_list = [0,1,2,3] # GPU 0,1,3
device_num = len(device_list)
batch_size = batch_size * device_numdef batch_generator():
"""构造迭代器,不是很重要,根据自己的业务自己构造"""
i = 0
current_batch = 0
file_index = -1
img_path_list = util.list_all_files("../20200409/") # 得到所有图片名称而已,代码不用看
total_count = len(img_path_list)
nb_batches = len(img_path_list) // batch_size
print(nb_batches)
while True:
X = np.zeros((batch_size, 224, 224, 3), dtype=np.float32)
count_tmp = 0
while count_tmp < batch_size:
file_index += 1
if file_index >= total_count:
break
path = img_path_list[file_index]
if path.split('.')[-1] == 'csv':
continue
img = utils.load_image(path) # 根据路劲读图片数据而已,也不用看
file_name = path.split('/')[-1]try:
img.reshape((224, 224, 3))
X[count_tmp] = img
map1.loc[i] = [file_name, i]
i = i + 1
count_tmp += 1except:
continue
if count_tmp > 0:
current_batch = current_batch + 1
yield X[0:count_tmp, :, :, :], current_batchif count_tmp == 0:
breakdef shard_features(feat, placeholders, predictions):
"""输入特征的分片,分割成每个GPU输入的占位符对应每个GPU的数据,需要根据自己的features结构修改"""
num_shards = len(placeholders)
feed_dict = {}
n = 0
batch = feat.shape[0]# 当前批次大小
shard_size = (batch + num_shards - 1) // num_shardsfor i in range(num_shards):
shard_feat = feat[i * shard_size:(i + 1) * shard_size]if shard_feat.shape[0] != 0:
feed_dict[placeholders[i]] = shard_feat
n = i + 1
else:
breakif isinstance(predictions, (list, tuple)):
predictions = predictions[:n]return predictions, feed_dictdef inference_fn(features):
"""预测方法,构造vgg16模型,传入feature,得到fc7值,4096维度的tensor"""
vgg = vgg16.Vgg16()
with tf.name_scope("content_vgg"):
vgg.build(features)
return vgg.fc7with tf.Graph().as_default():
placeholders = []
for i in range(device_num):
placeholders.append(tf.placeholder("float", [None, 224, 224, 3]))predictions = parallel.data_parallelism(device_list, inference_fn, placeholders)with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
time_start = time.time()
batch = batch_generator()df2 = pd.DataFrame(columns=["vgg{}".format(i) for i in range(vgg_output_dim)])
for l,i in batch:
print("nums0{nums}".format( nums=i))op, feed_dict = shard_features(l, placeholders, predictions)prob = sess.run(op, feed_dict=feed_dict)
# 预测的结果prob是所有GPU的叠加列表
for device in range(device_num):
df1 = pd.DataFrame(prob[device],columns=["vgg{}".format(i) for i in range(vgg_output_dim)])
df2 = pd.concat([df2, df1], axis=0, ignore_index=False)
print(df2.shape)
print(df2.shape)
print(map1)
map1 = map1.reset_index()
df2 = df2.reset_index()
df = pd.concat([map1, df2], axis=1, ignore_index=True)
if not os.path.exists('./results'):
os.mkdir('./results')
df_path = os.path.join('./results', 'map_00.csv')
df.to_csv(df_path, mode='a', header=False)
print("最后结果{result}".format(result=df))
time_end = time.time()
print('time cost', time_end - time_start, 's')
解释都在代码里面,解释都是无力的,自己看代码和注释吧!
【基于数据并行实现多GPU跑模型预测(VGG16示例)】
推荐阅读
- C语言学习|第十一届蓝桥杯省赛 大学B组 C/C++ 第一场
- paddle|动手从头实现LSTM
- pytorch|使用pytorch从头实现多层LSTM
- 推荐系统论文进阶|CTR预估 论文精读(十一)--Deep Interest Evolution Network(DIEN)
- pytorch|YOLOX 阅读笔记
- 前沿论文|论文精读(Neural Architecture Search without Training)
- 联邦学习|【阅读笔记】Towards Efficient and Privacy-preserving Federated Deep Learning
- OpenCV|OpenCV-Python实战(18)——深度学习简介与入门示例
- 深度学习|深度学习笔记总结
- 《繁凡的深度学习笔记》|一文绝对让你完全弄懂信息熵、相对熵、交叉熵的意义《繁凡的深度学习笔记》第 3 章 分类问题与信息论基础(中)(DL笔记整理