import numpy as np
import pandas as pd
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.models import load_model
from keras.src.callbacks import ReduceLROnPlateau
from sklearn.preprocessing import MinMaxScaler
from keras.callbacks import EarlyStopping
import matplotlib
import os
matplotlib.use('TkAgg')
from matplotlib import pyplot as plt, rcParams
# 股票模型训练 flag=1需要保存模型,=0不保存仅测试用
def lstm_model_train(flag):
# 1、加载和预处理数据:这里可以改成你自己获取股票数据的方法
params = TushareProBarQueryParams(ts_code="002192.SZ",
start_date="20210101",
end_date="20240409",
adj='qfq',
freq="D")
df = tushare_pro_bar_query(params)
df['date'] = pd.to_datetime(df['trade_date'])
df['volume'] = df['vol']
# 将日期列设置为索引
df.set_index('date', inplace=True)
# 按照时间顺序升序排列
df.sort_values('date', inplace=True)
# 1、准备训练集
# 创建一个只有收盘价的新数据帧
# data = df.filter(['close', 'volume'])
data = df.filter(['close', 'open', 'high', 'low'])
# 将数据帧转换为numpy数组
dataset = data.values
# 获取要对模型进行训练的行数
training_data_len = int(np.ceil(len(dataset) * .8))
# 数据标准化,在构建LSTM模型进行时间序列预测时,对数据进行归一化或标准化处理是一个常见的步骤,因为它可以防止某些特征的值过大或过小而影响模型的训练和性能。
# fit_transform方法会先计算dataset中的数据的最大值和最小值,然后将这些数据缩放到指定的范围内,通常是将数据缩放到[0, 1]或[-1, 1]
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(dataset)
# 创建训练集,训练标准化训练集,取0到training_data_len的行数据,取所有列
train_data = scaled_data[0:int(training_data_len), :]
# 将数据拆分为x_train和y_train数据集
# x_train是训练数据,一个包含look_back个时间序列样本的列表,每个样本包含look_back个历史数据
# y_train是训练标签,一个包含look_back个目标值的列表,它包含了与x_train中的每个样本对应的期望输出
# 每个样本包含列的数量=look_back,行数(样本数量)=training_data_len - look_back
x_train = []
y_train = []
# 索引从0开始,以下是准备训练集,X取前60个数据,Y取第61个数据(这个值是指下一个预测值)
look_back = 60 # 选择60天的历史数据作为训练集的一行
for i in range(look_back, len(train_data)):
x_train.append(train_data[i - look_back:i, :]) # 从train_data中提取从当前索引i减去60到i的行数据(不包括 i)的所有列
# y_train.append(train_data[i, 0]) # 它的作用是将每个时间序列样本对应的目标值(即下一个的股价,第0列,单条数据)添加到 y_train 列表中
y_train.append(scaled_data[i, :]) # 这里我们使用了所有特征,包括收盘价和成交量
# 将x_train和y_train转换为numpy数组==将列表 x_train 中的每个元素(它们都是NumPy数组)转换为一个单一的NumPy数组,也就是外面再包一层
x_train, y_train = np.array(x_train), np.array(y_train)
# reshape,将数据格式调整为LSTM网络期望的输入格式
# x_train是一个形状为(训练样本数量, look_back, 1)的numpy数组,其中每个样本都是一个形状为(look_back, 1)的numpy数组
# 将x_train数组的形状改变为符合了LSTM网络的输入要求,即每个样本是一个三维数组,其中第一维是样本索引=行数shape[0],第二维是时间步=列数shape[1],第三维是特征值=shape[2]。
x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], x_train.shape[2]))
# 2、建立LSTM模型
model = Sequential()
# input_shape=(x_train.shape[1], 1)这告诉LSTM层期望的输入数据形状是(时间步长, 特征数量)
# 50 是传递给LSTM层的第一个参数,它指定了该层中的LSTM单元的数量,也就是该层的输出维度。
# return_sequences=True它指示LSTM层不仅返回最后一个时间步的输出,而是返回每个时间步的输出。这对于创建堆叠的LSTM层很有用,其中下一个LSTM层将使用前一个LSTM层的所有时间步的输出作为输入。
model.add(LSTM(64, return_sequences=True, input_shape=(x_train.shape[1], x_train.shape[2])))
model.add(LSTM(64, return_sequences=False))
# 添加全连接层(密集层)模型输出层,生成最终的预测。例子中,它预测一个单一值,如下一时间步的股价。
# model.add(Dense(1))
model.add(Dense(units=4)) # 这里我们添加了一个全连接层,用于预测4个特征点
# 编译模型
model.compile(optimizer='adam', loss='mean_squared_error')
# 设置早停来防止过拟合
# 设置早停法的参数,如果验证集的损失在连续 10 个周期内没有改善,训练将会停止,并且模型会恢复到验证集损失最小的状态。
early_stopping = EarlyStopping(
monitor='val_loss', # 监控验证集的损失
patience=10, # 如果验证集的损失在 10 个周期内没有改善,则停止训练
restore_best_weights=True # 当训练停止时,恢复验证集损失最小的模型权重
)
# 设置学习率调度器的参数
reduce_lr = ReduceLROnPlateau(
monitor='val_loss', # 监控验证集的损失
factor=0.1, # 当指标停止改进时,学习率将减小 10 倍
patience=5, # 如果验证集的损失在 5 个周期内没有改善,则减小学习率
min_lr=0.00001 # 学习率的最小值
)
# 将两个回调函数放入一个列表中,同时使用这两个回调函数可以帮助您在训练过程中自动地调整学习率和避免过拟合,从而使模型达到更好的性能
callbacks = [early_stopping, reduce_lr]
# 训练模型
# epochs=50:这个参数指定了训练模型的迭代次数。
# batch_size=32:这个参数指定了在每次迭代中使用的样本数量。
# verbose=1:这个参数指定了日志显示模式,0表示不显示,1表示显示进度条,2表示显示每个epoch的详细信息
# callbacks=[early_stopping]:这个参数指定了在训练过程中要使用的回调函数列表。
model.fit(x_train, y_train, epochs=180, batch_size=8, verbose=1, callbacks=callbacks)
# model.fit(x_train, y_train, epochs=180, batch_size=8, verbose=1)
# 3、保存模型
if flag == 1:
# 创建目录
if not os.path.exists('train_models'):
os.makedirs('train_models')
model.save('train_models/lstm_stock_price_prediction_model.h5')
# 3、创建测试数据集
# 创建一个新的数组,包含从索引的缩放值,test_data包含了训练集的最后一行的样本数据,为了和训练集数据有交集
test_data = scaled_data[training_data_len - look_back:, :]
# 创建数据集x_test和y_test
x_test = []
y_test = dataset[training_data_len:, :] # 后面的实际数据,用于和预测数据的比较
for i in range(look_back, len(test_data)):
x_test.append(test_data[i - look_back:i, :])
# 将数据转换为numpy数组
x_test = np.array(x_test)
# 重塑的数据,包含了训练集的最后一行的样本数据和测试集的样本数据
x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], x_test.shape[2]))
# 得到模型的预测值
predictions = model.predict(x_test)
# 为了将预测结果转换回原始的数据空间,你需要使用之前用于归一化或标准化的逆变换
# 用于在水平方向(即列方向)上堆叠多个数组。具体来说,它会把多个数组合并成一个新数组
# predictions = np.hstack((predictions, predictions * 0))
predictions = scaler.inverse_transform(predictions)
# 得到均方根误差(RMSE),如果数据范围是 0.1 元到 1 元,那么 RMSE 为 0.01 元可能是一个较大的值
rmse = np.sqrt(np.mean(((predictions - y_test) ** 2)))
train = data[:training_data_len] # 训练数据
valid = data[training_data_len:] # 真实数据
# valid['Predictions'] = predictions # 预测数据
valid['PClose'] = predictions[:, 0] # 第一个预测值(收盘价)
valid['POpen'] = predictions[:, 1] # 第二个预测值
valid['PHigh'] = predictions[:, 2] # 第二个预测值
valid['PLow'] = predictions[:, 3] # 第二个预测值
#分别画出实际和预测的数据进行对比
plt.figure(figsize=(16, 6))
plt.title('model')
plt.xlabel('date', fontsize=18)
plt.ylabel('close RMB (¥)', fontsize=18)
# plt.plot(train[['close', 'open', 'high', 'low']])
# plt.plot(valid[['close', 'open', 'high', 'low', 'PClose', 'POpen', 'PHigh', 'PLow']])
plt.plot(train[['close']])
plt.plot(valid[['close', 'PClose']])
plt.legend(['xunlian', 'shiji', 'yuce'], loc='lower right')
plt.show()
return rmse

