《莎士比亚》文本生成

这篇文章是我基于一位博主的文章改造的。这位博主有些地方没有说得很清楚(也可能是我比较愚钝),所以关键的地方我都加了注释,让大家知道:要想训练一个RNN,到底怎么组织输入、怎么利用输出(当然,这是基于TensorFlow的,不同的框架可能有所不同)。

### 原博主文章在这
代码如下。如果有人看到后觉得有用,并且给我点赞、评论,那我将会好好修改,毕竟这就是动力嘛。
为了让大家清楚,附上一张图。一定要搞清楚LSTM的c和a(有些图里叫h,没有区别,只是名字不同)这两个状态,切记切记。

《莎士比亚》文本生成
文章图片
【《莎士比亚》文本生成】
3. 我的代码[^code]

# Foreword.
# The input to an LSTM (or any cell that carries two states) is usually
# [batch_size, steps, one-hot encoding or embedding].
# The `outputs` of dynamic_rnn are [batch_size, steps, hidden_units]; `state`
# only holds the final c and h (same shape), each [batch_size, hidden_units].
# The output projection W is therefore usually [hidden_units, classes].
# If the output of every time step is needed, use `outputs`.
# Because data is fed one batch at a time, every output is per batch.
# x and h go through the same formula and are summed after being multiplied by
# their own weights, so both products must have the same shape: a single x is
# [150, 83], so Wx is [83, 512] and the product is [150, 512]; c (and h, since
# c is derived from x and a) is also [150, 512], hence Wa is [512, 512].
import ast
import os
import time
from collections import namedtuple

import numpy as np
import tensorflow as tf

with open('data/shakespeare.txt', 'r') as f:
    text = f.read()
vocab = set(text)

# The mappings could be rebuilt from `vocab`:
#   vocab_to_int = {c: i for i, c in enumerate(vocab)}
#   int_to_vocab = dict(enumerate(vocab))
# but they are loaded from disk instead. NOTE(review): presumably this keeps
# the character ids stable between the training run and later sampling runs,
# since iteration order of a set of strings can differ between interpreter
# runs -- confirm.


def _load_mapping(path):
    """Read a dict literal from *path* and return it.

    NOTE: the original code used eval() on the file contents, which executes
    arbitrary code. ast.literal_eval only accepts Python literals, which is
    all a saved dict of characters/ints needs, and the file is now closed
    deterministically.
    """
    with open(path, 'r') as mapping_file:
        return ast.literal_eval(mapping_file.read())


vocab_to_int = _load_mapping('data/vocab_to_int.txt')
int_to_vocab = _load_mapping('data/int_to_vocab.txt')

# Encode the text: every character becomes its integer id.
encoded = np.array([vocab_to_int[c] for c in text], dtype=np.int32)
# Before encoding
print(text[: 100])
# After encoding
print(encoded[: 100])


def get_batches(arr, n_seqs, n_steps):
    """Yield (inputs, targets) mini-batches cut from a 1-D array.

    arr: array to split
    n_seqs: number of sequences in one batch
    n_steps: number of characters in one sequence

    Each yielded x and y has shape (n_seqs, n_steps). y is x shifted left by
    one position; the last target of each window wraps around to the first
    character of that same window.
    """
    # Fixed: the original computed n_steps * n_steps, which only yields a
    # length divisible by n_seqs * n_steps by coincidence.
    batch_size = n_seqs * n_steps
    n_batches = len(arr) // batch_size
    if n_batches == 0:
        # Not enough data for even one full batch: yield nothing.
        return

    # Keep only complete batches; the remainder is dropped.
    arr = arr[: batch_size * n_batches]
    arr = arr.reshape((n_seqs, -1))

    for n in range(0, arr.shape[1], n_steps):
        # inputs
        x = arr[:, n: n + n_steps]
        # targets
        y = np.zeros_like(x)
        y[:, : -1], y[:, -1] = x[:, 1:], x[:, 0]
        yield x, y


batches = get_batches(encoded, 10, 50)
x, y = next(batches)
# print('x\n', x)
# print('\ny\n', y)


def build_inputs(num_seqs, num_steps):
    """Build the input placeholders.

    num_seqs: number of sequences in each batch
    num_steps: number of characters in each sequence

    Returns (inputs, targets, keep_prob).
    """
    inputs = tf.placeholder(tf.int32, shape=(num_seqs, num_steps), name='inputs')
    targets = tf.placeholder(tf.int32, shape=(num_seqs, num_steps), name='targets')

    # keep_prob is the dropout keep probability.
    keep_prob = tf.placeholder(tf.float32, name='keep_prob')

    return inputs, targets, keep_prob


def build_lstm(lstm_size, num_layers, batch_size, keep_prob):
    """Build the stacked LSTM.

    lstm_size: number of hidden units in each LSTM cell
    num_layers: number of stacked LSTM layers
    batch_size: number of sequences per batch (sizes the zero state)
    keep_prob: dropout keep probability applied to each layer's output

    Returns (cell, initial_state).
    """
    lstm_cells = []
    for _ in range(num_layers):
        # One basic LSTM cell ...
        lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)
        # ... wrapped with dropout on its output.
        drop = tf.nn.rnn_cell.DropoutWrapper(lstm, output_keep_prob=keep_prob)
        lstm_cells.append(drop)

    # Stack the layers.
    cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cells)
    # The cell consumes num_steps inputs in turn and produces a final state
    # (an n-tuple, n being the number of stacked layers). A batch contains
    # batch_size such sequences, so the state is allocated per sequence.
    initial_state = cell.zero_state(batch_size, tf.float32)

    return cell, initial_state


