Deep Learning | [Further Reading] Bringing Old Photos Back to Life (5) (Pix2PixHD Model Source Code with Annotations)

The project "High-Resolution Image Synthesis and Semantic Manipulation with Conditional GANs", published in 2018 by NVIDIA and UC Berkeley, is the technical foundation of our "Bringing Old Photos Back to Life" project and can be regarded as its upstream open-source project.
The technical core of that project is the Pix2PixHD model. We share its annotated source code here; working through it deepens the understanding of the "Bringing Old Photos Back to Life" project, especially since the model and training code of the old-photo project has not yet been open-sourced.
The project's GitHub repository: https://github.com/NVIDIA/pix2pixHD
Pix2PixHD is built with PyTorch, and the code is clean and well organized. The relevant source code lives mainly in three files: ./models/models.py, ./models/pix2pixHD_model.py and ./models/networks.py.
They are described in turn below:
(1) ./models/models.py
Calls Pix2PixHDModel() to create the model.

import torch

# Create the model and return it
def create_model(opt):
    if opt.model == 'pix2pixHD':
        # choose the pix2pixHD model
        from .pix2pixHD_model import Pix2PixHDModel, InferenceModel
        if opt.isTrain:
            # True when training
            model = Pix2PixHDModel()
        else:
            # False when only running forward passes, e.g. for demos
            model = InferenceModel()
    else:
        # choose the UIModel
        from .ui_model import UIModel
        model = UIModel()
    model.initialize(opt)  # initialize the model's parameters
    if opt.verbose:  # defaults to False, i.e. no model has been saved before
        print("model [%s] was created" % (model.name()))  # e.g. reports that the label2city model was created

    if opt.isTrain and len(opt.gpu_ids) and not opt.fp16:
        model = torch.nn.DataParallel(model, device_ids=opt.gpu_ids)  # multi-GPU training

    return model
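For context, here is a minimal sketch of how create_model() is typically driven from the repository's entry scripts (train.py / test.py). The option fields come from the definitions under ./options/; this is an illustrative fragment, not a verbatim copy of the repo.

# minimal usage sketch (illustrative, following the repo's train.py)
from options.train_options import TrainOptions
from models.models import create_model

opt = TrainOptions().parse()   # fills in opt.model, opt.isTrain, opt.gpu_ids, opt.verbose, ...
model = create_model(opt)      # returns Pix2PixHDModel (training) or InferenceModel (inference)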

(2) ./models/pix2pixHD_model.py
This file builds the core of the model:
defines the generator, discriminator and encoder (which produces low-dimensional instance features) of the conditional GAN (Pix2PixHDModel);
defines the loss functions (GAN loss, VGG loss and the feature matching loss; a small standalone illustration follows this list);
defines the optimizers of the generator and the discriminator;
defines the inputs of each module;
defines the forward function.
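Before the file itself, a tiny standalone illustration of the loss-filter mechanism used below: init_loss_filter() builds a closure that drops the disabled loss terms while preserving the order of the rest, and the same closure is reused to name the losses. (This snippet is hypothetical and not part of the repo.)

# hypothetical standalone snippet mirroring init_loss_filter()
flags = (True, True, False, True, True)  # e.g. VGG loss disabled via --no_vgg_loss

def loss_filter(g_gan, g_gan_feat, g_vgg, d_real, d_fake):
    return [l for (l, f) in zip((g_gan, g_gan_feat, g_vgg, d_real, d_fake), flags) if f]

print(loss_filter('G_GAN', 'G_GAN_Feat', 'G_VGG', 'D_real', 'D_fake'))
# -> ['G_GAN', 'G_GAN_Feat', 'D_real', 'D_fake']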
import numpy as np
import torch
import os
from torch.autograd import Variable
from util.image_pool import ImagePool
from .base_model import BaseModel
from . import networks

class Pix2PixHDModel(BaseModel):
    def name(self):
        return 'Pix2PixHDModel'

    # Loss filter: the g_gan, d_real and d_fake losses are always returned.
    # g_gan_feat is the feature matching loss from the paper (Eq. (4));
    # g_vgg is the VGG perceptual loss from the paper, which slightly improves the results.
    # Whether g_gan_feat and g_vgg are returned depends on opt.no_ganFeat_loss and
    # opt.no_vgg_loss in the training options (both are returned by default).
    def init_loss_filter(self, use_gan_feat_loss, use_vgg_loss):
        flags = (True, use_gan_feat_loss, use_vgg_loss, True, True)
        def loss_filter(g_gan, g_gan_feat, g_vgg, d_real, d_fake):
            return [l for (l, f) in zip((g_gan, g_gan_feat, g_vgg, d_real, d_fake), flags) if f]
        return loss_filter

    def initialize(self, opt):
        BaseModel.initialize(self, opt)
        if opt.resize_or_crop != 'none' or not opt.isTrain:  # when training at full res this causes OOM
            torch.backends.cudnn.benchmark = True
        self.isTrain = opt.isTrain
        self.use_features = opt.instance_feat or opt.label_feat
        self.gen_features = self.use_features and not self.opt.load_features
        input_nc = opt.label_nc if opt.label_nc != 0 else opt.input_nc

        ##### define networks
        # Generator network
        netG_input_nc = input_nc
        if not opt.no_instance:
            netG_input_nc += 1  # add the instance channel (distinguishes individual instances)
        if self.use_features:
            netG_input_nc += opt.feat_num  # add the feature-map channels (when using the encoder)
        self.netG = networks.define_G(netG_input_nc, opt.output_nc, opt.ngf, opt.netG,
                                      opt.n_downsample_global, opt.n_blocks_global, opt.n_local_enhancers,
                                      opt.n_blocks_local, opt.norm, gpu_ids=self.gpu_ids)

        # Discriminator network
        if self.isTrain:
            use_sigmoid = opt.no_lsgan
            netD_input_nc = input_nc + opt.output_nc  # real_images + fake_images
            if not opt.no_instance:
                netD_input_nc += 1  # add the instance channel (distinguishes individual instances)
            self.netD = networks.define_D(netD_input_nc, opt.ndf, opt.n_layers_D, opt.norm, use_sigmoid,
                                          opt.num_D, not opt.no_ganFeat_loss, gpu_ids=self.gpu_ids)

        ### Encoder network (a sub-case of define_G())
        if self.gen_features:
            self.netE = networks.define_G(opt.output_nc, opt.feat_num, opt.nef, 'encoder',
                                          opt.n_downsample_E, norm=opt.norm, gpu_ids=self.gpu_ids)
        if self.opt.verbose:
            print('---------- Networks initialized -------------')

        # load networks (models)
        if not self.isTrain or opt.continue_train or opt.load_pretrain:
            pretrained_path = '' if not self.isTrain else opt.load_pretrain
            self.load_network(self.netG, 'G', opt.which_epoch, pretrained_path)
            if self.isTrain:
                self.load_network(self.netD, 'D', opt.which_epoch, pretrained_path)
            if self.gen_features:
                self.load_network(self.netE, 'E', opt.which_epoch, pretrained_path)

        # set loss functions and optimizers
        if self.isTrain:
            if opt.pool_size > 0 and (len(self.gpu_ids)) > 1:
                raise NotImplementedError("Fake Pool Not Implemented for MultiGPU")
            self.fake_pool = ImagePool(opt.pool_size)  # initialize the fake pool: num_imgs = 0, images = []
            self.old_lr = opt.lr

            # define loss functions, used in .forward()
            # ganFeat_loss and vgg_loss are enabled by default
            self.loss_filter = self.init_loss_filter(not opt.no_ganFeat_loss, not opt.no_vgg_loss)

            self.criterionGAN = networks.GANLoss(use_lsgan=not opt.no_lsgan, tensor=self.Tensor)
            self.criterionFeat = torch.nn.L1Loss()
            if not opt.no_vgg_loss:
                self.criterionVGG = networks.VGGLoss(self.gpu_ids)

            # Names so we can break out the losses
            self.loss_names = self.loss_filter('G_GAN', 'G_GAN_Feat', 'G_VGG', 'D_real', 'D_fake')

            # initialize optimizers
            # optimizer G (includes the encoder)
            if opt.niter_fix_global > 0:
                import sys
                if sys.version_info >= (3, 0):
                    finetune_list = set()
                else:
                    from sets import Set
                    finetune_list = Set()

                params_dict = dict(self.netG.named_parameters())
                params = []
                for key, value in params_dict.items():
                    if key.startswith('model' + str(opt.n_local_enhancers)):
                        params += [value]
                        finetune_list.add(key.split('.')[0])
                print('------------- Only training the local enhancer network (for %d epochs) ------------' % opt.niter_fix_global)
                print('The layers that are finetuned are ', sorted(finetune_list))
            else:
                params = list(self.netG.parameters())
            if self.gen_features:
                params += list(self.netE.parameters())
            self.optimizer_G = torch.optim.Adam(params, lr=opt.lr, betas=(opt.beta1, 0.999))

            # optimizer D
            params = list(self.netD.parameters())
            self.optimizer_D = torch.optim.Adam(params, lr=opt.lr, betas=(opt.beta1, 0.999))

    # feat = feature, inst = instance.
    # Each pixel of label_map stores the pixel's object class; each pixel of inst_map stores a unique object ID.
    # Extract the boundaries (edges) of the instance map, concatenate the edge_map with the one-hot encoding
    # of label_map, wrap the result in a Variable and assign it to input_label.
    # real_image and feat_map are wrapped in Variables; when label_feat is set, label_map replaces inst_map.
    def encode_input(self, label_map, inst_map=None, real_image=None, feat_map=None, infer=False):
        # convert the data type of label_map
        if self.opt.label_nc == 0:
            input_label = label_map.data.cuda()
        else:
            # create one-hot vector for label map
            size = label_map.size()
            oneHot_size = (size[0], self.opt.label_nc, size[2], size[3])
            input_label = torch.cuda.FloatTensor(torch.Size(oneHot_size)).zero_()
            input_label = input_label.scatter_(1, label_map.data.long().cuda(), 1.0)  # turn the label indices into one-hot encoding
            if self.opt.data_type == 16:
                input_label = input_label.half()

        # get edges from instance map
        # extract the instance boundaries and concatenate the edge_map with input_label
        if not self.opt.no_instance:
            inst_map = inst_map.data.cuda()
            edge_map = self.get_edges(inst_map)
            input_label = torch.cat((input_label, edge_map), dim=1)
        input_label = Variable(input_label, volatile=infer)

        # real images for training
        if real_image is not None:
            real_image = Variable(real_image.data.cuda())

        # instance map for feature encoding
        if self.use_features:
            # get precomputed feature maps
            if self.opt.load_features:
                feat_map = Variable(feat_map.data.cuda())
            if self.opt.label_feat:
                inst_map = label_map.cuda()

        return input_label, inst_map, real_image, feat_map

    # run the discriminator
    def discriminate(self, input_label, test_image, use_pool=False):
        input_concat = torch.cat((input_label, test_image.detach()), dim=1)
        if use_pool:
            fake_query = self.fake_pool.query(input_concat)  # fetch fake images from the pool
            return self.netD.forward(fake_query)
        else:
            return self.netD.forward(input_concat)

    # Forward pass: run the model on the input data.
    # PyTorch allows dynamic operations during the forward pass (e.g. skip connections).
    def forward(self, label, inst, image, feat, infer=False):
        # Encode Inputs
        # extract the instance boundaries, concatenate the edge map with the one-hot label map,
        # wrap in a Variable and assign to input_label
        input_label, inst_map, real_image, feat_map = self.encode_input(label, inst, image, feat)

        # Fake Generation: call the generator to produce fake images
        if self.use_features:
            # run netE (the encoder) on the input image to extract feat_map
            if not self.opt.load_features:
                feat_map = self.netE.forward(real_image, inst_map)
            input_concat = torch.cat((input_label, feat_map), dim=1)  # concatenate input_label with the feature map as the generator's input
        else:
            input_concat = input_label
        fake_image = self.netG.forward(input_concat)

        # Fake Detection and Loss
        # with input_label and fake_image as input, the discriminator produces
        # predictions on the fake-image pool and the D_fake loss
        pred_fake_pool = self.discriminate(input_label, fake_image, use_pool=True)
        loss_D_fake = self.criterionGAN(pred_fake_pool, False)

        # Real Detection and Loss
        # with input_label and real_image as input, the discriminator produces
        # predictions on real images and the D_real loss
        pred_real = self.discriminate(input_label, real_image)
        loss_D_real = self.criterionGAN(pred_real, True)

        # GAN loss (Fake Passability Loss)
        # concatenate the input label with the fake image; the discriminator's prediction gives the G_GAN loss
        pred_fake = self.netD.forward(torch.cat((input_label, fake_image), dim=1))
        loss_G_GAN = self.criterionGAN(pred_fake, True)

        # GAN feature matching loss
        # computed per discriminator scale (num_D) and per feature-extraction layer (pred_fake), weighted and summed
        loss_G_GAN_Feat = 0
        if not self.opt.no_ganFeat_loss:
            feat_weights = 4.0 / (self.opt.n_layers_D + 1)  # 4.0 / (number of discriminator layers + 1)
            D_weights = 1.0 / self.opt.num_D  # 1.0 / (number of scales; 3 in the paper)
            for i in range(self.opt.num_D):
                for j in range(len(pred_fake[i]) - 1):
                    # L1Loss(); lambda_feat is a weighting coefficient from the options
                    loss_G_GAN_Feat += D_weights * feat_weights * \
                        self.criterionFeat(pred_fake[i][j], pred_real[i][j].detach()) * self.opt.lambda_feat

        # VGG feature matching loss
        loss_G_VGG = 0
        if not self.opt.no_vgg_loss:
            # VGGLoss between fake_image and real_image; lambda_feat is a weighting coefficient
            # no gradients are computed for real_image
            loss_G_VGG = self.criterionVGG(fake_image, real_image) * self.opt.lambda_feat

        # Only return the fake_B image if necessary to save BW
        return [self.loss_filter(loss_G_GAN, loss_G_GAN_Feat, loss_G_VGG, loss_D_real, loss_D_fake),
                None if not infer else fake_image]

    # Inference: generate a fake image from the label map, instance boundaries and feature map
    def inference(self, label, inst, image=None):
        # Encode Inputs
        image = Variable(image) if image is not None else None
        # concatenate the instance boundaries with the one-hot label map; returned as input_label
        input_label, inst_map, real_image, _ = self.encode_input(Variable(label), Variable(inst), image, infer=True)

        # Fake Generation
        if self.use_features:
            if self.opt.use_encoded_image:
                # encode the real image to get the feature map
                feat_map = self.netE.forward(real_image, inst_map)
            else:
                # sample clusters from precomputed features
                # randomly pick one feature per instance as its encoded feature
                feat_map = self.sample_features(inst_map)
            input_concat = torch.cat((input_label, feat_map), dim=1)  # concatenate feat_map with input_label as the generator's input
        else:
            input_concat = input_label

        if torch.__version__.startswith('0.4'):
            with torch.no_grad():
                fake_image = self.netG.forward(input_concat)  # call the generator to produce the fake image
        else:
            fake_image = self.netG.forward(input_concat)
        return fake_image

    def sample_features(self, inst):
        # read precomputed feature clusters
        cluster_path = os.path.join(self.opt.checkpoints_dir, self.opt.name, self.opt.cluster_path)
        features_clustered = np.load(cluster_path, encoding='latin1').item()

        # randomly sample from the feature clusters
        inst_np = inst.cpu().numpy().astype(int)
        feat_map = self.Tensor(inst.size()[0], self.opt.feat_num, inst.size()[2], inst.size()[3])  # feat_map.size
        for i in np.unique(inst_np):
            # np.unique() removes duplicates and returns the unique elements in ascending order,
            # giving the distinct (sorted) instance codes
            label = i if i < 1000 else i // 1000
            if label in features_clustered:
                feat = features_clustered[label]  # the feature vectors stored for this code
                cluster_idx = np.random.randint(0, feat.shape[0])  # random index used to pick one row of feat[]

                idx = (inst == int(i)).nonzero()  # nonzero() returns the positions in the instance map matching the current code
                for k in range(self.opt.feat_num):  # feat_num: number of features
                    # feat_map[batch, feature, height, width]
                    # copy the randomly chosen row of feat[] into feat_map
                    feat_map[idx[:, 0], idx[:, 1] + k, idx[:, 2], idx[:, 3]] = feat[cluster_idx, k]
        if self.opt.data_type == 16:
            feat_map = feat_map.half()
        return feat_map

    def encode_features(self, image, inst):
        image = Variable(image.cuda(), volatile=True)
        feat_num = self.opt.feat_num
        h, w = inst.size()[2], inst.size()[3]
        block_num = 32
        feat_map = self.netE.forward(image, inst.cuda())
        inst_np = inst.cpu().numpy().astype(int)
        feature = {}
        for i in range(self.opt.label_nc):
            feature[i] = np.zeros((0, feat_num + 1))
        for i in np.unique(inst_np):
            label = i if i < 1000 else i // 1000
            idx = (inst == int(i)).nonzero()
            num = idx.size()[0]
            idx = idx[num // 2, :]
            val = np.zeros((1, feat_num + 1))
            for k in range(feat_num):
                val[0, k] = feat_map[idx[0], idx[1] + k, idx[2], idx[3]].data[0]
            val[0, feat_num] = float(num) / (h * w // block_num)
            feature[label] = np.append(feature[label], val, axis=0)
        return feature

    # Extract the instance boundaries (edges); t is inst_map.
    # A pixel of the edge map is 1 if its object ID differs from any of its 4 neighbours, otherwise 0.
    def get_edges(self, t):
        edge = torch.cuda.ByteTensor(t.size()).zero_()  # initialize to 0
        edge[:, :, :, 1:] = edge[:, :, :, 1:] | (t[:, :, :, 1:] != t[:, :, :, :-1])
        edge[:, :, :, :-1] = edge[:, :, :, :-1] | (t[:, :, :, 1:] != t[:, :, :, :-1])
        edge[:, :, 1:, :] = edge[:, :, 1:, :] | (t[:, :, 1:, :] != t[:, :, :-1, :])
        edge[:, :, :-1, :] = edge[:, :, :-1, :] | (t[:, :, 1:, :] != t[:, :, :-1, :])
        if self.opt.data_type == 16:
            return edge.half()
        else:
            return edge.float()

    # save the model parameters
    def save(self, which_epoch):
        self.save_network(self.netG, 'G', which_epoch, self.gpu_ids)
        self.save_network(self.netD, 'D', which_epoch, self.gpu_ids)
        if self.gen_features:
            self.save_network(self.netE, 'E', which_epoch, self.gpu_ids)

    def update_fixed_params(self):
        # after fixing the global generator for a number of iterations, also start finetuning it
        params = list(self.netG.parameters())
        if self.gen_features:
            params += list(self.netE.parameters())
        self.optimizer_G = torch.optim.Adam(params, lr=self.opt.lr, betas=(self.opt.beta1, 0.999))
        if self.opt.verbose:
            print('------------ Now also finetuning global generator -----------')

    # update the learning rate
    def update_learning_rate(self):
        lrd = self.opt.lr / self.opt.niter_decay
        lr = self.old_lr - lrd
        for param_group in self.optimizer_D.param_groups:
            param_group['lr'] = lr
        for param_group in self.optimizer_G.param_groups:
            param_group['lr'] = lr
        if self.opt.verbose:
            print('update learning rate: %f -> %f' % (self.old_lr, lr))
        self.old_lr = lr

# inference model: forward pass only
class InferenceModel(Pix2PixHDModel):
    def forward(self, inp):
        label, inst = inp
        return self.inference(label, inst)
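To see how the loss list returned by forward() is consumed, here is a condensed sketch of one training iteration based on the repository's train.py (slightly simplified; it assumes `model` was built by create_model() and wrapped in torch.nn.DataParallel, so the optimizers are reached through model.module, and that `data` is one batch from the repo's data loader).

import torch
from torch.autograd import Variable

# condensed training step, following the repo's train.py (simplified)
losses, generated = model(Variable(data['label']), Variable(data['inst']),
                          Variable(data['image']), Variable(data['feat']), infer=False)

# average the per-GPU losses and map them back to their names
losses = [torch.mean(x) if not isinstance(x, int) else x for x in losses]
loss_dict = dict(zip(model.module.loss_names, losses))

# combine into the final scalars: D averages its real/fake terms,
# G sums the GAN, feature matching and VGG losses
loss_D = (loss_dict['D_fake'] + loss_dict['D_real']) * 0.5
loss_G = loss_dict['G_GAN'] + loss_dict.get('G_GAN_Feat', 0) + loss_dict.get('G_VGG', 0)

# alternate updates: first the generator, then the discriminator
optimizer_G, optimizer_D = model.module.optimizer_G, model.module.optimizer_D
optimizer_G.zero_grad()
loss_G.backward()
optimizer_G.step()

optimizer_D.zero_grad()
loss_D.backward()
optimizer_D.step()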

(3) ./models/networks.py
Defines the underlying network modules:
the generator define_G() and its core building blocks: the global generator GlobalGenerator(), the local enhancer LocalEnhancer(), the residual block ResnetBlock() and the encoder Encoder();
the discriminator define_D() and its core building blocks: the multiscale discriminator MultiscaleDiscriminator() and the N-layer PatchGAN discriminator NLayerDiscriminator();
the loss functions GANLoss() and VGGLoss();
the feature-extraction network Vgg19().
import torch
import torch.nn as nn
import functools
from torch.autograd import Variable
import numpy as np

###############################################################################
# Functions
###############################################################################
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        m.weight.data.normal_(0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

# normalization layers
def get_norm_layer(norm_type='instance'):
    if norm_type == 'batch':
        norm_layer = functools.partial(nn.BatchNorm2d, affine=True)  # normalizes over N, H, W
    elif norm_type == 'instance':
        norm_layer = functools.partial(nn.InstanceNorm2d, affine=False)  # normalizes over H, W; common in style transfer
    else:
        raise NotImplementedError('normalization layer [%s] is not found' % norm_type)
    return norm_layer

# In Pix2PixHD the generator has two parts, a global net and a local net,
# corresponding to the first two branches of the if statement in define_G().
# The third branch is the E network from the paper, used to precompute per-class
# features so that multiple instances sharing the same semantic label can be distinguished.
# input_nc = 3, number of input channels (excluding the instance and feature-map channels)
# output_nc = 3, number of output channels (excluding the instance and feature-map channels)
# ngf = 64, number of filters in the first conv layer
def define_G(input_nc, output_nc, ngf, netG, n_downsample_global=3, n_blocks_global=9, n_local_enhancers=1,
             n_blocks_local=3, norm='instance', gpu_ids=[]):
    norm_layer = get_norm_layer(norm_type=norm)
    if netG == 'global':
        netG = GlobalGenerator(input_nc, output_nc, ngf, n_downsample_global, n_blocks_global, norm_layer)
    elif netG == 'local':
        netG = LocalEnhancer(input_nc, output_nc, ngf, n_downsample_global, n_blocks_global,
                             n_local_enhancers, n_blocks_local, norm_layer)
    elif netG == 'encoder':
        netG = Encoder(input_nc, output_nc, ngf, n_downsample_global, norm_layer)
    else:
        raise NotImplementedError('generator not implemented!')
    print(netG)
    if len(gpu_ids) > 0:
        assert(torch.cuda.is_available())
        netG.cuda(gpu_ids[0])
    netG.apply(weights_init)
    return netG

# Following the paper, Pix2PixHD uses multiple (3) discriminators
# input_nc = 3 + 3 (real_images + fake_images, excluding the instance channel)
def define_D(input_nc, ndf, n_layers_D, norm='instance', use_sigmoid=False, num_D=1, getIntermFeat=False, gpu_ids=[]):
    norm_layer = get_norm_layer(norm_type=norm)
    netD = MultiscaleDiscriminator(input_nc, ndf, n_layers_D, norm_layer, use_sigmoid, num_D, getIntermFeat)
    print(netD)
    if len(gpu_ids) > 0:
        assert(torch.cuda.is_available())
        netD.cuda(gpu_ids[0])
    netD.apply(weights_init)
    return netD

def print_network(net):
    if isinstance(net, list):
        net = net[0]
    num_params = 0
    for param in net.parameters():
        num_params += param.numel()
    print(net)
    print('Total number of parameters: %d' % num_params)

##############################################################################
# Losses
##############################################################################
class GANLoss(nn.Module):
    def __init__(self, use_lsgan=True, target_real_label=1.0, target_fake_label=0.0,
                 tensor=torch.FloatTensor):
        super(GANLoss, self).__init__()
        self.real_label = target_real_label
        self.fake_label = target_fake_label
        self.real_label_var = None
        self.fake_label_var = None
        self.Tensor = tensor
        # lsgan: Least Squares GAN
        if use_lsgan:
            self.loss = nn.MSELoss()  # mean square error (MSE)
        else:
            self.loss = nn.BCELoss()  # binary cross entropy (BCE): -[x*log(p(x)) + (1-x)*log(1-p(x))]

    # The basic tensor type in (old) PyTorch is FloatTensor;
    # Variable wraps a FloatTensor and additionally carries gradient information.
    def get_target_tensor(self, input, target_is_real):
        target_tensor = None
        if target_is_real:
            create_label = ((self.real_label_var is None) or
                            (self.real_label_var.numel() != input.numel()))
            if create_label:
                real_tensor = self.Tensor(input.size()).fill_(self.real_label)
                self.real_label_var = Variable(real_tensor, requires_grad=False)
            target_tensor = self.real_label_var
        else:
            create_label = ((self.fake_label_var is None) or
                            (self.fake_label_var.numel() != input.numel()))
            if create_label:
                fake_tensor = self.Tensor(input.size()).fill_(self.fake_label)
                self.fake_label_var = Variable(fake_tensor, requires_grad=False)
            target_tensor = self.fake_label_var
        return target_tensor

    def __call__(self, input, target_is_real):
        if isinstance(input[0], list):
            loss = 0
            for input_i in input:
                pred = input_i[-1]
                target_tensor = self.get_target_tensor(pred, target_is_real)
                loss += self.loss(pred, target_tensor)
            return loss
        else:
            target_tensor = self.get_target_tensor(input[-1], target_is_real)
            return self.loss(input[-1], target_tensor)

# L1Loss() over 5 slices of VGG19 feature maps, weighted by [1/32, 1/16, 1/8, 1/4, 1] and summed
class VGGLoss(nn.Module):
    def __init__(self, gpu_ids):
        super(VGGLoss, self).__init__()
        self.vgg = Vgg19().cuda()
        self.criterion = nn.L1Loss()  # L1 loss, i.e. mean absolute error (MAE)
        self.weights = [1.0/32, 1.0/16, 1.0/8, 1.0/4, 1.0]

    # compute the weighted L1 loss between x and y in VGG feature space
    def forward(self, x, y):
        x_vgg, y_vgg = self.vgg(x), self.vgg(y)
        loss = 0
        for i in range(len(x_vgg)):
            # .detach() returns a new Variable detached from the current graph that never requires gradients,
            # so gradients flow only through the sub-network that produced x_vgg[i], leaving y_vgg[i] untouched
            loss += self.weights[i] * self.criterion(x_vgg[i], y_vgg[i].detach())
        return loss

##############################################################################
# Generator
##############################################################################
# local enhancer (G2 in the paper)
class LocalEnhancer(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=32, n_downsample_global=3, n_blocks_global=9,
                 n_local_enhancers=1, n_blocks_local=3, norm_layer=nn.BatchNorm2d, padding_type='reflect'):
        super(LocalEnhancer, self).__init__()
        self.n_local_enhancers = n_local_enhancers

        ###### global generator model #####
        # the G1 model
        ngf_global = ngf * (2**n_local_enhancers)  # = 64 with the defaults ngf=32, n_local_enhancers=1
        model_global = GlobalGenerator(input_nc, output_nc, ngf_global, n_downsample_global,
                                       n_blocks_global, norm_layer).model
        model_global = [model_global[i] for i in range(len(model_global)-3)]  # get rid of final convolution layers
        # output of the last remaining layer: [64, 512, 512]
        self.model = nn.Sequential(*model_global)

        ###### local enhancer layers #####
        for n in range(1, n_local_enhancers+1):  # = range(1, 2)
            ### downsample
            ngf_global = ngf * (2**(n_local_enhancers-n))
            model_downsample = [nn.ReflectionPad2d(3),
                                nn.Conv2d(input_nc, ngf_global, kernel_size=7, padding=0),
                                norm_layer(ngf_global), nn.ReLU(True),
                                nn.Conv2d(ngf_global, ngf_global * 2, kernel_size=3, stride=2, padding=1),
                                norm_layer(ngf_global * 2), nn.ReLU(True)]
            ### residual blocks
            # model_upsample is defined here and used in .forward()
            model_upsample = []
            for i in range(n_blocks_local):  # = 3
                model_upsample += [ResnetBlock(ngf_global * 2, padding_type=padding_type, norm_layer=norm_layer)]

            ### upsample
            model_upsample += [nn.ConvTranspose2d(ngf_global * 2, ngf_global, kernel_size=3, stride=2,
                                                  padding=1, output_padding=1),
                               norm_layer(ngf_global), nn.ReLU(True)]

            ### final convolution
            if n == n_local_enhancers:
                model_upsample += [nn.ReflectionPad2d(3), nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0),
                                   nn.Tanh()]

            # name the intermediate layers
            setattr(self, 'model'+str(n)+'_1', nn.Sequential(*model_downsample))
            setattr(self, 'model'+str(n)+'_2', nn.Sequential(*model_upsample))

        # average pooling for downsampling: output size y = (x + 2*1 - 3)/2 + 1
        self.downsample = nn.AvgPool2d(3, stride=2, padding=[1, 1], count_include_pad=False)

    def forward(self, input):
        ### create input pyramid
        # (by default) build two sets of inputs; average pooling halves the size of the second one
        input_downsampled = [input]
        for i in range(self.n_local_enhancers):  # = 1
            input_downsampled.append(self.downsample(input_downsampled[-1]))  # [-1] takes the last element

        ### output at coarsest level: the feature map produced by G1 in the paper
        output_prev = self.model(input_downsampled[-1])
        ### build up one layer at a time
        # coarse to fine: G1's feature map and the one from G2's front end (F) are summed and fed
        # to model_upsample(); by default there is a single enhancer, i.e. n_local_enhancers = 1
        for n_local_enhancers in range(1, self.n_local_enhancers+1):  # = range(1, 2)
            # fetch the intermediate layers
            model_downsample = getattr(self, 'model'+str(n_local_enhancers)+'_1')
            model_upsample = getattr(self, 'model'+str(n_local_enhancers)+'_2')
            # pick the input
            input_i = input_downsampled[self.n_local_enhancers-n_local_enhancers]  # 1 - 1 = 0
            # produce the output
            output_prev = model_upsample(model_downsample(input_i) + output_prev)
        return output_prev

# global generator (G1 in the paper)
class GlobalGenerator(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=64, n_downsampling=3, n_blocks=9,
                 norm_layer=nn.BatchNorm2d, padding_type='reflect'):
        assert(n_blocks >= 0)
        super(GlobalGenerator, self).__init__()
        activation = nn.ReLU(True)

        # First layer: the 7x7 convolution uses padding=0, which would shrink each spatial
        # dimension by 6 pixels, so the input is mirror-padded with ReflectionPad2d(3).
        # [3,512,512] -> [64,512,512], ngf = 64
        model = [nn.ReflectionPad2d(3), nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0),
                 norm_layer(ngf), activation]
        ### downsample: every conv layer has stride 2, n_downsampling = 3
        # [64,512,512] -> [512,64,64]
        for i in range(n_downsampling):
            mult = 2**i
            model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, stride=2, padding=1),
                      norm_layer(ngf * mult * 2), activation]

        ### resnet blocks (they do not change the resolution), dim = 512
        mult = 2**n_downsampling
        for i in range(n_blocks):
            model += [ResnetBlock(ngf * mult, padding_type=padding_type, activation=activation, norm_layer=norm_layer)]

        ### upsample: as many upsampling layers as downsampling ones, using ConvTranspose2d() with stride 2.
        # Unlike a U-Net, the upsampling path does not reuse the feature maps from the downsampling path.
        for i in range(n_downsampling):
            mult = 2**(n_downsampling - i)
            model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2), kernel_size=3, stride=2,
                                         padding=1, output_padding=1),
                      norm_layer(int(ngf * mult / 2)), activation]
        # output layer of the model; no normalization here
        # [64,512,512] -> [3,512,512]
        model += [nn.ReflectionPad2d(3), nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0), nn.Tanh()]
        self.model = nn.Sequential(*model)

    def forward(self, input):
        return self.model(input)

# Define a resnet block
class ResnetBlock(nn.Module):
    def __init__(self, dim, padding_type, norm_layer, activation=nn.ReLU(True), use_dropout=False):
        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, activation, use_dropout)

    def build_conv_block(self, dim, padding_type, norm_layer, activation, use_dropout):
        conv_block = []
        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)

        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p),
                       norm_layer(dim),
                       activation]
        if use_dropout:
            conv_block += [nn.Dropout(0.5)]

        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p),
                       norm_layer(dim)]

        return nn.Sequential(*conv_block)

    def forward(self, x):
        out = x + self.conv_block(x)
        return out

# Encoder network E: produces low-dimensional features that are fed to the generator.
# A standard encoder-decoder with an extra instance-wise average pooling layer that computes
# the average feature per object instance (covering multiple instances of each class).
class Encoder(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=32, n_downsampling=4, norm_layer=nn.BatchNorm2d):
        super(Encoder, self).__init__()
        self.output_nc = output_nc

        model = [nn.ReflectionPad2d(3), nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0),
                 norm_layer(ngf), nn.ReLU(True)]
        ### downsample, stride=2
        for i in range(n_downsampling):
            mult = 2**i
            model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, stride=2, padding=1),
                      norm_layer(ngf * mult * 2), nn.ReLU(True)]

        ### upsample, using ConvTranspose2d() with stride=2
        for i in range(n_downsampling):
            mult = 2**(n_downsampling - i)
            model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2), kernel_size=3, stride=2,
                                         padding=1, output_padding=1),
                      norm_layer(int(ngf * mult / 2)), nn.ReLU(True)]

        model += [nn.ReflectionPad2d(3), nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0), nn.Tanh()]
        self.model = nn.Sequential(*model)

    def forward(self, input, inst):
        outputs = self.model(input)

        # instance-wise average pooling
        outputs_mean = outputs.clone()
        inst_list = np.unique(inst.cpu().numpy().astype(int))  # instance list
        for i in inst_list:
            for b in range(input.size()[0]):  # average over H, W for each batch element
                indices = (inst[b:b+1] == int(i)).nonzero()  # n x 4; nonzero() returns the positions of nonzero elements
                for j in range(self.output_nc):
                    # computed separately for each feature map
                    output_ins = outputs[indices[:, 0] + b, indices[:, 1] + j, indices[:, 2], indices[:, 3]]
                    mean_feat = torch.mean(output_ins).expand_as(output_ins)
                    outputs_mean[indices[:, 0] + b, indices[:, 1] + j, indices[:, 2], indices[:, 3]] = mean_feat
        return outputs_mean

# Multiscale discriminator; together with the discriminator-based feature matching loss
# it improves the GAN loss (better stability and optimization efficiency).
class MultiscaleDiscriminator(nn.Module):
    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d,
                 use_sigmoid=False, num_D=3, getIntermFeat=False):
        super(MultiscaleDiscriminator, self).__init__()
        self.num_D = num_D
        self.n_layers = n_layers
        self.getIntermFeat = getIntermFeat

        # each NLayerDiscriminator instance is set (more precisely, "named") as an attribute
        # of the current class (self); num_D NLayerDiscriminators are created
        for i in range(num_D):
            netD = NLayerDiscriminator(input_nc, ndf, n_layers, norm_layer, use_sigmoid, getIntermFeat)
            if getIntermFeat:
                for j in range(n_layers+2):
                    # setattr() is the counterpart of getattr(); it sets an attribute value
                    setattr(self, 'scale'+str(i)+'_layer'+str(j), getattr(netD, 'model'+str(j)))
            else:
                setattr(self, 'layer'+str(i), netD.model)

        self.downsample = nn.AvgPool2d(3, stride=2, padding=[1, 1], count_include_pad=False)  # average pooling for downsampling

    def singleD_forward(self, model, input):
        if self.getIntermFeat:
            result = [input]
            for i in range(len(model)):
                result.append(model[i](result[-1]))
            return result[1:]
        else:
            return [model(input)]

    # forward pass of D
    def forward(self, input):
        num_D = self.num_D
        result = []
        input_downsampled = input
        # downsample step by step to produce inputs at several scales, and run
        # singleD_forward() to produce the output at each scale
        for i in range(num_D):
            if self.getIntermFeat:
                model = [getattr(self, 'scale'+str(num_D-1-i)+'_layer'+str(j)) for j in range(self.n_layers+2)]
            else:
                model = getattr(self, 'layer'+str(num_D-1-i))
            result.append(self.singleD_forward(model, input_downsampled))
            if i != (num_D-1):
                input_downsampled = self.downsample(input_downsampled)
        return result

# Defines the PatchGAN discriminator with the specified arguments
# (only the network; the loss is defined in class Pix2PixHDModel()).
class NLayerDiscriminator(nn.Module):
    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d, use_sigmoid=False, getIntermFeat=False):
        super(NLayerDiscriminator, self).__init__()
        self.getIntermFeat = getIntermFeat
        self.n_layers = n_layers

        kw = 4
        padw = int(np.ceil((kw-1.0)/2))  # = 2; np.ceil() rounds up to the nearest integer
        # [3, 512, 512] -> [64, 257, 257]
        sequence = [[nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)]]

        nf = ndf
        for n in range(1, n_layers):
            nf_prev = nf
            nf = min(nf * 2, 512)
            sequence += [[
                nn.Conv2d(nf_prev, nf, kernel_size=kw, stride=2, padding=padw),
                norm_layer(nf), nn.LeakyReLU(0.2, True)
            ]]

        nf_prev = nf
        nf = min(nf * 2, 512)
        sequence += [[
            nn.Conv2d(nf_prev, nf, kernel_size=kw, stride=1, padding=padw),
            norm_layer(nf),
            nn.LeakyReLU(0.2, True)
        ]]

        sequence += [[nn.Conv2d(nf, 1, kernel_size=kw, stride=1, padding=padw)]]

        if use_sigmoid:
            sequence += [[nn.Sigmoid()]]

        # name each intermediate layer so it can be retrieved later
        # (needed when computing the feature matching loss)
        if getIntermFeat:
            for n in range(len(sequence)):
                setattr(self, 'model'+str(n), nn.Sequential(*sequence[n]))
        else:
            sequence_stream = []
            for n in range(len(sequence)):
                sequence_stream += sequence[n]
            self.model = nn.Sequential(*sequence_stream)

    def forward(self, input):
        if self.getIntermFeat:
            res = [input]
            for n in range(self.n_layers+2):
                model = getattr(self, 'model'+str(n))
                res.append(model(res[-1]))
            return res[1:]
        else:
            return self.model(input)

from torchvision import models

# VGG19: the model is split into 5 slices (only layers 0-29 are used)
class Vgg19(torch.nn.Module):
    def __init__(self, requires_grad=False):
        super(Vgg19, self).__init__()
        vgg_pretrained_features = models.vgg19(pretrained=True).features
        self.slice1 = torch.nn.Sequential()
        self.slice2 = torch.nn.Sequential()
        self.slice3 = torch.nn.Sequential()
        self.slice4 = torch.nn.Sequential()
        self.slice5 = torch.nn.Sequential()
        for x in range(2):
            self.slice1.add_module(str(x), vgg_pretrained_features[x])
        for x in range(2, 7):
            self.slice2.add_module(str(x), vgg_pretrained_features[x])
        for x in range(7, 12):
            self.slice3.add_module(str(x), vgg_pretrained_features[x])
        for x in range(12, 21):
            self.slice4.add_module(str(x), vgg_pretrained_features[x])
        for x in range(21, 30):
            self.slice5.add_module(str(x), vgg_pretrained_features[x])
        if not requires_grad:
            for param in self.parameters():
                param.requires_grad = False

    def forward(self, X):
        h_relu1 = self.slice1(X)
        h_relu2 = self.slice2(h_relu1)
        h_relu3 = self.slice3(h_relu2)
        h_relu4 = self.slice4(h_relu3)
        h_relu5 = self.slice5(h_relu4)
        out = [h_relu1, h_relu2, h_relu3, h_relu4, h_relu5]
        return out
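As a quick sanity check, the builders above can be exercised on the CPU with random tensors. The sketch below uses assumed channel counts (35 one-hot label channels plus 1 edge channel; not from the repo) and shows that the generator maps the semantic input to a 3-channel image, while the multiscale discriminator returns one list of intermediate feature maps per scale.

# CPU sketch with assumed shapes (illustrative, not from the repo)
import torch
from models.networks import define_G, define_D

x = torch.randn(1, 36, 256, 256)                  # 35 one-hot label channels + 1 edge channel (assumed)
netG = define_G(36, 3, 64, 'global', gpu_ids=[])  # global generator only
netD = define_D(36 + 3, 64, 3, num_D=2, getIntermFeat=True, gpu_ids=[])

fake = netG(x)                                    # -> [1, 3, 256, 256]
preds = netD(torch.cat((x, fake), dim=1))         # num_D lists, each holding n_layers_D + 2 feature maps
print(fake.shape, len(preds), len(preds[0]))      # torch.Size([1, 3, 256, 256]) 2 5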

(End)
