Experiment 9 Feedforward Neural Network (5)

Experimental task: Complete iris classification based on feedforward neural network

1 Mini-batch gradient descent method

Mini-batch gradient descent reduces the computational complexity of each iteration: at every iteration only a small subset of samples is drawn, the gradient of the loss function is computed on that subset, and the parameters are updated. The number K of samples in each subset B_t is called the batch size. The gradient of the loss is computed for every sample in B_t and averaged, and the parameters are then updated as

θ_{t+1} = θ_t − (α / K) · Σ_{(x, y) ∈ B_t} ∂L(y, f(x; θ_t)) / ∂θ

where α is the learning rate.
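The update can be written as a short PyTorch sketch. The parameters, the linear model and the squared-error loss below are hypothetical placeholders; the only point illustrated is that the loss (and therefore the gradient) is averaged over the K samples of one batch B_t before the parameters are updated.

# A minimal sketch of one mini-batch update (illustrative only: the parameters,
# the linear model and the squared loss are hypothetical placeholders)
import torch

theta = torch.zeros(4, requires_grad=True)   # parameters to be learned
alpha = 0.1                                  # learning rate
X_batch = torch.randn(16, 4)                 # one mini-batch B_t with K = 16 samples
y_batch = torch.randn(16)

loss = ((X_batch @ theta - y_batch) ** 2).mean()  # loss averaged over the K samples
loss.backward()                                   # gradient of the averaged loss
with torch.no_grad():
    theta -= alpha * theta.grad                   # parameter update
    theta.grad.zero_()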

2 Data processing

import numpy as np
import torch
import matplotlib.pyplot as plt
from nndl.dataset import load_data

data_x, data_y = load_data()
iris_first = []
iris_second = []
iris_third = []
for i in range(0, len(data_y)):
    if (data_y[i] == 0):
        iris_first.append(data_x[i, :].numpy())
    elif (data_y[i] == 2):
        iris_second.append(data_x[i, :].numpy())
    else:
        iris_third.append(data_x[i, :].numpy())
iris_first = torch.tensor(iris_first)
iris_second = torch.tensor(iris_second)
iris_third = torch.tensor(iris_third)

plt.scatter(iris_first[:, 0], iris_first[:, 1], c='b')
plt.scatter(iris_second[:, 0], iris_second[:, 1], c='y')
plt.scatter(iris_third[:, 0], iris_third[:, 1], c='g')
plt.legend(['iris versicolor', 'iris setosa', 'iris virginica'])
plt.show()

Since the optimizer is mini-batch stochastic gradient descent, the data must be randomly grouped, so a data iterator has to be constructed that draws a batch of the specified size from the full data set at each iteration. The procedure is as follows:

  1. Wrap the data set in a Dataset class: given a set of index values, it returns the corresponding samples from the data set.
  2. Build a DataLoader, specifying the batch size (K) and whether the data should be shuffled; this class is then used to fetch the data in batches.

In PyTorch, the torch.utils.data.Dataset class is used to wrap the data for mini-batch loading, and the torch.utils.data.DataLoader API generates an iterator over it: the batch_size parameter specifies the length of each mini-batch, and setting the shuffle parameter to True shuffles the index order when the mini-batch index list is generated.

In the data reading class (IrisDataset), __getitem__ returns the sample at the given index from the data set (applying any required processing to it), and __len__ returns the number of samples in the data set.

Use DataLoader for encapsulation:

import torch
import numpy as np
import torch.utils.data
from nndl.dataset import load_data

class IrisDataset(torch.utils.data.Dataset):
    def __init__(self, mode='train', num_train=120, num_dev=15):
        super(IrisDataset, self).__init__()
        # Call the data reading function in Chapter 3, which does not need to convert the label to one-hot type
        X, y = load_data(shuffle=True)
        if mode == 'train':
            self.X, self.y = X[:num_train], y[:num_train]
        elif mode == 'dev':
            self.X, self.y = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]
        else:
            self.X, self.y = X[num_train + num_dev:], y[num_train + num_dev:]

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

    def __len__(self):
        return len(self.y)

torch.manual_seed(12)
train_dataset = IrisDataset(mode='train')
dev_dataset = IrisDataset(mode='dev')
test_dataset = IrisDataset(mode='test')
#Print training set length
print("length of train set: ", len(train_dataset))

# Batch size
batch_size = 16

# Wrap the data sets with DataLoader
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = torch.utils.data.DataLoader(dev_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

3 Model Construction

In the code defining the feedforward neural network, self.fc1 = nn.Linear(input_size, hidden_size) defines the first fully connected layer, which maps the input features to the hidden layer; normal_(self.fc1.weight, mean=0.0, std=0.01) initializes the weights of this layer from a normal distribution, and constant_(self.fc1.bias, val=1.0) sets its bias to 1.0. The second fully connected layer is initialized in the same way.

In the forward propagation method (forward), the input is first passed through the first fully connected layer, then the Sigmoid activation function is applied, then the result goes through the second fully connected layer, and finally the output is returned.

# Model construction
import torch.nn as nn
from torch.nn.init import constant_, normal_, uniform_

# Define feedforward neural network
class Model_MLP_L2_V3(nn.Module):
    def __init__(self, input_size, output_size, hidden_size):
        super(Model_MLP_L2_V3, self).__init__()
        # Build the first fully connected layer
        self.fc1 = nn.Linear(input_size, hidden_size)
        normal_(self.fc1.weight, mean=0.0, std=0.01)
        constant_(self.fc1.bias, val=1.0)
        # Build the second fully connected layer
        self.fc2 = nn.Linear(hidden_size, output_size)
        normal_(self.fc2.weight, mean=0.0, std=0.01)
        constant_(self.fc2.bias, val=1.0)
        #Define the activation function used by the network
        self.act = nn.Sigmoid()

    def forward(self, inputs):
        outputs = self.fc1(inputs)
        outputs = self.act(outputs)
        outputs = self.fc2(outputs)
        return outputs

fnn_model = Model_MLP_L2_V3(input_size=4, output_size=3, hidden_size=6)

4 Improve the Runner class

The RunnerV3 class is an improved implementation based on the RunnerV2 class. The training process uses automatic gradient computation, loads batch data with DataLoader, and optimizes the parameters with stochastic gradient descent. When saving the model, the state_dict method is used to obtain the model parameters; when loading the model, load_state_dict is used to restore them.
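As a minimal, self-contained illustration of this saving/loading convention (the nn.Linear module and the file name here are placeholders only):

import torch
import torch.nn as nn

model = nn.Linear(4, 3)                                    # placeholder module
torch.save(model.state_dict(), "best_model.pdparams")      # save the parameters
model.load_state_dict(torch.load("best_model.pdparams"))   # restore the parameters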

Since stochastic gradient descent is used to optimize the parameters, data are fed into the model in batches during training, and the evaluation metric is therefore also computed batch by batch. To obtain the overall evaluation result for each epoch, the per-batch results have to be accumulated; the Accuracy class below implements this.

Accuracy: the update method accumulates the evaluation results. batch_correct and batch_count record, for each batch, the number of correctly predicted samples and the total number of predicted samples; self.num_correct += batch_correct and self.num_count += batch_count update the running totals of correct predictions and samples seen so far. The accumulate method then uses num_correct and num_count to compute the overall metric.

# Improve the Runner class
import torch

class Accuracy():
    def __init__(self, is_logist=True):
        """
        enter:
           - is_logist: whether the outputs are logist or activated values
        """

        # Used to count the correct number of samples
        self.num_correct = 0
        # The total number of samples used for statistics
        self.num_count = 0

        self.is_logist = is_logist

    def update(self, outputs, labels):
        """
        enter:
           - outputs: predicted value, shape=[N,class_num]
           - labels: label value, shape=[N,1]
        """

        # Determine whether this is a binary or multi-class task: shape[1] == 1 means binary classification, shape[1] > 1 means multi-class classification
        if outputs.shape[1] == 1:  # binary classification
            outputs = torch.squeeze(outputs, dim=-1)
            if self.is_logist:
                # For logits, predict class 1 when the logit is >= 0
                preds = (outputs >= 0).to(torch.float32)
            else:
                # Otherwise treat the output as a probability: class 1 when it is >= 0.5, class 0 otherwise
                preds = (outputs >= 0.5).to(torch.float32)
        else:
            # For multiple classes, use torch.argmax to take the index of the largest element as the predicted class
            preds = torch.argmax(outputs, dim=1)

        # Get the number of correctly predicted samples in this batch of data
        labels = torch.squeeze(labels, dim=-1)
        batch_correct = torch.sum((preds == labels).to(torch.float32)).item()
        batch_count = len(labels)

        # Update num_correct and num_count
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        # Use accumulated data to calculate the overall indicator
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        #Reset the correct number and total
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        return "Accuracy"

RunnerV3 class implementation:

# RunnerV3 class implementation
class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric # Only used to calculate evaluation indicators

        # Record the changes in evaluation indicators during the training process
        self.dev_scores = []

        # Record the changes in the loss function during the training process
        self.train_epoch_losses = [] # Record one loss per epoch
        self.train_step_losses = [] # Record one loss per step
        self.dev_losses = []

        # Record the global optimal index
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # Switch the model to training mode
        self.model.train()

        # Pass in the number of training rounds, if no value is passed in, the default is 0
        num_epochs = kwargs.get("num_epochs", 0)
        # Pass in the log printing frequency. If no value is passed in, the default is 100.
        log_steps = kwargs.get("log_steps", 100)
        # Evaluation frequency
        eval_steps = kwargs.get("eval_steps", 0)

        # Pass in the model saving path. If no value is passed in, the default is "best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")

        custom_print_log = kwargs.get("custom_print_log", None)

        #Total number of training steps
        num_training_steps = num_epochs * len(train_loader)

        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')

        # Number of steps to run
        global_step = 0

        # Perform num_epochs rounds of training
        for epoch in range(num_epochs):
            # Used to count the loss of the training set
            total_loss = 0
            for step, data in enumerate(train_loader):
                X, y = data
                # Get model predictions
                logits = self.model(X)
                y = y.to(torch.int64)
                loss = self.loss_fn(logits, y)  # the loss is averaged over the batch by default
                total_loss += loss

                # During the training process, the loss of each step is saved.
                self.train_step_losses.append((global_step, loss.item()))

                if log_steps and global_step % log_steps == 0:
                    print(
                        f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")

                # Gradient back propagation, calculate the gradient value of each parameter
                loss.backward()

                if custom_print_log:
                    custom_print_log(self)

                # Mini-batch gradient descent for parameter update
                self.optimizer.step()
                # Zero gradient
                self.optimizer.zero_grad()

                # Determine whether evaluation is needed
                if eval_steps > 0 and global_step > 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):

                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate] dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")

                    # Switch the model to training mode
                    self.model.train()

                    # If the current indicator is the optimal indicator, save the model
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(
                            f"[Evaluate] best accuracy performance has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score

                global_step += 1

            # Current epoch training loss cumulative value
            trn_loss = (total_loss / len(train_loader)).item()
            # Save training loss at epoch granularity
            self.train_epoch_losses.append(trn_loss)

        print("[Train] Training done!")

    # In the model evaluation phase, use torch.no_grad() so that gradients are neither computed nor stored
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None

        # Set the model to evaluation mode
        self.model.eval()

        global_step = kwargs.get("global_step", -1)

        # Used to accumulate the loss on the validation set
        total_loss = 0

        # Reset the evaluation metric
        self.metric.reset()

        # Traverse each batch of the validation set
        for batch_id, data in enumerate(dev_loader):
            X, y = data

            # Calculate model output
            logits = self.model(X)
            y = y.to(torch.int64)

            # Calculate the loss function
            loss = self.loss_fn(logits, y).item()
            # Cumulative loss
            total_loss += loss

            # Cumulative evaluation
            self.metric.update(logits, y)

        dev_loss = (total_loss / len(dev_loader))
        dev_score = self.metric.accumulate()

        # Record the validation set loss
        if global_step != -1:
            self.dev_losses.append((global_step, dev_loss))
            self.dev_scores.append(dev_score)

        return dev_score, dev_loss

    # In the model evaluation phase, use torch.no_grad() so that gradients are neither computed nor stored
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # Set the model to evaluation mode
        self.model.eval()
        # Run the forward calculation of the model to get the predicted value
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        model_state_dict = torch.load(model_path)
        self.model.load_state_dict(model_state_dict)

5 Model Training

Instantiate RunnerV3 and pass in the training configuration: the network, the optimizer, the loss function (softmax + cross entropy) and the evaluation metric. Train for 150 epochs and save the model with the highest validation accuracy as the best model of the experiment. The experimental results are shown below.

Visually observe the changes in the training set loss and the validation set loss:

It can be seen from the output results that the accuracy gradually increases as the number of iterations increases, and the loss function decreases.

# Instantiation
import torch.optim as opt
import torch.nn.functional as F

lr = 0.2
# define network
model = fnn_model
#Define optimizer
optimizer = opt.SGD(lr=lr, params=model.parameters())
# Define loss function. softmax + cross entropy
loss_fn = F.cross_entropy
# Define evaluation indicators
metric = Accuracy(is_logist=True)
runner = RunnerV3(model, optimizer, loss_fn, metric)

# Start training
log_steps = 100
eval_steps = 50
runner.train(train_loader, dev_loader, num_epochs=150, log_steps=log_steps, eval_steps=eval_steps, save_path="best_model.pdparams")

# Visualization
import matplotlib.pyplot as plt

# Draw the loss changes of the training set and validation set and the accuracy change curve on the validation set
def plot_training_loss_acc(runner, fig_name,
                           fig_size=(16, 6),
                           sample_step=20,
                           loss_legend_loc="upper right",
                           acc_legend_loc="lower right",
                           train_color="#e4007f",
                           dev_color='#f19ec2',
                           fontsize='large',
                           train_linestyle="-",
                           dev_linestyle='--'):
    plt.figure(figsize=fig_size)

    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::sample_step]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]

    plt.plot(train_steps, train_losses, color=train_color, linestyle=train_linestyle, label="Train loss")
    if len(runner.dev_losses) > 0:
        dev_steps = [x[0] for x in runner.dev_losses]
        dev_losses = [x[1] for x in runner.dev_losses]
        plt.plot(dev_steps, dev_losses, color=dev_color, linestyle=dev_linestyle, label="Dev loss")
    # Draw axes and legend
    plt.ylabel("loss", fontsize=fontsize)
    plt.xlabel("step", fontsize=fontsize)
    plt.legend(loc=loss_legend_loc, fontsize='x-large')

    # Draw the evaluation accuracy change curve
    if len(runner.dev_scores) > 0:
        plt.subplot(1, 2, 2)
        plt.plot(dev_steps, runner.dev_scores,
                 color=dev_color, linestyle=dev_linestyle, label="Dev accuracy")

        # Draw axes and legend
        plt.ylabel("score", fontsize=fontsize)
        plt.xlabel("step", fontsize=fontsize)
        plt.legend(loc=acc_legend_loc, fontsize='x-large')

    plt.savefig(fig_name)
    plt.show()

plot_training_loss_acc(runner, 'fw-loss.pdf')

6 Model Evaluation

Use the test set to evaluate the best model saved during the above training process, and observe the accuracy and loss on the test set.
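A minimal sketch of this step, assuming the runner, the test_loader and the save path defined above:

# Load the best model saved during training and evaluate it on the test set
runner.load_model("best_model.pdparams")
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))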

7 Model Prediction

Use the saved model to perform model prediction on a certain data in the test set and observe the model effect:

# Model prediction
# Model evaluation
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
test_loader = iter(test_loader)
# Get the first data in the test set
(X, label) = next(test_loader)
logits = runner.predict(X)
pred_class = torch.argmax(logits[0]).numpy()
label = label.numpy()[0]

# Output the true category and predicted category
print("The true category is {} and the predicted category is {}".format(label, pred_class))

8 Compare with Experiment 4 “Complete Iris Classification Based on Softmax Regression” and talk about your own views

Comparing the two experimental results, it can be seen that for the iris classification problem, the accuracy of the feedforward neural network is higher than that of the Softmax classification.

Feedforward neural networks have stronger generalization ability and can improve classification accuracy by learning complex features and patterns, whereas softmax regression generalizes less well, but it usually trains faster than a feedforward neural network.

import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from matplotlib.colors import ListedColormap

iris = datasets.load_iris() # Load data
list(iris.keys()) #properties
X = iris["data"][:, (2, 3)] # petal length, petal width
y = iris["target"]
# Set the hyperparameter multi_class to "multinomial", specify a solver that supports Softmax regression, use l2 regularization by default, and can be controlled by the hyperparameter C
softmax_reg = LogisticRegression(multi_class="multinomial", solver="lbfgs", C=500, random_state=42)
softmax_reg.fit(X, y)
softmax_reg.predict([[5, 2]]) # Output: array([2])
softmax_reg.predict_proba([[5, 2]])
x0, x1 = np.meshgrid(np.linspace(0, 8, 500).reshape(-1, 1), np.linspace(0, 3.5, 200).reshape(-1, 1))
X_new = np.c_[x0.ravel(), x1.ravel()]
y_proba = softmax_reg.predict_proba(X_new)
y_predict = softmax_reg.predict(X_new)
zz1 = y_proba[:, 1].reshape(x0.shape)
zz = y_predict.reshape(x0.shape)
plt.figure(figsize=(10, 4))
plt.plot(X[y == 2, 0], X[y == 2, 1], "g^", label="Iris virginica")
plt.plot(X[y == 1, 0], X[y == 1, 1], "bs", label="Iris versicolor")
plt.plot(X[y == 0, 0], X[y == 0, 1], "yo", label="Iris setosa")
custom_cmap = ListedColormap(['#fafab0', '#9898ff', '#a0faa0'])
plt.contourf(x0, x1, zz, cmap=custom_cmap)
plt.xlabel("Petal length", fontsize=14)
plt.ylabel("Petal width", fontsize=14)
plt.legend(loc="center left", fontsize=14)
plt.axis([0, 7, 0, 3.5])
plt.show()