def build_output(lstm_output, in_size, out_size):
    """Build the output (softmax) layer.

    lstm_output: output of the LSTM layers
    in_size: size of the LSTM output after reshaping (hidden units)
    out_size: size of the softmax layer (number of classes)

    Returns (out, logits): the softmax probabilities and the raw logits.
    """
    print('我是lstm_output:', lstm_output.shape)  # e.g. (150, 100, 512)
    # Kept from the original. NOTE(review): lstm_output is a single tensor
    # here, so this concat presumably leaves it unchanged -- confirm.
    seq_output = tf.concat(lstm_output, axis=1)
    # Flatten to one row per (sequence, step).
    x = tf.reshape(seq_output, [-1, in_size])
    print('我是reshape之后的output:', x)  # e.g. (15000, 512)

    # Fully connect the LSTM output to the softmax layer.
    # The misspelled scope name 'sotfmax' is a runtime string and is left
    # untouched on purpose.
    with tf.variable_scope('sotfmax'):
        softmax_w = tf.Variable(tf.truncated_normal([in_size, out_size], stddev=0.1))
        softmax_b = tf.Variable(tf.zeros(out_size))

    # Compute the logits.
    logits = tf.matmul(x, softmax_w) + softmax_b

    # The softmax layer returns the probability distribution.
    out = tf.nn.softmax(logits=logits, name='predictions')

    return out, logits


def build_loss(logits, targets, lstm_size, num_classes):
    """Compute the loss from logits and targets.

    logits: output of the fully connected layer (before softmax)
    targets: integer target ids
    lstm_size: unused; kept for interface compatibility
    num_classes: number of classes (depth of the one-hot encoding)
    """
    # One-hot encode the targets and match the logits' shape.
    y_one_hot = tf.one_hot(targets, num_classes)
    y_reshaped = tf.reshape(y_one_hot, logits.get_shape())

    # Softmax cross entropy loss, averaged over all positions.
    loss = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=y_reshaped)
    loss = tf.reduce_mean(loss)

    return loss


def build_optimizer(loss, learning_rate, grad_clip):
    """Build the training op.

    loss: loss tensor
    learning_rate: learning rate for Adam
    grad_clip: global-norm threshold used to clip the gradients
    """
    # Clip the gradients by global norm before applying them.
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(loss, tvars), grad_clip)
    train_op = tf.train.AdamOptimizer(learning_rate)
    optimizer = train_op.apply_gradients(zip(grads, tvars))

    return optimizer


class CharRNN:
    """Character-level RNN: inputs, stacked LSTM, softmax output, loss, optimizer."""

    def __init__(self, num_classes, batch_size=64, num_steps=50, lstm_size=128,
                 num_layers=2, learning_rate=0.001, grad_clip=5, sampling=False):
        # When sampling, feed a single character at a time (1 x 1 batch).
        if sampling:
            batch_size, num_steps = 1, 1

        tf.reset_default_graph()

        # Input layer
        self.inputs, self.targets, self.keep_prob = build_inputs(batch_size, num_steps)

        # LSTM layers
        cell, self.initial_state = build_lstm(lstm_size, num_layers, batch_size, self.keep_prob)

        # One-hot encode the inputs.
        x_one_hot = tf.one_hot(self.inputs, num_classes)
        print('我是x_one_hot:', x_one_hot.shape)  # e.g. (150, 100, 83)

        # Run the RNN.
        # `outputs` is the output of the top layer (for stacked LSTMs);
        # `state` is the state after the last time step.
        # The input is passed as one 3-D tensor with num_steps as one axis.
        outputs, state = tf.nn.dynamic_rnn(cell, x_one_hot, initial_state=self.initial_state)
        print('我是dynamic_rnn之后的outputs:', outputs.shape)  # e.g. (150, 100, 512)
        self.final_state = state
        # Tuple holding c and h (c first, then h), same shape, e.g. (150, 512).
        print('我是state:', state)

        # Predictions
        self.prediction, self.logits = build_output(outputs, lstm_size, num_classes)
        print('我是prediction,logits:', self.prediction, self.logits)  # e.g. (15000, 83)

        # Loss and optimizer (with gradient clipping)
        self.loss = build_loss(self.logits, self.targets, lstm_size, num_classes)
        self.optimizer = build_optimizer(self.loss, learning_rate, grad_clip)


# Model training
# batch_size: number of sequences in one batch
# num_steps: number of characters in one sequence
# lstm_size: number of hidden units
# num_layers: number of LSTM layers
# learning_rate: learning rate
# keep_prob: fraction of units kept by dropout
batch_size = 150
num_steps = 100
lstm_size = 512
num_layers = 2
learning_rate = 0.001
keep_prob = 0.5

