1. Project description
**Select two features, Close and Low, feed both features over a sliding window of time_steps days into the model, and predict the value of the Close feature for the next day.
With batch_first=True, the LSTM expects inputs of shape (batch_size, time_steps, input_size), where
batch_size = len(data) - time_steps
time_steps = the length of the sliding window; in this project this value is lookback
input_size = 2 [because the Close and Low features are selected]**
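As a quick sanity check of this shape convention, the minimal sketch below pushes a dummy tensor through an `nn.LSTM` built with `batch_first=True`. The batch size of 8 is an arbitrary illustrative number; `hidden_size=32` and `num_layers=2` match the model used later in this project.

```python
import torch
import torch.nn as nn

# Illustrative shapes only: 8 windows, 5 time steps, 2 features (Close and Low)
batch_size, time_steps, input_size = 8, 5, 2
x = torch.randn(batch_size, time_steps, input_size)

lstm = nn.LSTM(input_size=input_size, hidden_size=32, num_layers=2, batch_first=True)
out, (hn, cn) = lstm(x)

print(out.shape)             # torch.Size([8, 5, 32]) -> one hidden vector per time step
print(out[:, -1, :].shape)   # torch.Size([8, 32])    -> last time step, fed to the linear layer
```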
2. Dataset
Reference: the dataset comes from https://blog.csdn.net/qq_38633279/article/details/134245512?spm=1001.2014.3001.5501
3. Data preprocessing
3.1 Reading data
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import math, time
from sklearn.metrics import mean_squared_error

# Read the data and sort it by date
filepath = './data/rlData.csv'
data = pd.read_csv(filepath)
data = data.sort_values('Date')
print(data.head())
print(data.shape)

# Plot the Close price over time
sns.set_style("darkgrid")
plt.figure(figsize=(15, 9))
plt.plot(data[['Close']])
plt.xticks(range(0, data.shape[0], 20), data['Date'].loc[::20], rotation=45)
plt.title("****** Stock Price", fontsize=18, fontweight='bold')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price (USD)', fontsize=18)
plt.show()
```
3.2 Select the Close and Low features
```python
# .copy() avoids pandas' SettingWithCopyWarning when the columns are scaled below
price = data[['Close', 'Low']].copy()
```
3.3 Data normalization
```python
# Scale each feature to (-1, 1); use a separate scaler per feature so that the
# Close predictions can be inverse-transformed with the correct parameters later
scaler_close = MinMaxScaler(feature_range=(-1, 1))
scaler_low = MinMaxScaler(feature_range=(-1, 1))
price['Close'] = scaler_close.fit_transform(price['Close'].values.reshape(-1, 1))
price['Low'] = scaler_low.fit_transform(price['Low'].values.reshape(-1, 1))
```
3.4 Building the dataset [batch_size, time_steps, input_size]
This time, two features are selected as input, therefore input_size = 2.
x_train.shape = [batch_size,time_steps,input_size]
y_train.shape = [batch_size,1]
1. The input uses the Close and Low columns as multi-dimensional input, so the first and second columns of price are taken as x_train [hence input_size = 2].
2. The output is the predicted Close value, so only the first column of price is taken as y_train [that is, the Close column serves as y_train].
```python
# 2. Build the dataset
def split_data(stock, lookback):
    data_raw = stock.to_numpy()
    data = []

    # Slide a window of length `lookback` over the series
    for index in range(len(data_raw) - lookback):
        data.append(data_raw[index: index + lookback])

    data = np.array(data)
    test_set_size = int(np.round(0.2 * data.shape[0]))
    train_set_size = data.shape[0] - test_set_size

    x_train = data[:train_set_size, :-1, :]    # x_train.shape = (198, 4, 2)
    y_train = data[:train_set_size, -1, 0:1]   # y_train.shape = (198, 1)
    x_test = data[train_set_size:, :-1, :]     # x_test.shape = (49, 4, 2)
    y_test = data[train_set_size:, -1, 0:1]    # y_test.shape = (49, 1)

    return [torch.Tensor(x_train), torch.Tensor(y_train), torch.Tensor(x_test), torch.Tensor(y_test)]

lookback = 5
x_train, y_train, x_test, y_test = split_data(price, lookback)
print('x_train.shape = ', x_train.shape)
print('y_train.shape = ', y_train.shape)
print('x_test.shape = ', x_test.shape)
print('y_test.shape = ', y_test.shape)
```
4. LSTM algorithm
The LSTM model here is exactly the same as in the single-dimensional, single-step prediction project; only the dataset construction, and therefore the shape of the input fed to the model, is different, as the short sketch below illustrates.
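A minimal comparison of the two cases (the single-feature shapes are an assumption based on the earlier single-dimensional project, not taken from this code); note that only input_size changes, so everything downstream of the LSTM layer keeps the same shapes:

```python
import torch
import torch.nn as nn

lookback = 5  # sliding window length used in this project

# Single-feature case (Close only): input_size = 1
x_single = torch.randn(198, lookback - 1, 1)
lstm_single = nn.LSTM(input_size=1, hidden_size=32, num_layers=2, batch_first=True)
out_single, _ = lstm_single(x_single)   # (198, 4, 32)

# Two-feature case (Close and Low): input_size = 2, nothing else changes
x_multi = torch.randn(198, lookback - 1, 2)
lstm_multi = nn.LSTM(input_size=2, hidden_size=32, num_layers=2, batch_first=True)
out_multi, _ = lstm_multi(x_multi)      # (198, 4, 32) -> same downstream shapes
```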
```python
class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # batch_first=True -> input shape (batch_size, time_steps, input_size)
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Zero initial hidden and cell states for every forward pass
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
        # Use only the output of the last time step for the prediction
        out = self.fc(out[:, -1, :])
        return out
```
5. Model training
```python
input_dim = 2
hidden_dim = 32
num_layers = 2
output_dim = 1
num_epochs = 100

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
criterion = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

hist = np.zeros(num_epochs)
lstm = []

start_time = time.time()
for t in range(num_epochs):
    y_train_pred = model(x_train)
    loss = criterion(y_train_pred, y_train)
    hist[t] = loss.item()
    # print("Epoch ", t, "MSE: ", loss.item())

    optimiser.zero_grad()
    loss.backward()
    optimiser.step()
training_time = time.time() - start_time
print("Training time: {:.2f} s".format(training_time))
```
6. Plot the fit between predicted and true values, and the training loss
```python
# Inverse-transform the scaled training predictions and targets back to prices
predict = pd.DataFrame(scaler_close.inverse_transform(y_train_pred.detach().numpy()))
original = pd.DataFrame(scaler_close.inverse_transform(y_train.detach().numpy()))

sns.set_style("darkgrid")
fig = plt.figure()
fig.subplots_adjust(hspace=0.2, wspace=0.2)

plt.subplot(1, 2, 1)
ax = sns.lineplot(x=original.index, y=original[0], label="Data", color='royalblue')
ax = sns.lineplot(x=predict.index, y=predict[0], label="Training Prediction (LSTM)", color='tomato')
ax.set_title('Stock price', size=14, fontweight='bold')
ax.set_xlabel("Days", size=14)
ax.set_ylabel("Cost (USD)", size=14)
ax.set_xticklabels('', size=10)

plt.subplot(1, 2, 2)
ax = sns.lineplot(data=hist, color='royalblue')
ax.set_xlabel("Epoch", size=14)
ax.set_ylabel("Loss", size=14)
ax.set_title("Training Loss", size=14, fontweight='bold')
fig.set_figheight(6)
fig.set_figwidth(16)
plt.show()

# make predictions
y_test_pred = model(x_test)

# invert predictions
y_train_pred = scaler_close.inverse_transform(y_train_pred.detach().numpy())
y_train = scaler_close.inverse_transform(y_train.detach().numpy())
y_test_pred = scaler_close.inverse_transform(y_test_pred.detach().numpy())
y_test = scaler_close.inverse_transform(y_test.detach().numpy())

# calculate root mean squared error
trainScore = math.sqrt(mean_squared_error(y_train[:, 0], y_train_pred[:, 0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(y_test[:, 0], y_test_pred[:, 0]))
print('Test Score: %.2f RMSE' % (testScore))

lstm.append(trainScore)
lstm.append(testScore)
lstm.append(training_time)
```
7. Complete code
Problem description: select the two features Close and Low, use both features over a window of time_steps days, and predict the Close value for the next day. With batch_first=True, the LSTM expects inputs of shape (batch_size, time_steps, input_size), where batch_size = len(data) - time_steps, time_steps is the sliding window length (called lookback in this project), and input_size = 2 (because the Close and Low features are selected).

```python
#%%
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import math, time
from sklearn.metrics import mean_squared_error

# Read the data and sort it by date
filepath = './data/rlData.csv'
data = pd.read_csv(filepath)
data = data.sort_values('Date')
print(data.head())
print(data.shape)

# Plot the Close price over time
sns.set_style("darkgrid")
plt.figure(figsize=(15, 9))
plt.plot(data[['Close']])
plt.xticks(range(0, data.shape[0], 20), data['Date'].loc[::20], rotation=45)
plt.title("****** Stock Price", fontsize=18, fontweight='bold')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price (USD)', fontsize=18)
plt.show()

# 1. Select the two features and scale each one to (-1, 1)
price = data[['Close', 'Low']].copy()
scaler_close = MinMaxScaler(feature_range=(-1, 1))
scaler_low = MinMaxScaler(feature_range=(-1, 1))
price['Close'] = scaler_close.fit_transform(price['Close'].values.reshape(-1, 1))
price['Low'] = scaler_low.fit_transform(price['Low'].values.reshape(-1, 1))

# 2. Build the dataset
def split_data(stock, lookback):
    data_raw = stock.to_numpy()
    data = []

    # Slide a window of length `lookback` over the series
    for index in range(len(data_raw) - lookback):
        data.append(data_raw[index: index + lookback])

    data = np.array(data)
    test_set_size = int(np.round(0.2 * data.shape[0]))
    train_set_size = data.shape[0] - test_set_size

    x_train = data[:train_set_size, :-1, :]    # x_train.shape = (198, 4, 2)
    y_train = data[:train_set_size, -1, 0:1]   # y_train.shape = (198, 1)
    x_test = data[train_set_size:, :-1, :]     # x_test.shape = (49, 4, 2)
    y_test = data[train_set_size:, -1, 0:1]    # y_test.shape = (49, 1)

    return [torch.Tensor(x_train), torch.Tensor(y_train), torch.Tensor(x_test), torch.Tensor(y_test)]

lookback = 5
x_train, y_train, x_test, y_test = split_data(price, lookback)
print('x_train.shape = ', x_train.shape)
print('y_train.shape = ', y_train.shape)
print('x_test.shape = ', x_test.shape)
print('y_test.shape = ', y_test.shape)

# 3. LSTM model
class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_()
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
        out = self.fc(out[:, -1, :])
        return out

# 4. Training
input_dim = 2
hidden_dim = 32
num_layers = 2
output_dim = 1
num_epochs = 100

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
criterion = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

hist = np.zeros(num_epochs)
lstm = []

start_time = time.time()
for t in range(num_epochs):
    y_train_pred = model(x_train)
    loss = criterion(y_train_pred, y_train)
    hist[t] = loss.item()
    # print("Epoch ", t, "MSE: ", loss.item())

    optimiser.zero_grad()
    loss.backward()
    optimiser.step()
training_time = time.time() - start_time

# 5. Plot predictions vs. true values and the training loss
predict = pd.DataFrame(scaler_close.inverse_transform(y_train_pred.detach().numpy()))
original = pd.DataFrame(scaler_close.inverse_transform(y_train.detach().numpy()))

sns.set_style("darkgrid")
fig = plt.figure()
fig.subplots_adjust(hspace=0.2, wspace=0.2)

plt.subplot(1, 2, 1)
ax = sns.lineplot(x=original.index, y=original[0], label="Data", color='royalblue')
ax = sns.lineplot(x=predict.index, y=predict[0], label="Training Prediction (LSTM)", color='tomato')
ax.set_title('Stock price', size=14, fontweight='bold')
ax.set_xlabel("Days", size=14)
ax.set_ylabel("Cost (USD)", size=14)
ax.set_xticklabels('', size=10)

plt.subplot(1, 2, 2)
ax = sns.lineplot(data=hist, color='royalblue')
ax.set_xlabel("Epoch", size=14)
ax.set_ylabel("Loss", size=14)
ax.set_title("Training Loss", size=14, fontweight='bold')
fig.set_figheight(6)
fig.set_figwidth(16)
plt.show()

# make predictions
y_test_pred = model(x_test)

# invert predictions
y_train_pred = scaler_close.inverse_transform(y_train_pred.detach().numpy())
y_train = scaler_close.inverse_transform(y_train.detach().numpy())
y_test_pred = scaler_close.inverse_transform(y_test_pred.detach().numpy())
y_test = scaler_close.inverse_transform(y_test.detach().numpy())

# calculate root mean squared error
trainScore = math.sqrt(mean_squared_error(y_train[:, 0], y_train_pred[:, 0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(y_test[:, 0], y_test_pred[:, 0]))
print('Test Score: %.2f RMSE' % (testScore))

lstm.append(trainScore)
lstm.append(testScore)
lstm.append(training_time)
```
Reference: https://gitee.com/qiangchen_sh/stock-prediction/blob/master/code/LSTM from theoretical basis to code practice 4 multi-dimensional feature stock price prediction_Pytorch.ipynb