关于使用LSTM模型实现客流预测的自我总结
鉴于我写下这篇博客只是为了自己以后回头查错记录之用,所涉及的代码或是技术均可能出现较大误差,若有人想要了解或学习相关内容,请不要过多参考代码,除此之外,如有热心大佬积极留言,小白在此叩头感谢!
长短期记忆网络(LSTM,Long Short-Term Memory)是一种时间循环神经网络,是为了解决一般的RNN(循环神经网络)存在的长期依赖问题而专门设计出来的,所有的RNN都具有一种重复神经网络模块的链式形式。
各类神经网络之间比较
自互联网问世以来,人们对于机器大脑与生物大脑的探究就一直没有停止过,随之面世的各种针对于处理各种不同问题的神经网络也如雨后春笋般层出不穷,小白我也初涉不深,耳熟能详的GAN、RNN、CNN等神经网络当然也各有千秋,在此便不一一赘述,
[详情请参考此大佬
](https://blog.csdn.net/gaussrieman123/article/details/79129299%29)
为什么使用LSTM神经网络模型处理客流预测
大家都有过去乘坐地铁出勤的经历吧,早晚高峰的场面估计也是见怪不怪吧!出现这种情况的主要原因便是客流中心无法根据实时的客流量情况来有效的调动地铁运行状态。大致就会出现这样一种奇怪的现状——即是有的候车位人满为患,有的却是“门可罗雀”情形。
客流预测模型所需要的神经网络与其他神经网络有一个需求极大的地方——时间片功能与基于上个时间记忆功能从而做出预测的动作。客流预测对于时间的间隔要求比起一般的图像或者是文字识别更为苛刻。人流量的变动与车辆之间的调动必须要紧密联系在一起。并且其还有一个特点是——所要求的时间间隔必须足够小!我们大家都知道,如同成都、上海之流的大城市,人口流量以及人流单位时间内的变化是一个可怕的概念,更不必说早晚高峰这种极端情况,这就对于我们的神经网络有着时间间隔的要求!
LSTM特点
说了这么多,我们还是来谈谈LSTM到底有哪些优点吧!首先来说,LSTM模型在关于时间片的间隔这一方面来说,做客流预测就非其不可!同时,其对于向前数据的更好的分析而实现对下一时刻数据的预测这块儿也有着得天独厚的优势!着重体现的便是LSTM的特殊结构——遗忘门、输入门、输出门。其通过一系列的实现最终可以完成时间循环学习并达到预测的效果!
详见[借用一下大佬见解](https://blog.csdn.net/zhangbaoanhadoop/article/details/81952284%29)
最后谈谈
LSTM神经网络是在RNN神经网络的基础上优化而来,解决了RNN模型有关的梯度消失而难以处理较长数据列的问题,现如今不论是科学研究或是工业现代化上均已广泛引用。所以其学习必要十分明显!
小白我当前也只是做到粗略理解其概念,盼今后再接再厉!下面贴上自己的一段代码(错的啦):
import randomimport numpy as np
import mathdef sigmoid(x):
return 1. / (1 + np.exp(-x))def sigmoid_derivative(values):
return values*(1-values)def tanh_derivative(values):
return 1. - values ** 2#createst uniform random array w/ values in [a,b) and shape args
def rand_arr(a, b, *args):
np.random.seed(0)
return np.random.rand(*args) * (b - a) + aclass LstmParam:
def __init__(self, mem_cell_ct, x_dim):
self.mem_cell_ct = mem_cell_ct
self.x_dim = x_dim
concat_len = x_dim + mem_cell_ct
# weight matrices
self.wg = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
self.wi = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
self.wf = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
self.wo = rand_arr(-0.1, 0.1, mem_cell_ct, concat_len)
# bias terms
self.bg = rand_arr(-0.1, 0.1, mem_cell_ct)
self.bi = rand_arr(-0.1, 0.1, mem_cell_ct)
self.bf = rand_arr(-0.1, 0.1, mem_cell_ct)
self.bo = rand_arr(-0.1, 0.1, mem_cell_ct)
# diffs (derivative of loss function w.r.t. all parameters)
self.wg_diff = np.zeros((mem_cell_ct, concat_len))
self.wi_diff = np.zeros((mem_cell_ct, concat_len))
self.wf_diff = np.zeros((mem_cell_ct, concat_len))
self.wo_diff = np.zeros((mem_cell_ct, concat_len))
self.bg_diff = np.zeros(mem_cell_ct)
self.bi_diff = np.zeros(mem_cell_ct)
self.bf_diff = np.zeros(mem_cell_ct)
self.bo_diff = np.zeros(mem_cell_ct) def apply_diff(self, lr = 1):
self.wg -= lr * self.wg_diff
self.wi -= lr * self.wi_diff
self.wf -= lr * self.wf_diff
self.wo -= lr * self.wo_diff
self.bg -= lr * self.bg_diff
self.bi -= lr * self.bi_diff
self.bf -= lr * self.bf_diff
self.bo -= lr * self.bo_diff
# reset diffs to zero
self.wg_diff = np.zeros_like(self.wg)
self.wi_diff = np.zeros_like(self.wi)
self.wf_diff = np.zeros_like(self.wf)
self.wo_diff = np.zeros_like(self.wo)
self.bg_diff = np.zeros_like(self.bg)
self.bi_diff = np.zeros_like(self.bi)
self.bf_diff = np.zeros_like(self.bf)
self.bo_diff = np.zeros_like(self.bo) class LstmState:
def __init__(self, mem_cell_ct, x_dim):
self.g = np.zeros(mem_cell_ct)
self.i = np.zeros(mem_cell_ct)
self.f = np.zeros(mem_cell_ct)
self.o = np.zeros(mem_cell_ct)
self.s = np.zeros(mem_cell_ct)
self.h = np.zeros(mem_cell_ct)
self.bottom_diff_h = np.zeros_like(self.h)
self.bottom_diff_s = np.zeros_like(self.s)class LstmNode:
def __init__(self, lstm_param, lstm_state):
# store reference to parameters and to activations
self.state = lstm_state
self.param = lstm_param
# non-recurrent input concatenated with recurrent input
self.xc = Nonedef bottom_data_is(self, x, s_prev = None, h_prev = None):
# if this is the first lstm node in the network
if s_prev is None: s_prev = np.zeros_like(self.state.s)
if h_prev is None: h_prev = np.zeros_like(self.state.h)
# save data for use in backprop
self.s_prev = s_prev
self.h_prev = h_prev# concatenate x(t) and h(t-1)
xc = np.hstack((x,h_prev))
self.state.g = np.tanh(np.dot(self.param.wg, xc) + self.param.bg)
self.state.i = sigmoid(np.dot(self.param.wi, xc) + self.param.bi)
self.state.f = sigmoid(np.dot(self.param.wf, xc) + self.param.bf)
self.state.o = sigmoid(np.dot(self.param.wo, xc) + self.param.bo)
self.state.s = self.state.g * self.state.i + s_prev * self.state.f
self.state.h = self.state.s * self.state.oself.xc = xcdef top_diff_is(self, top_diff_h, top_diff_s):
# notice that top_diff_s is carried along the constant error carousel
ds = self.state.o * top_diff_h + top_diff_s
do = self.state.s * top_diff_h
di = self.state.g * ds
dg = self.state.i * ds
df = self.s_prev * ds# diffs w.r.t. vector inside sigma / tanh function
di_input = sigmoid_derivative(self.state.i) * di
df_input = sigmoid_derivative(self.state.f) * df
do_input = sigmoid_derivative(self.state.o) * do
dg_input = tanh_derivative(self.state.g) * dg# diffs w.r.t. inputs
self.param.wi_diff += np.outer(di_input, self.xc)
self.param.wf_diff += np.outer(df_input, self.xc)
self.param.wo_diff += np.outer(do_input, self.xc)
self.param.wg_diff += np.outer(dg_input, self.xc)
self.param.bi_diff += di_input
self.param.bf_diff += df_input
self.param.bo_diff += do_input
self.param.bg_diff += dg_input# compute bottom diff
dxc = np.zeros_like(self.xc)
dxc += np.dot(self.param.wi.T, di_input)
dxc += np.dot(self.param.wf.T, df_input)
dxc += np.dot(self.param.wo.T, do_input)
dxc += np.dot(self.param.wg.T, dg_input)# save bottom diffs
self.state.bottom_diff_s = ds * self.state.f
self.state.bottom_diff_h = dxc[self.param.x_dim:]class LstmNetwork():
def __init__(self, lstm_param):
self.lstm_param = lstm_param
self.lstm_node_list = []
# input sequence
self.x_list = []def y_list_is(self, y_list, loss_layer):
"""
Updates diffs by setting target sequence
with corresponding loss layer.
Will *NOT* update parameters.To update parameters,
call self.lstm_param.apply_diff()
"""
assert len(y_list) == len(self.x_list)
idx = len(self.x_list) - 1
# first node only gets diffs from label ...
loss = loss_layer.loss(self.lstm_node_list[idx].state.h, y_list[idx])
diff_h = loss_layer.bottom_diff(self.lstm_node_list[idx].state.h, y_list[idx])
# here s is not affecting loss due to h(t+1), hence we set equal to zero
diff_s = np.zeros(self.lstm_param.mem_cell_ct)
self.lstm_node_list[idx].top_diff_is(diff_h, diff_s)
idx -= 1### ... following nodes also get diffs from next nodes, hence we add diffs to diff_h
### we also propagate error along constant error carousel using diff_s
while idx >= 0:
loss += loss_layer.loss(self.lstm_node_list[idx].state.h, y_list[idx])
diff_h = loss_layer.bottom_diff(self.lstm_node_list[idx].state.h, y_list[idx])
diff_h += self.lstm_node_list[idx + 1].state.bottom_diff_h
diff_s = self.lstm_node_list[idx + 1].state.bottom_diff_s
self.lstm_node_list[idx].top_diff_is(diff_h, diff_s)
idx -= 1 return lossdef x_list_clear(self):
self.x_list = []def x_list_add(self, x):
self.x_list.append(x)
if len(self.x_list) > len(self.lstm_node_list):
# need to add new lstm node, create new state mem
lstm_state = LstmState(self.lstm_param.mem_cell_ct, self.lstm_param.x_dim)
self.lstm_node_list.append(LstmNode(self.lstm_param, lstm_state))# get index of most recent x input
idx = len(self.x_list) - 1
if idx == 0:
# no recurrent inputs yet
self.lstm_node_list[idx].bottom_data_is(x)
else:
s_prev = self.lstm_node_list[idx - 1].state.s
h_prev = self.lstm_node_list[idx - 1].state.h
self.lstm_node_list[idx].bottom_data_is(x, s_prev, h_prev)
下面是一个利用LSTM实现小目标的例子(瑕疵存在)
"""
LSTM时间序列问题预测:国际旅行人数预测
"""
import numpy as np
from matplotlib import pyplot as plt
from pandas import read_csv
import math
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_errorseed = 7
batch_size = 1
epochs = 100
filename = 'D:/1/11/lstm/lstm-master/jinchu.csv'
footer = 3
look_back = 1def create_dataset(dataset):
# 创建数据集
dataX, dataY = [], []
for i in range ( len ( dataset ) - look_back - 1 ):
x = dataset[i:i + look_back, 0]
dataX.append ( x )
y = dataset[i + look_back, 0]
dataY.append ( y )
print ( 'X: %s, Y: %s' % (x, y) )
return np.array ( dataX ), np.array ( dataY )def build_model():
model = Sequential ()
model.add ( LSTM ( units=4, input_shape=(1, look_back) ) )
model.add ( Dense ( units=1 ) )
model.compile ( loss='mean_squared_error', optimizer='adam' )
return modelif __name__ == '__main__':
# 设置随机种子
np.random.seed ( seed )# 导入数据
data = https://www.it610.com/article/read_csv ( filename, usecols=[1], engine='python', skipfooter=footer )
dataset = data.values.astype ( 'float32' )
# 标准化数据
scaler = MinMaxScaler ()
dataset = scaler.fit_transform ( dataset )
train_size = int ( len ( dataset ) * 0.67 )
validation_size = len ( dataset ) - train_size
train, validation = dataset[0:train_size, :], dataset[train_size:len ( dataset ), :]# 创建dataset,使数据产生相关性
X_train, y_train = create_dataset ( train )
X_validation, y_validation = create_dataset ( validation )
# 将数据转换成[样本,时间步长,特征]的形式
X_train = np.reshape ( X_train, (X_train.shape[0], 1, X_train.shape[1]) )
X_validation = np.reshape ( X_validation, (X_validation.shape[0], 1, X_validation.shape[1]) )# 训练模型
model = build_model ()
model.fit ( X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=2 )# 模型预测数据
predict_train = model.predict ( X_train )
predict_validation = model.predict ( X_validation )# 反标准化数据,目的是为了保证MSE的准确性
predict_train = scaler.inverse_transform ( predict_train )
y_train = scaler.inverse_transform ( [y_train] )
predict_validation = scaler.inverse_transform ( predict_validation )
y_validation = scaler.inverse_transform ( [y_validation] )# 评估模型
train_score = math.sqrt ( mean_squared_error ( y_train[0], predict_train[:, 0] ) )
print ( 'Train Score: %.2f RMSE' % train_score )
validation_score = math.sqrt ( mean_squared_error ( y_validation[0], predict_validation[:, 0] ) )
print ( 'Validation Score : %.2f RMSE' % validation_score )# 构建通过训练数据集进行预测的图表数据
predict_train_plot = np.empty_like ( dataset )
predict_train_plot[:, :] = np.nan
predict_train_plot[look_back:len ( predict_train ) + look_back, :] = predict_train# 构建通过评估数据集进行预测的图表数据
predict_validation_plot = np.empty_like ( dataset )
predict_validation_plot[:, :] = np.nan
predict_validation_plot[len ( predict_train ) + look_back * 2 + 1: len ( dataset ) - 1, :] = predict_validation# 图表显示
dataset = scaler.inverse_transform ( dataset )
plt.plot ( dataset, color='black' )
plt.plot ( predict_train_plot, color='green' )
plt.plot ( predict_validation_plot, color='red' )
plt.show ()
【自我总结|关于LSTM小白的一些心得】另外有一篇基于飞机客流预测博客,可读性强
推荐阅读