epochs = 20
# Save the variables every n training steps.
save_every_n = 200

model = CharRNN(len(vocab), batch_size=batch_size, num_steps=num_steps,
                lstm_size=lstm_size, num_layers=num_layers,
                learning_rate=learning_rate)
saver = tf.train.Saver(max_to_keep=10)

# NOTE(review): saving may fail if the parent directory is missing (depends on
# the TF version -- confirm); creating it up front is harmless either way.
os.makedirs('checkpoints', exist_ok=True)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    counter = 0
    for e in range(epochs):
        # Train network: start every epoch from the zero state.
        new_state = sess.run(model.initial_state)
        for x, y in get_batches(encoded, batch_size, num_steps):
            counter += 1
            start = time.time()
            feed = {
                model.inputs: x,
                model.targets: y,
                model.keep_prob: keep_prob,
                # Carry the final state of the previous batch forward.
                model.initial_state: new_state,
            }
            batch_loss, new_state, _ = sess.run(
                [model.loss, model.final_state, model.optimizer],
                feed_dict=feed)
            end = time.time()

            # Limit the amount of printed lines.
            if counter % 100 == 0:
                print('轮数:{}/{}...'.format(e+1, epochs),
                      '训练步数:{}...'.format(counter),
                      '训练误差:{:.4f}...'.format(batch_loss),
                      '{:.4f} sec/batch'.format((end-start)))

            if counter % save_every_n == 0:
                saver.save(sess, 'checkpoints/i{}_1{}.ckpt'.format(counter, lstm_size))

    saver.save(sess, 'checkpoints/i{}_1{}.ckpt'.format(counter, lstm_size))

# Inspect the checkpoints.
print(tf.train.get_checkpoint_state('checkpoints'))


def pick_top_n(preds, vocab_size, top_n=5):
    """Sample one character id from the top_n most likely predictions.

    preds: predicted probabilities (squeezed to 1-D of length vocab_size)
    vocab_size: number of characters in the vocabulary
    top_n: how many of the most likely characters to sample from
    """
    # Work on a copy so the caller's array is not modified through the view
    # returned by np.squeeze.
    p = np.squeeze(preds).copy()
    # np.argsort returns indices in ascending order; zero out everything
    # except the top_n largest probabilities.
    p[np.argsort(p)[: -top_n]] = 0
    # Renormalise so the remaining probabilities sum to 1.
    p = p / np.sum(p)
    # Randomly pick one character.
    c = np.random.choice(vocab_size, 1, p=p)[0]
    return c


def sample(checkpoint, n_samples, lstm_size, vocab_size, prime='The '):
    """Generate new text.

    checkpoint: parameter file of some training step
    n_samples: number of characters to generate after the first sampled one
    lstm_size: number of hidden units
    vocab_size: number of characters in the vocabulary
    prime: starting text; must be non-empty

    Raises ValueError if prime is empty (the network needs at least one
    character to produce the first prediction).
    """
    if not prime:
        raise ValueError('prime must contain at least one character')

    # Turn the starting text into a list of single characters.
    samples = list(prime)
    # sampling=True means the batch is 1 x 1.
    # Fixed: use the vocab_size argument instead of the global vocab.
    model = CharRNN(vocab_size, lstm_size=lstm_size, sampling=True)
    saver = tf.train.Saver()
    with tf.Session() as sess:
        # Load the model parameters.
        saver.restore(sess, checkpoint)
        new_state = sess.run(model.initial_state)
        x = np.zeros((1, 1))
        for c in prime:
            # Feed a single character.
            x[0, 0] = vocab_to_int[c]
            feed = {
                model.inputs: x,
                model.keep_prob: 1.,
                model.initial_state: new_state,
            }
            preds, new_state = sess.run(
                [model.prediction, model.final_state], feed_dict=feed)

        c = pick_top_n(preds, vocab_size)
        # Append the sampled character.
        samples.append(int_to_vocab[c])

        # Keep generating characters until the requested count is reached.
        for _ in range(n_samples):
            x[0, 0] = c
            feed = {
                model.inputs: x,
                model.keep_prob: 1.,
                model.initial_state: new_state,
            }
            preds, new_state = sess.run(
                [model.prediction, model.final_state], feed_dict=feed)
            c = pick_top_n(preds, vocab_size)
            samples.append(int_to_vocab[c])

    return ''.join(samples)


print(tf.train.latest_checkpoint('checkpoints'))

# Use the latest training parameters to generate text.
checkpoints = tf.train.latest_checkpoint('checkpoints')
samp = sample(checkpoints, 500, lstm_size, len(vocab), prime='Shu Xu ')
print(samp)

# Examples of sampling from a specific checkpoint:
#
# checkpoint = 'checkpoints\\i3960_1512.ckpt'
# samp = sample(checkpoint, 1000, lstm_size, len(vocab), prime="Far")
# print(samp)
#
# checkpoint = 'checkpoints\\i3960_1512.ckpt'
# samp = sample(checkpoint, 1000, lstm_size, len(vocab), prime="Shu ")
# print(samp)

    推荐阅读