These are notes from working through 莫凡's Python tutorial recently: how to hand-code an RNN for both classification and regression.
Classification implementation (the details that need attention are noted in the code comments):
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

tf.set_random_seed(1)  # seed=1 makes the random numbers reproducible across runs
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)  # load the data

# hyperparameters
lr = 0.001
training_iters = 100000
batch_size = 128
# network parameters
n_inputs = 28         # pixels per image row, fed in as the input of one step
n_steps = 28          # number of time steps (one per image row)
n_hidden_units = 128  # number of neurons in the hidden layer
n_classes = 10

x = tf.placeholder(tf.float32, [None, n_steps, n_inputs])
y = tf.placeholder(tf.float32, [None, n_classes])

weights = {'in': tf.Variable(tf.random_normal([n_inputs, n_hidden_units])),    # input (28, 128)
           'out': tf.Variable(tf.random_normal([n_hidden_units, n_classes]))}  # output (128, 10)
biases = {'in': tf.Variable(tf.constant(0.1, shape=[n_hidden_units, ])),  # input (128,)
          'out': tf.Variable(tf.constant(0.1, shape=[n_classes, ]))}      # output (10,)


def RNN(X, weights, biases):
    # the input comes in as [128 batch, 28 steps, 28 inputs]
    X = tf.reshape(X, [-1, n_inputs])  # flatten the 3-D input into 2-D
    X_in = tf.matmul(X, weights['in']) + biases['in']  # note: the bias is added outside the matmul
    X_in = tf.reshape(X_in, [-1, n_steps, n_hidden_units])  # reshape back into 3-D

    if int((tf.__version__).split('.')[1]) < 12 and int((tf.__version__).split('.')[0]) < 1:
        # the lstm cell state is split into two parts (c_state, m_state);
        # state_is_tuple keeps them as a tuple
        cell = tf.nn.rnn_cell.BasicLSTMCell(n_hidden_units, forget_bias=1.0, state_is_tuple=True)
    else:
        cell = tf.contrib.rnn.BasicLSTMCell(n_hidden_units)

    # initialize the state
    init_state = cell.zero_state(batch_size, dtype=tf.float32)

    # rnn
    outputs, final_state = tf.nn.dynamic_rnn(
        cell, X_in, initial_state=init_state,
        time_major=False)  # time_major depends on where n_steps sits in the input shape

    # output
    if int((tf.__version__).split('.')[1]) < 12 and int((tf.__version__).split('.')[0]) < 1:
        # tf.transpose with [1, 0, 2] reorders outputs to [28 steps, 128 batch, 128 outputs]
        # tf.unpack splits that into a list [(batch, outputs)] * steps
        outputs = tf.unpack(tf.transpose(outputs, [1, 0, 2]))
    else:
        outputs = tf.unstack(tf.transpose(outputs, [1, 0, 2]))
    results = tf.matmul(outputs[-1], weights['out']) + biases['out']  # use the output of the last step
    return results


pred = RNN(x, weights, biases)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=pred, labels=y))
train_op = tf.train.AdamOptimizer(lr).minimize(cost)

# tf.equal(A, B) compares the two tensors element-wise, returning True where they are
# equal and False otherwise; the result has the same shape as A
correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

with tf.Session() as sess:
    if int((tf.__version__).split('.')[1]) < 12 and int((tf.__version__).split('.')[0]) < 1:
        init = tf.initialize_all_variables()
    else:
        init = tf.global_variables_initializer()
    sess.run(init)
    step = 0
    while step * batch_size < training_iters:
        batch_xs, batch_ys = mnist.train.next_batch(batch_size)
        batch_xs = batch_xs.reshape([batch_size, n_steps, n_inputs])
        sess.run([train_op], feed_dict={x: batch_xs, y: batch_ys})
        if step % 20 == 0:
            print(sess.run(accuracy, feed_dict={x: batch_xs, y: batch_ys}))
        step += 1
Output:
(figure omitted: screenshot of the training accuracy printed every 20 steps)
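The code above targets TensorFlow 1.x (placeholders, tf.contrib, the tutorials input_data module), which no longer exists in TensorFlow 2.x. Purely as a point of comparison, and not part of 莫凡's tutorial, a minimal tf.keras sketch of the same 28-step, 128-unit LSTM classifier could look roughly like this:

import tensorflow as tf

# load MNIST directly from Keras; each 28x28 image is read as 28 steps of 28 pixels
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.astype('float32') / 255.0  # shape (60000, 28, 28)

model = tf.keras.Sequential([
    tf.keras.layers.LSTM(128, input_shape=(28, 28)),  # keeps only the last step's output, like outputs[-1] above
    tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(x_train, y_train, batch_size=128, epochs=1)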
Regression implementation:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

BATCH_START = 0
TIME_STEPS = 20  # steps unrolled through backpropagation, also the steps read in the forward pass (often written n_steps)
BATCH_SIZE = 50  # batch size
INPUT_SIZE = 1
OUTPUT_SIZE = 1
CELL_SIZE = 10   # number of hidden units in one cell (also called state_size)
LR = 0.006


def get_batch():
    global BATCH_START, TIME_STEPS
    xs = np.arange(BATCH_START, BATCH_START + TIME_STEPS * BATCH_SIZE).reshape((BATCH_SIZE, TIME_STEPS)) / (10 * np.pi)
    seq = np.sin(xs)
    res = np.cos(xs)
    BATCH_START += TIME_STEPS
    # np.newaxis adds a trailing feature dimension to seq and res
    return [seq[:, :, np.newaxis], res[:, :, np.newaxis], xs]
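# Quick sanity check (added here for illustration, not part of the original tutorial):
# each call returns seq and res with shape (BATCH_SIZE, TIME_STEPS, 1) = (50, 20, 1)
# and xs with shape (50, 20); BATCH_START advances by TIME_STEPS, so row i of the next
# batch continues exactly where row i of the previous batch ended, which is what makes
# passing the final state forward meaningful.
_seq, _res, _xs = get_batch()
print(_seq.shape, _res.shape, _xs.shape)  # (50, 20, 1) (50, 20, 1) (50, 20)
BATCH_START = 0  # reset so training below still starts from the beginning of the curve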
class LSTMRNN(object):
    def __init__(self, n_steps, input_size, output_size, cell_size, batch_size):
        self.n_steps = n_steps
        self.input_size = input_size
        self.output_size = output_size
        self.cell_size = cell_size
        self.batch_size = batch_size
        # tf.name_scope only groups names of variables created with tf.Variable
        with tf.name_scope('inputs'):
            self.xs = tf.placeholder(tf.float32, [None, n_steps, input_size], name='xs')
            self.ys = tf.placeholder(tf.float32, [None, n_steps, output_size], name='ys')
        # tf.variable_scope also covers variables obtained via tf.get_variable,
        # in addition to tf.Variable ones
        with tf.variable_scope('in_hidden'):
            self.add_input_layer()
        with tf.variable_scope('LSTM_cell'):
            self.add_cell()
        with tf.variable_scope('out_hidden'):
            self.add_output_layer()
        with tf.variable_scope('cost'):
            self.compute_cost()
        with tf.name_scope('train'):
            self.train_op = tf.train.AdamOptimizer(LR).minimize(self.cost)

    def add_input_layer(self):
        l_in_x = tf.reshape(self.xs, [-1, self.input_size], name='2_2D')  # shape: (batch * n_steps, input_size)
        Ws_in = self._weight_variable([self.input_size, self.cell_size])  # shape: (input_size, cell_size)
        bs_in = self._bias_variable([self.cell_size])                     # shape: (cell_size,)
        with tf.name_scope('Wx_plus_b'):
            l_in_y = tf.matmul(l_in_x, Ws_in) + bs_in                     # shape: (batch * n_steps, cell_size)
        self.l_in_y = tf.reshape(l_in_y, [-1, self.n_steps, self.cell_size], name='2_3D')

    def add_cell(self):
        lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.cell_size, forget_bias=1.0, state_is_tuple=True)
        with tf.name_scope('initial_state'):
            self.cell_init_state = lstm_cell.zero_state(self.batch_size, dtype=tf.float32)
        # returns the outputs at every step and the final state
        self.cell_outputs, self.cell_final_state = tf.nn.dynamic_rnn(
            lstm_cell, self.l_in_y, initial_state=self.cell_init_state, time_major=False)

    def add_output_layer(self):
        l_out_x = tf.reshape(self.cell_outputs, [-1, self.cell_size], name='2_2D')  # shape: (batch * n_steps, cell_size)
        Ws_out = self._weight_variable([self.cell_size, self.output_size])          # shape: (cell_size, output_size)
        bs_out = self._bias_variable([self.output_size, ])                          # shape: (output_size,)
        with tf.name_scope('Wx_plus_b'):
            self.pred = tf.matmul(l_out_x, Ws_out) + bs_out                         # shape: (batch * n_steps, output_size)

    def compute_cost(self):
        # tf.contrib.legacy_seq2seq.sequence_loss_by_example(logits, targets, weights)
        # computes a weighted loss over all examples; the default is softmax cross-entropy,
        # but here ms_error is passed in so each step contributes a squared error instead
        losses = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
            [tf.reshape(self.pred, [-1], name='reshape_pred')],
            [tf.reshape(self.ys, [-1], name='reshape_target')],
            [tf.ones([self.batch_size * self.n_steps], dtype=tf.float32)],
            average_across_timesteps=True,
            softmax_loss_function=self.ms_error,
            name='losses')
        with tf.name_scope('average_cost'):
            self.cost = tf.div(
                tf.reduce_sum(losses, name='losses_sum'),
                self.batch_size,
                name='average_cost')
            tf.summary.scalar('cost', self.cost)

    # a static method can be called without instantiating the class
    @staticmethod
    def ms_error(labels, logits):
        # tf.subtract(x, y, name=None) returns a Tensor with the same type as x
        return tf.square(tf.subtract(labels, logits))

    def _weight_variable(self, shape, name='weights'):
        # tf.random_normal_initializer(mean=0.0, stddev=1.0, seed=None, dtype=tf.float32)
        # returns an initializer that draws from a normal distribution; mean and stddev
        # are python scalars or scalar tensors
        initializer = tf.random_normal_initializer(mean=0.0, stddev=1.0)
        return tf.get_variable(shape=shape, initializer=initializer, name=name)

    def _bias_variable(self, shape, name='biases'):
        initializer = tf.constant_initializer(0.1)
        return tf.get_variable(name=name, shape=shape, initializer=initializer)


if __name__ == '__main__':
    model = LSTMRNN(TIME_STEPS, INPUT_SIZE, OUTPUT_SIZE, CELL_SIZE, BATCH_SIZE)
    sess = tf.Session()
    merged = tf.summary.merge_all()
    # write the summary protocol buffers to an event file
    writer = tf.summary.FileWriter('logs', sess.graph)
    if int((tf.__version__).split('.')[1]) < 12 and int((tf.__version__).split('.')[0]) < 1:
        init = tf.initialize_all_variables()
    else:
        init = tf.global_variables_initializer()
    sess.run(init)

    plt.ion()  # turn on interactive plotting
    plt.show()
    for i in range(1000):
        seq, res, xs = get_batch()
        if i == 0:
            feed_dict = {
                model.xs: seq,
                model.ys: res,
            }
        else:
            feed_dict = {
                model.xs: seq,
                model.ys: res,
                # feed the previous cell's final_state in as the next init_state
                model.cell_init_state: state,
            }
        _, cost, state, pred = sess.run(
            [model.train_op, model.cost, model.cell_final_state, model.pred],
            feed_dict=feed_dict)

        # plot the fit
        plt.plot(xs[0, :], res[0].flatten(), 'r',
                 xs[0, :], pred.flatten()[:TIME_STEPS], 'b--')
        plt.ylim((-1.2, 1.2))
        plt.draw()
        plt.pause(0.3)

        if i % 20 == 0:
            print('cost:', round(cost, 4))  # round keeps four decimal places of the cost
            result = sess.run(merged, feed_dict)
            writer.add_summary(result, i)
Output:
(figures omitted: the predicted curve (blue dashed) plotted against the target cos curve (red))
After roughly 1000 training iterations the prediction basically coincides with the target curve.
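Feeding cell_final_state back in as cell_init_state is what a stateful LSTM does for you in Keras. Again only as a rough, non-tutorial sketch for TensorFlow 2.x (get_batch above is plain NumPy, so it can be reused unchanged; a fixed batch size is required once stateful=True is set):

import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.LSTM(10, return_sequences=True, stateful=True,
                         batch_input_shape=(50, 20, 1)),        # CELL_SIZE=10, BATCH_SIZE=50, TIME_STEPS=20
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1)),  # one output per time step
])
model.compile(optimizer=tf.keras.optimizers.Adam(0.006), loss='mse')

for i in range(1000):
    seq, res, xs = get_batch()             # sin -> cos windows, as defined above
    loss = model.train_on_batch(seq, res)  # the LSTM state carries over between batches
    if i % 20 == 0:
        print('cost:', round(float(loss), 4))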