1. Project description
**Select two features, Close and Low, feed both features over a sliding window of time_steps days into the model, and predict the value of the Close feature for the next day.
With batch_first=True, the LSTM expects inputs of shape (batch_size, time_steps, input_size), where
batch_size = len(data) - time_steps
time_steps = the length of the sliding window; in this project this value is lookback
input_size = 2 [because the Close and Low features are selected]**
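As a quick sanity check of this shape convention, the minimal sketch below pushes a dummy tensor through an `nn.LSTM` built with `batch_first=True`. The batch size of 8 is an arbitrary illustrative number; `hidden_size=32` and `num_layers=2` match the model used later in this project.

```python
import torch
import torch.nn as nn

# Illustrative shapes only: 8 windows, 5 time steps, 2 features (Close and Low)
batch_size, time_steps, input_size = 8, 5, 2
x = torch.randn(batch_size, time_steps, input_size)

lstm = nn.LSTM(input_size=input_size, hidden_size=32, num_layers=2, batch_first=True)
out, (hn, cn) = lstm(x)

print(out.shape)             # torch.Size([8, 5, 32]) -> one hidden vector per time step
print(out[:, -1, :].shape)   # torch.Size([8, 32])    -> last time step, fed to the linear layer
```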
2. Dataset
Reference: the dataset comes from https://blog.csdn.net/qq_38633279/article/details/134245512?spm=1001.2014.3001.5501
3. Data preprocessing
3.1 Reading data
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import math, time
from sklearn.metrics import mean_squared_error

# Read the data and sort it by date
filepath = './data/rlData.csv'
data = pd.read_csv(filepath)
data = data.sort_values('Date')
print(data.head())
print(data.shape)

# Plot the Close price over time
sns.set_style("darkgrid")
plt.figure(figsize=(15, 9))
plt.plot(data[['Close']])
plt.xticks(range(0, data.shape[0], 20), data['Date'].loc[::20], rotation=45)
plt.title("****** Stock Price", fontsize=18, fontweight='bold')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price (USD)', fontsize=18)
plt.show()
```
3.2 Select the Close and Low features
```python
# .copy() avoids pandas' SettingWithCopyWarning when the columns are scaled below
price = data[['Close', 'Low']].copy()
```
3.3 Data normalization
```python
# Scale each feature to (-1, 1); use a separate scaler per feature so that the
# Close predictions can be inverse-transformed with the correct parameters later
scaler_close = MinMaxScaler(feature_range=(-1, 1))
scaler_low = MinMaxScaler(feature_range=(-1, 1))
price['Close'] = scaler_close.fit_transform(price['Close'].values.reshape(-1, 1))
price['Low'] = scaler_low.fit_transform(price['Low'].values.reshape(-1, 1))
```
3.4 Building the dataset [batch_size, time_steps, input_size]
This time, two features are selected as input, therefore input_size = 2.
x_train.shape = [batch_size,time_steps,input_size]
y_train.shape = [batch_size,1]
1. The input uses the Close and Low columns as multi-dimensional input, so the first and second columns of price are taken as x_train [hence input_size = 2].
2. The output is the predicted Close value, so only the first column of price is taken as y_train [that is, the Close column serves as y_train].
```python
# 2. Build the dataset
def split_data(stock, lookback):
    data_raw = stock.to_numpy()
    data = []

    # Slide a window of length `lookback` over the series
    for index in range(len(data_raw) - lookback):
        data.append(data_raw[index: index + lookback])

    data = np.array(data)
    test_set_size = int(np.round(0.2 * data.shape[0]))
    train_set_size = data.shape[0] - test_set_size

    x_train = data[:train_set_size, :-1, :]    # x_train.shape = (198, 4, 2)
    y_train = data[:train_set_size, -1, 0:1]   # y_train.shape = (198, 1)
    x_test = data[train_set_size:, :-1, :]     # x_test.shape = (49, 4, 2)
    y_test = data[train_set_size:, -1, 0:1]    # y_test.shape = (49, 1)

    return [torch.Tensor(x_train), torch.Tensor(y_train), torch.Tensor(x_test), torch.Tensor(y_test)]

lookback = 5
x_train, y_train, x_test, y_test = split_data(price, lookback)
print('x_train.shape = ', x_train.shape)
print('y_train.shape = ', y_train.shape)
print('x_test.shape = ', x_test.shape)
print('y_test.shape = ', y_test.shape)
```
4. LSTM algorithm
The LSTM model here is exactly the same as in the single-dimensional, single-step prediction project; only the dataset construction, and therefore the shape of the input fed to the model, is different, as the short sketch below illustrates.
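A minimal comparison of the two cases (the single-feature shapes are an assumption based on the earlier single-dimensional project, not taken from this code); note that only input_size changes, so everything downstream of the LSTM layer keeps the same shapes:

```python
import torch
import torch.nn as nn

lookback = 5  # sliding window length used in this project

# Single-feature case (Close only): input_size = 1
x_single = torch.randn(198, lookback - 1, 1)
lstm_single = nn.LSTM(input_size=1, hidden_size=32, num_layers=2, batch_first=True)
out_single, _ = lstm_single(x_single)   # (198, 4, 32)

# Two-feature case (Close and Low): input_size = 2, nothing else changes
x_multi = torch.randn(198, lookback - 1, 2)
lstm_multi = nn.LSTM(input_size=2, hidden_size=32, num_layers=2, batch_first=True)
out_multi, _ = lstm_multi(x_multi)      # (198, 4, 32) -> same downstream shapes
```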
```python
class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # batch_first=True -> input shape (batch_size, time_steps, input_size)
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Zero initial hidden and cell states for every forward pass
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
        # Use only the output of the last time step for the prediction
        out = self.fc(out[:, -1, :])
        return out
```
5. Model training
```python
input_dim = 2
hidden_dim = 32
num_layers = 2
output_dim = 1
num_epochs = 100

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
criterion = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

hist = np.zeros(num_epochs)
lstm = []

start_time = time.time()
for t in range(num_epochs):
    y_train_pred = model(x_train)
    loss = criterion(y_train_pred, y_train)
    hist[t] = loss.item()
    # print("Epoch ", t, "MSE: ", loss.item())

    optimiser.zero_grad()
    loss.backward()
    optimiser.step()
training_time = time.time() - start_time
print("Training time: {:.2f} s".format(training_time))
```
6. Plot the fit between predicted and true values, and the training loss
```python
# Inverse-transform the scaled training predictions and targets back to prices
predict = pd.DataFrame(scaler_close.inverse_transform(y_train_pred.detach().numpy()))
original = pd.DataFrame(scaler_close.inverse_transform(y_train.detach().numpy()))

sns.set_style("darkgrid")
fig = plt.figure()
fig.subplots_adjust(hspace=0.2, wspace=0.2)

plt.subplot(1, 2, 1)
ax = sns.lineplot(x=original.index, y=original[0], label="Data", color='royalblue')
ax = sns.lineplot(x=predict.index, y=predict[0], label="Training Prediction (LSTM)", color='tomato')
ax.set_title('Stock price', size=14, fontweight='bold')
ax.set_xlabel("Days", size=14)
ax.set_ylabel("Cost (USD)", size=14)
ax.set_xticklabels('', size=10)

plt.subplot(1, 2, 2)
ax = sns.lineplot(data=hist, color='royalblue')
ax.set_xlabel("Epoch", size=14)
ax.set_ylabel("Loss", size=14)
ax.set_title("Training Loss", size=14, fontweight='bold')
fig.set_figheight(6)
fig.set_figwidth(16)
plt.show()

# make predictions
y_test_pred = model(x_test)

# invert predictions
y_train_pred = scaler_close.inverse_transform(y_train_pred.detach().numpy())
y_train = scaler_close.inverse_transform(y_train.detach().numpy())
y_test_pred = scaler_close.inverse_transform(y_test_pred.detach().numpy())
y_test = scaler_close.inverse_transform(y_test.detach().numpy())

# calculate root mean squared error
trainScore = math.sqrt(mean_squared_error(y_train[:, 0], y_train_pred[:, 0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(y_test[:, 0], y_test_pred[:, 0]))
print('Test Score: %.2f RMSE' % (testScore))

lstm.append(trainScore)
lstm.append(testScore)
lstm.append(training_time)
```
7. Complete code
Problem description: select the two features Close and Low, use both features over a window of time_steps days, and predict the Close value for the next day. With batch_first=True, the LSTM expects inputs of shape (batch_size, time_steps, input_size), where batch_size = len(data) - time_steps, time_steps is the sliding window length (called lookback in this project), and input_size = 2 (because the Close and Low features are selected).

```python
#%%
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import math, time
from sklearn.metrics import mean_squared_error

# Read the data and sort it by date
filepath = './data/rlData.csv'
data = pd.read_csv(filepath)
data = data.sort_values('Date')
print(data.head())
print(data.shape)

# Plot the Close price over time
sns.set_style("darkgrid")
plt.figure(figsize=(15, 9))
plt.plot(data[['Close']])
plt.xticks(range(0, data.shape[0], 20), data['Date'].loc[::20], rotation=45)
plt.title("****** Stock Price", fontsize=18, fontweight='bold')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price (USD)', fontsize=18)
plt.show()

# 1. Select the two features and scale each one to (-1, 1)
price = data[['Close', 'Low']].copy()
scaler_close = MinMaxScaler(feature_range=(-1, 1))
scaler_low = MinMaxScaler(feature_range=(-1, 1))
price['Close'] = scaler_close.fit_transform(price['Close'].values.reshape(-1, 1))
price['Low'] = scaler_low.fit_transform(price['Low'].values.reshape(-1, 1))

# 2. Build the dataset
def split_data(stock, lookback):
    data_raw = stock.to_numpy()
    data = []

    # Slide a window of length `lookback` over the series
    for index in range(len(data_raw) - lookback):
        data.append(data_raw[index: index + lookback])

    data = np.array(data)
    test_set_size = int(np.round(0.2 * data.shape[0]))
    train_set_size = data.shape[0] - test_set_size

    x_train = data[:train_set_size, :-1, :]    # x_train.shape = (198, 4, 2)
    y_train = data[:train_set_size, -1, 0:1]   # y_train.shape = (198, 1)
    x_test = data[train_set_size:, :-1, :]     # x_test.shape = (49, 4, 2)
    y_test = data[train_set_size:, -1, 0:1]    # y_test.shape = (49, 1)

    return [torch.Tensor(x_train), torch.Tensor(y_train), torch.Tensor(x_test), torch.Tensor(y_test)]

lookback = 5
x_train, y_train, x_test, y_test = split_data(price, lookback)
print('x_train.shape = ', x_train.shape)
print('y_train.shape = ', y_train.shape)
print('x_test.shape = ', x_test.shape)
print('y_test.shape = ', y_test.shape)

# 3. LSTM model
class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
        out = self.fc(out[:, -1, :])
        return out

# 4. Training
input_dim = 2
hidden_dim = 32
num_layers = 2
output_dim = 1
num_epochs = 100

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
criterion = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

hist = np.zeros(num_epochs)
lstm = []

start_time = time.time()
for t in range(num_epochs):
    y_train_pred = model(x_train)
    loss = criterion(y_train_pred, y_train)
    hist[t] = loss.item()
    # print("Epoch ", t, "MSE: ", loss.item())

    optimiser.zero_grad()
    loss.backward()
    optimiser.step()
training_time = time.time() - start_time

# 5. Plot predictions vs. true values and the training loss
predict = pd.DataFrame(scaler_close.inverse_transform(y_train_pred.detach().numpy()))
original = pd.DataFrame(scaler_close.inverse_transform(y_train.detach().numpy()))

sns.set_style("darkgrid")
fig = plt.figure()
fig.subplots_adjust(hspace=0.2, wspace=0.2)

plt.subplot(1, 2, 1)
ax = sns.lineplot(x=original.index, y=original[0], label="Data", color='royalblue')
ax = sns.lineplot(x=predict.index, y=predict[0], label="Training Prediction (LSTM)", color='tomato')
ax.set_title('Stock price', size=14, fontweight='bold')
ax.set_xlabel("Days", size=14)
ax.set_ylabel("Cost (USD)", size=14)
ax.set_xticklabels('', size=10)

plt.subplot(1, 2, 2)
ax = sns.lineplot(data=hist, color='royalblue')
ax.set_xlabel("Epoch", size=14)
ax.set_ylabel("Loss", size=14)
ax.set_title("Training Loss", size=14, fontweight='bold')
fig.set_figheight(6)
fig.set_figwidth(16)
plt.show()

# make predictions
y_test_pred = model(x_test)

# invert predictions
y_train_pred = scaler_close.inverse_transform(y_train_pred.detach().numpy())
y_train = scaler_close.inverse_transform(y_train.detach().numpy())
y_test_pred = scaler_close.inverse_transform(y_test_pred.detach().numpy())
y_test = scaler_close.inverse_transform(y_test.detach().numpy())

# calculate root mean squared error
trainScore = math.sqrt(mean_squared_error(y_train[:, 0], y_train_pred[:, 0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(y_test[:, 0], y_test_pred[:, 0]))
print('Test Score: %.2f RMSE' % (testScore))

lstm.append(trainScore)
lstm.append(testScore)
lstm.append(training_time)
```
Reference: https://gitee.com/qiangchen_sh/stock-prediction/blob/master/code/LSTM from theoretical basis to code practice 4 multi-dimensional feature stock price prediction_Pytorch.ipynb