python代码编程题库 (python预测趋势变化的算法)

import numpy as np
import pandas as pd
from keras import Input, Model
from keras.layers import LSTM, Dense
from keras.src.callbacks import ReduceLROnPlateau
from sklearn.preprocessing import MinMaxScaler
from keras.callbacks import EarlyStopping
import matplotlib
import os

from tensorflow.keras.layers import LSTM, Dense, Reshape, Flatten, Conv1D, Dropout

matplotlib.use('TkAgg')
from matplotlib import pyplot as plt, rcParams

from data_import.tushare import TushareProBarQueryParams, tushare_pro_bar_query
# 股票模型训练 flag=1需要保存模型,=0不保存仅测试用
# ResNet(残差网络)是一种卷积神经网络(CNN)架构,通常用于图像识别和分类任务。
def resnet_model_train(flag):
    # 1、加载和预处理数据:下面可以改成通过你自己的方式获得股票数据
    params = TushareProBarQueryParams(ts_code="002192.SZ",
                                      start_date="20210101",
                                      end_date="20240409",
                                      adj='qfq',
                                      freq="D")

    df = tushare_pro_bar_query(params)
    df['date'] = pd.to_datetime(df['trade_date'])
    df['volume'] = df['vol']
    # 将日期列设置为索引
    df.set_index('date', inplace=True)

    # 按照时间顺序升序排列
    df.sort_values('date', inplace=True)

    # 1、准备训练集
    # 创建一个只有收盘价的新数据帧
    # data = df.filter(['close', 'volume'])
    data = df.filter(['close', 'open', 'high', 'low'])
    # 将数据帧转换为numpy数组
    dataset = data.values
    # 获取要对模型进行训练的行数
    training_data_len = int(np.ceil(len(dataset) * .8))
    # 数据标准化,在构建LSTM模型进行时间序列预测时,对数据进行归一化或标准化处理是一个常见的步骤,因为它可以防止某些特征的值过大或过小而影响模型的训练和性能。
    # fit_transform方法会先计算dataset中的数据的最大值和最小值,然后将这些数据缩放到指定的范围内,通常是将数据缩放到[0, 1]或[-1, 1]
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(dataset)

    # 创建训练集,训练标准化训练集,取0到training_data_len的行数据,取所有列
    train_data = scaled_data[0:int(training_data_len), :]
    # 将数据拆分为x_train和y_train数据集
    # x_train是训练数据,一个包含look_back个时间序列样本的列表,每个样本包含look_back个历史数据
    # y_train是训练标签,一个包含look_back个目标值的列表,它包含了与x_train中的每个样本对应的期望输出
    # 每个样本包含列的数量=look_back,行数(样本数量)=training_data_len - look_back
    x_train = []
    y_train = []
    # 索引从0开始,以下是准备训练集,X取前60个数据,Y取第61个数据(这个值是指下一个预测值)
    look_back = 60  # 选择60天的历史数据作为训练集的一行
    for i in range(look_back, len(train_data)):
        x_train.append(train_data[i - look_back:i, :])  # 从train_data中提取从当前索引i减去60到i的行数据(不包括 i)的所有列
        # y_train.append(train_data[i, 0])  # 它的作用是将每个时间序列样本对应的目标值(即下一个的股价,第0列,单条数据)添加到 y_train 列表中
        y_train.append(scaled_data[i, :])  # 这里我们使用了所有特征,包括收盘价和成交量

    # 将x_train和y_train转换为numpy数组==将列表 x_train 中的每个元素(它们都是NumPy数组)转换为一个单一的NumPy数组,也就是外面再包一层
    x_train, y_train = np.array(x_train), np.array(y_train)
    # reshape,将数据格式调整为LSTM网络期望的输入格式
    # x_train是一个形状为(训练样本数量, look_back, 1)的numpy数组,其中每个样本都是一个形状为(look_back, 1)的numpy数组
    # 将x_train数组的形状改变为符合了LSTM网络的输入要求,即每个样本是一个三维数组,其中第一维是样本索引=行数shape[0],第二维是时间步=列数shape[1],第三维是特征值=shape[2]。
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], x_train.shape[2]))

    # 2、建立ResNet模型
    def resnet_model():
        # 定义输入层的形状,注意这里我们使用整数序列而不是元组
        inputs = Input(shape=(look_back, data.shape[1]))
        conv1d = Conv1D(filters=64, kernel_size=3, activation='relu')
        conv1d_output = conv1d(inputs)  # 应用一维卷积层
        # ...(其他卷积层和网络层)...
        flattened = Flatten()(conv1d_output)  # 展平层
        dense = Dense(256, activation='relu')(flattened)
        dropout = Dropout(0.5)(dense)  # Dropout 层
        outputs = Dense(4)(dropout)  # 输出层,预测四个特征
        model = Model(inputs=inputs, outputs=outputs)
        model.compile(optimizer='adam', loss='mean_squared_error')
        return model

    # 编译模型
    model = resnet_model()

    # 设置早停来防止过拟合
    # 设置早停法的参数,如果验证集的损失在连续 10 个周期内没有改善,训练将会停止,并且模型会恢复到验证集损失最小的状态。
    early_stopping = EarlyStopping(
        monitor='val_loss',  # 监控验证集的损失
        patience=10,  # 如果验证集的损失在 10 个周期内没有改善,则停止训练
        restore_best_weights=True  # 当训练停止时,恢复验证集损失最小的模型权重
    )

    # 设置学习率调度器的参数
    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',  # 监控验证集的损失
        factor=0.1,  # 当指标停止改进时,学习率将减小 10 倍
        patience=5,  # 如果验证集的损失在 5 个周期内没有改善,则减小学习率
        min_lr=0.00001  # 学习率的最小值
    )

    # 将两个回调函数放入一个列表中,同时使用这两个回调函数可以帮助您在训练过程中自动地调整学习率和避免过拟合,从而使模型达到更好的性能
    callbacks = [early_stopping, reduce_lr]

    # 训练模型
    model.fit(x_train, y_train, epochs=100, batch_size=16, verbose=1, callbacks=callbacks)

    # 3、保存模型
    if flag == 1:
        # 创建目录
        if not os.path.exists('train_models'):
            os.makedirs('train_models')
        model.save('train_models/resnet_stock_price_prediction_model.h5')

    # 3、创建测试数据集
    # 创建一个新的数组,包含从索引的缩放值,test_data包含了训练集的最后一行的样本数据,为了和训练集数据有交集
    test_data = scaled_data[training_data_len - look_back:, :]
    # 创建数据集x_test和y_test
    x_test = []
    y_test = dataset[training_data_len:, :]  # 后面的实际数据,用于和预测数据的比较
    for i in range(look_back, len(test_data)):
        x_test.append(test_data[i - look_back:i, :])
    # 将数据转换为numpy数组
    x_test = np.array(x_test)
    # 重塑的数据,包含了训练集的最后一行的样本数据和测试集的样本数据
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], x_test.shape[2]))
    # 得到模型的预测值
    predictions = model.predict(x_test)
    # 为了将预测结果转换回原始的数据空间,你需要使用之前用于归一化或标准化的逆变换

    # 用于在水平方向(即列方向)上堆叠多个数组。具体来说,它会把多个数组合并成一个新数组
    # predictions = np.hstack((predictions, predictions * 0))

    predictions = scaler.inverse_transform(predictions)
    # 得到均方根误差(RMSE),如果数据范围是 0.1 元到 1 元,那么 RMSE 为 0.01 元可能是一个较大的值
    rmse = np.sqrt(np.mean(((predictions - y_test) ** 2)))

    train = data[:training_data_len]  # 训练数据
    valid = data[training_data_len:]  # 真实数据
    # valid['Predictions'] = predictions  # 预测数据
    valid['PClose'] = predictions[:, 0]  # 第一个预测值(收盘价)
    valid['POpen'] = predictions[:, 1]  # 第二个预测值
    valid['PHigh'] = predictions[:, 2]  # 第二个预测值
    valid['PLow'] = predictions[:, 3]  # 第二个预测值

    plt.figure(figsize=(16, 6))
    plt.title('model')
    plt.xlabel('date', fontsize=18)
    plt.ylabel('close RMB (¥)', fontsize=18)
    # plt.plot(train[['close', 'open', 'high', 'low']])
    # plt.plot(valid[['close', 'open', 'high', 'low', 'PClose', 'POpen', 'PHigh', 'PLow']])
    plt.plot(train[['close']])
    plt.plot(valid[['close', 'PClose']])
    plt.legend(['xunlian', 'shiji', 'yuce'], loc='lower right')
    plt.show()

    return rmse

python代码实现性能测试,python大数据预测模型

python代码实现性能测试,python大数据预测模型

python代码实现性能测试,python大数据预测模型

python代码实现性能测试,python大数据预测模型