基于数据并行实现多GPU跑模型预测(VGG16示例)

个人总结能实现多GPU跑图的方法:
1、使用谷歌框架tf.estimator;

# Let the session use up to four GPUs. ``device_count`` maps a device *type*
# to the maximum number of devices of that type, so the 'GPU' key must appear
# exactly once; repeating the key in a dict literal keeps only the last value.
session_config = tf.ConfigProto(device_count={'GPU': 4})
run_config = tf.estimator.RunConfig().replace(session_config=session_config)
# model_fn, FLAGS and params are expected to be defined by the surrounding
# training script.
estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    model_dir=FLAGS.model_dir,
    config=run_config,
    params=params)

2、多机多卡分布式多GPU的Distribute 方式,分布式实现模型的并行计算;
Horovod github_homepage
Horovod示例代码
3、自己实现数据并行,在多个GPU上计算。(本文介绍的方式)
数据并行就是采用同一个模型,而多个GPU上分别运行不同batch的数据。
模块parallel:在各个GPU上分别调用预测方法,并传入各自对应的特征batch
import six
import tensorflow as tf


def _maybe_repeat(x, n):
    """Return ``x`` as a list with one entry per worker.

    Args:
        x: Either a list that already holds one value per worker, or a single
            value to be shared by every worker.
        n: Number of workers.

    Returns:
        ``x`` itself when it is a list (its length must equal ``n``),
        otherwise a new list containing ``x`` repeated ``n`` times.

    Raises:
        AssertionError: If ``x`` is a list whose length differs from ``n``.
    """
    if isinstance(x, list):
        assert len(x) == n, (
            "expected %d per-device values, got %d" % (n, len(x)))
        return x
    else:
        return [x] * n


# Data-level parallelism
def data_parallelism(devices, fn, *args, **kwargs):
    """Call ``fn`` once per GPU, each call placed on its own device.

    Args:
        devices: Sequence of integer GPU indices; each one is turned into a
            ``"gpu:<index>"`` device string.
        fn: A callable used for every device, or a list holding one callable
            per device.
        *args: Positional arguments for ``fn``. A list argument is treated as
            per-device values (one element per device); any other value is
            replicated to every device.
        **kwargs: Keyword arguments for ``fn``, replicated or split by the
            same rule as ``args``.

    Returns:
        A list with the return value of each per-device call, in the order
        of ``devices``.
    """
    num_worker = len(devices)
    devices = ["gpu:%d" % d for d in devices]

    # Replicate args and kwargs
    if args:
        new_args = [_maybe_repeat(arg, num_worker) for arg in args]
        # Transpose from "per argument" to "per worker".
        new_args = [list(x) for x in zip(*new_args)]
    else:
        new_args = [[] for _ in range(num_worker)]

    new_kwargs = [{} for _ in range(num_worker)]

    for k, v in six.iteritems(kwargs):
        vals = _maybe_repeat(v, num_worker)

        for i in range(num_worker):
            new_kwargs[i][k] = vals[i]

    fns = _maybe_repeat(fn, num_worker)

    # Now make the parallel call.
    outputs = []

    for i in range(num_worker):
        # The first replica runs with reuse=False; every later replica runs
        # with reuse=True inside the current variable scope.
        with tf.variable_scope(tf.get_variable_scope(), reuse=(i != 0)):
            with tf.name_scope("parallel_%d" % i):
                with tf.device(devices[i]):
                    # Invoke the function on this GPU.
                    outputs.append(fns[i](*new_args[i], **new_kwargs[i]))

    return outputs


最终跑VGG预测的代码如下:
import numpy as np
import tensorflow as tf
import vgg16
import data_process.util as util
import utils
import time
import pandas as pd
import os
import parallel

batch_size = 12  # images per GPU; scaled to the global batch size below
vgg_output_dim = 4096
# Filled by batch_generator(): one row (file name, running index) per image
# that was accepted into a batch.
map1 = pd.DataFrame(columns=['img_path', 'index'])
device_list = [0, 1, 2, 3]  # GPU 0,1,2,3
device_num = len(device_list)
batch_size = batch_size * device_num


def batch_generator():
    """Yield ``(images, batch_number)`` pairs for the files under ../20200409/.

    ``images`` is a float32 array of shape ``(k, 224, 224, 3)`` with
    ``1 <= k <= batch_size``; only the final batch can be shorter.
    ``batch_number`` starts at 1. Files ending in ``.csv`` are skipped, and so
    are images that cannot be stored as ``(224, 224, 3)``. Each accepted image
    appends a row to the module-level ``map1`` frame.
    """
    i = 0
    current_batch = 0
    file_index = -1
    img_path_list = util.list_all_files("../20200409/")  # all image paths
    total_count = len(img_path_list)
    nb_batches = len(img_path_list) // batch_size
    print(nb_batches)
    while True:
        X = np.zeros((batch_size, 224, 224, 3), dtype=np.float32)
        count_tmp = 0
        while count_tmp < batch_size:
            file_index += 1
            if file_index >= total_count:
                break
            path = img_path_list[file_index]
            if path.split('.')[-1] == 'csv':
                continue
            img = utils.load_image(path)  # read the image data from its path
            file_name = path.split('/')[-1]
            try:
                # The reshape result is discarded on purpose: it only
                # validates that the image has 224*224*3 elements.
                img.reshape((224, 224, 3))
                X[count_tmp] = img
                map1.loc[i] = [file_name, i]
                i = i + 1
                count_tmp += 1
            except Exception as err:
                # Best effort: skip unusable images, but do not swallow
                # KeyboardInterrupt/SystemExit as a bare ``except`` would.
                print("skip {path}: {err}".format(path=path, err=err))
                continue
        if count_tmp == 0:
            # No image left to emit: the file list is exhausted.
            break
        current_batch = current_batch + 1
        yield X[0:count_tmp, :, :, :], current_batch


def shard_features(feat, placeholders, predictions):
    """Split one batch across the per-GPU placeholders.

    Adapt this to the structure of your own features.

    Args:
        feat: Array holding the current batch; it is sliced along axis 0.
        placeholders: One input placeholder per GPU.
        predictions: Per-GPU output tensors (list or tuple), or another
            fetch object that is returned untouched.

    Returns:
        ``(predictions, feed_dict)``. ``feed_dict`` maps each placeholder
        that received at least one row to its slice. When ``predictions`` is
        a list or tuple it is cut down to the placeholders actually fed, so a
        short final batch may yield fewer outputs than there are GPUs.
    """
    num_shards = len(placeholders)
    feed_dict = {}
    n = 0
    batch = feat.shape[0]  # size of the current batch
    # Ceiling division so that every row lands in some shard.
    shard_size = (batch + num_shards - 1) // num_shards

    for i in range(num_shards):
        shard_feat = feat[i * shard_size:(i + 1) * shard_size]

        if shard_feat.shape[0] != 0:
            feed_dict[placeholders[i]] = shard_feat
            n = i + 1
        else:
            break

    if isinstance(predictions, (list, tuple)):
        predictions = predictions[:n]

    return predictions, feed_dict


def inference_fn(features):
    """Build a VGG16 network on ``features`` and return its ``fc7`` tensor."""
    vgg = vgg16.Vgg16()
    with tf.name_scope("content_vgg"):
        vgg.build(features)
    return vgg.fc7


with tf.Graph().as_default():
    placeholders = [
        tf.placeholder("float", [None, 224, 224, 3])
        for _ in range(device_num)
    ]
    predictions = parallel.data_parallelism(
        device_list, inference_fn, placeholders)

    with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
        time_start = time.time()
        feature_columns = ["vgg{}".format(k) for k in range(vgg_output_dim)]
        # Per-shard frames are collected and concatenated once at the end
        # instead of re-copying a growing frame on every shard.
        shard_frames = []
        total_rows = 0

        for images, batch_no in batch_generator():
            print("nums0{nums}".format(nums=batch_no))
            op, feed_dict = shard_features(images, placeholders, predictions)
            # prob is a list with one result array per GPU that was fed.
            prob = sess.run(op, feed_dict=feed_dict)
            # Iterate over the results actually returned: on a short final
            # batch there can be fewer of them than device_num, so indexing
            # by device number would raise IndexError.
            for shard_output in prob:
                df1 = pd.DataFrame(shard_output, columns=feature_columns)
                shard_frames.append(df1)
                total_rows += len(df1)
                print((total_rows, vgg_output_dim))

        if shard_frames:
            df2 = pd.concat(shard_frames, axis=0, ignore_index=False)
        else:
            df2 = pd.DataFrame(columns=feature_columns)
        print(df2.shape)
        print(map1)

        # Both frames get a fresh 0..n-1 index so that the column-wise concat
        # pairs each file name with its feature row.
        map1 = map1.reset_index()
        df2 = df2.reset_index()
        df = pd.concat([map1, df2], axis=1, ignore_index=True)
        if not os.path.exists('./results'):
            os.mkdir('./results')
        df_path = os.path.join('./results', 'map_00.csv')
        df.to_csv(df_path, mode='a', header=False)
        print("最后结果{result}".format(result=df))
        time_end = time.time()
        print('time cost', time_end - time_start, 's')

解释都在代码里面,解释都是无力的,自己看代码和注释吧!
【基于数据并行实现多GPU跑模型预测(VGG16示例)】

    推荐阅读