[PyTorch test of data-parallel training across GPUs: the sum of the gradients computed from each GPU's minibatch shard equals the gradient of the full minibatch]

import torch
from torch import nn
from torch.nn import functional as F

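# A minimal sketch (our addition, not part of the original experiment) of the claim in the title,
# checked on the CPU: with a sum-reduction loss, accumulating the gradients of the two halves of
# a batch gives the same result as computing the gradient of the full batch at once.
def check_grad_sum():
    w = torch.randn(5, 1, requires_grad=True)
    X, y = torch.randn(6, 5), torch.randn(6, 1)
    ((X @ w - y) ** 2).sum().backward()          # gradient of the full batch
    full_grad = w.grad.clone()
    w.grad.zero_()
    ((X[:3] @ w - y[:3]) ** 2).sum().backward()  # gradient of the first half...
    ((X[3:] @ w - y[3:]) ** 2).sum().backward()  # ...accumulated with the second half
    print(torch.allclose(full_grad, w.grad))     # expected: True
# check_grad_sum()
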
#Initialize model parameters
scale=0.01
W1 = torch.randn(size=(20, 1, 3, 3)) * scale
b1 = torch.zeros(20)
W2 = torch.randn(size=(50, 20, 5, 5)) * scale
b2 = torch.zeros(50)
W3 = torch.randn(size=(800, 128)) * scale
b3 = torch.zeros(128)
W4 = torch.randn(size=(128, 10)) * scale
b4 = torch.zeros(10)
params = [W1, b1, W2, b2, W3, b3, W4, b4]

# Define model
def lenet(X, params):
    h1_conv = F.conv2d(input=X, weight=params[0], bias=params[1])
    h1_activation = F.relu(h1_conv)
    h1 = F.avg_pool2d(input=h1_activation, kernel_size=(2, 2), stride=(2, 2))
    h2_conv = F.conv2d(input=h1, weight=params[2], bias=params[3])
    h2_activation = F.relu(h2_conv)
    h2 = F.avg_pool2d(input=h2_activation, kernel_size=(2, 2), stride=(2, 2))
    h2 = h2.reshape(h2.shape[0], -1)
    h3_linear = torch.mm(h2, params[4]) + params[5]
    h3 = F.relu(h3_linear)
    y_hat = torch.mm(h3, params[6]) + params[7]
    return y_hat

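# A quick shape check (our addition, assuming 28x28 Fashion-MNIST inputs): 3x3 conv -> 26x26,
# 2x2 pool -> 13x13, 5x5 conv -> 9x9, 2x2 pool -> 4x4, so the flattened size is 50*4*4 = 800,
# which matches the first dimension of W3.
# print(lenet(torch.zeros(1, 1, 28, 28), params).shape)  # expected: torch.Size([1, 10])
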

# Cross entropy loss function
loss = nn.CrossEntropyLoss(reduction="none")


def get_params(params, device):
    new_params = [p.to(device) for p in params]
    for p in new_params:
        p.requires_grad_()
    return new_params


def try_gpu(i=0):
    """If it exists, return gpu(i), otherwise return cpu()"""
    if torch.cuda.device_count() >= i + 1:
        return torch.device(f"cuda:{i}")
    return torch.device("cpu")


# new_params = get_params(params, try_gpu(0))
# params_cpu = get_params(params.copy(), try_gpu(10))
# print('b1 weight:', new_params[1])
# print('b1 gradient:', new_params[1].grad)


def allreduce(data):
    for i in range(1, len(data)):
        data[0][:] += data[i].to(data[0].device)
    for i in range(1, len(data)):
        data[i][:] = data[0].to(data[i].device)

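# A tiny illustration of allreduce (our addition, using CPU tensors so it runs anywhere):
# after the call, every entry holds the elementwise sum of all entries.
def check_allreduce():
    data = [torch.ones(2) * (i + 1) for i in range(3)]
    allreduce(data)
    print(data)  # expected: three tensors all equal to [6., 6.]
# check_allreduce()
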

def split_batch(X, y, devices):
    """Split X and y across multiple devices"""
    assert X.shape[0] == y.shape[0]
    return (nn.parallel.scatter(X, devices), nn.parallel.scatter(y, devices))


def sgd(params, lr, batch_size):
    """Mini-batch stochastic gradient descent"""
    # Inside this block, autograd is disabled: requires_grad of all newly computed tensors is automatically False (a small usage example follows this function).
    with torch.no_grad():
        for param in params:
            param -= lr * param.grad/batch_size
            param.grad.zero_()

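# A minimal use of sgd (our addition): one step on a single scalar parameter. With lr=0.1,
# gradient 3 and batch_size=1, the value should move from 1.0 to 1.0 - 0.1 * 3 / 1 = 0.7.
def check_sgd():
    p = torch.tensor([1.0], requires_grad=True)
    (3 * p).sum().backward()  # p.grad is now tensor([3.])
    sgd([p], lr=0.1, batch_size=1)
    print(p)                  # expected: tensor([0.7000], requires_grad=True)
# check_sgd()
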
import torchvision
from torchvision import transforms
from torch.utils import data
import os

# Get the absolute path of the directory containing the current script
script_dir = os.path.dirname(os.path.abspath(__file__))


def load_data_fashion_mnist(batch_size, resize=None):
    """Download the Fashion-MNIST dataset and load it into memory"""
    # transforms.ToTensor() converts the original PIL image or numpy.array data into a tensor that PyTorch can process efficiently.
    # https://blog.csdn.net/qq_38410428/article/details/94719553
    trans = [transforms.ToTensor()]  # Instantiation
    if resize:
        trans.insert(0, transforms.Resize(resize))
    # When an image has to be resized first and then converted to a tensor, applying the steps one by one works but is cumbersome.
    # Instead, transforms.Compose takes a list of instantiated transforms and chains them into a single transform.
    # Note that the output of each step must match the input type expected by the next step: here Resize outputs a PIL image, which is exactly what ToTensor accepts (see the small example after this function).
    trans = transforms.Compose(trans)

    # Build the path of the data download directory (two levels above the directory containing this script)
    data_dir = os.path.join(script_dir, "..", "..", "data")

    mnist_train = torchvision.datasets.FashionMNIST(
        root=data_dir, train=True, transform=trans, download=True
    )
    mnist_test = torchvision.datasets.FashionMNIST(
        root=data_dir, train=False, transform=trans, download=True
    )
    return (
        data.DataLoader(mnist_train, batch_size, shuffle=True, num_workers=2),
        data.DataLoader(mnist_test, batch_size, shuffle=False, num_workers=2),
    )

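# A small illustration of the Compose pattern used above (our addition): Resize outputs a
# PIL image, which is the input type that ToTensor expects, so the two can be chained.
# The resulting transform would map a 28x28 PIL image to a tensor of shape (1, 32, 32).
# demo_trans = transforms.Compose([transforms.Resize(32), transforms.ToTensor()])
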
import matplotlib
matplotlib.use("Agg") # This sentence must be placed before the following sentence
from matplotlib import pyplot as plt

def use_svg_display():
    """Display plots in Jupyter using the SVG format"""
    # In a notebook you could add: %config InlineBackend.figure_format = 'svg'
    # backend_inline.set_matplotlib_formats('svg')

def set_figsize(figsize=(3.5, 2.5)):
    """Set the chart size of matplotlib"""
    use_svg_display()
    plt.rcParams['figure.figsize'] = figsize

def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
    """Set the axis of matplotlib"""
    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_xscale(xscale)
    axes.set_yscale(yscale)
    axes.set_xlim(xlim)
    axes.set_ylim(ylim)
    if legend:
        axes.legend(legend)
    axes.grid()

# With the three configuration functions above, define a plot function that draws multiple curves concisely, since many curves need to be visualized throughout the book (a commented usage example follows the function).
def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None,
         ylim=None, xscale='linear', yscale='linear',
         fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):
    """Plot data points"""
    if legend is None:
        legend = []

    set_figsize(figsize)
    axes = axes if axes else plt.gca()

    # Return True if X has exactly one axis (a 1-D tensor/array, or a flat list whose elements have no length)
    def has_one_axis(X):
        return (hasattr(X, "ndim") and X.ndim == 1 or isinstance(X, list)
                and not hasattr(X[0], "__len__"))

    if has_one_axis(X):
        X = [X]
    if Y is None:
        X, Y = [[]] * len(X), X
    elif has_one_axis(Y):
        Y = [Y]
    if len(X) != len(Y):
        X = X * len(Y)
    axes.cla()
    for x, y, fmt in zip(X, Y, fmts):
        if len(x):
            axes.plot(x, y, fmt)
        else:
            axes.plot(y, fmt)
    set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)

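# Example usage of plot (our addition); uncommenting it would draw y = x and y = x^2 in one figure.
# plot([1, 2, 3], [[1, 2, 3], [1, 4, 9]], xlabel='x', legend=['linear', 'square'])
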
class Animator:
    """Draw data in animation"""
    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(7, 5)):
        # Draw multiple lines incrementally
        if legend is None:
            legend = []
        use_svg_display()
        self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            self.axes = [self.axes, ]
        # Use lambda function to capture parameters
        self.config_axes = lambda: set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        # Add multiple data points to the chart
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()
        plt.show()

class Accumulator:
    """Accumulate on n variables"""

    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

def accuracy(y_hat, y):
    """Calculate the number of correct predictions"""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        y_hat = y_hat.argmax(axis=1) # Get the index of the largest element in each row to get the predicted category
    cmp = y_hat.type(y.dtype) == y  # Elementwise comparison with the labels
    return float(cmp.type(y.dtype).sum()) # Return the number of correct predictions

import time
import numpy as np

class Timer:
    """Record multiple running times"""

    def __init__(self):
        self.times = []
        self.lastTimeSum = 0
        self.start()

    def start(self):
        """Start timer"""
        self.tik = time.time()

    def stop(self):
        """Stop the timer and record the time in the list"""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return average time"""
        return sum(self.times) / len(self.times)
    
    def sum(self):
        """Return the total time"""
        self.lastTimeSum = sum(self.times)
        return self.lastTimeSum

    def cumsum(self):
        """Return cumulative time"""
        return np.array(self.times).cumsum().tolist()

def evaluate_accuracy_gpu(net, data_iter, device=None):
    """Use GPU to calculate the accuracy of the model on the data set"""
    if isinstance(net, torch.nn.Module):
        net.eval()  # Set evaluation mode: disable Dropout and use the running (not per-batch) mean and variance
        if not device:
            # Read the .device attribute of the model's first parameter tensor (usually a weight matrix) to find out which device the model is on (see the small example after this function).
            device = next(iter(net.parameters())).device
    # Number of correct predictions, total number of predictions
    metric = Accumulator(2)
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list):
                # Needed for BERT fine-tuning, where the input X is a list of tensors
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            metric.add(accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

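# A small illustration of the device lookup used in evaluate_accuracy_gpu (our addition):
# the first parameter of a module reveals which device the whole model lives on.
# print(next(iter(nn.Linear(2, 2).to(try_gpu()).parameters())).device)  # cuda:0 if available, else cpu
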

def train_batch(X, y, device_params, devices, lr):
    X_shards, y_shards = split_batch(X, y, devices)
    # Calculate the loss separately on each GPU
    ls = [
        loss(lenet(X_shard, device_W), y_shard).sum()
        for X_shard, y_shard, device_W in zip(X_shards, y_shards, device_params)
    ]
    # l_cpu = loss(lenet(X, params_cpu), y).sum()
    # l_cpu.backward()
    # print(f"Large batch of gradients", [param.grad for param in params_cpu])
    # Backpropagation is performed on each GPU separately
    for l in ls:
        print(l)
        l.backward()
    # Sum the gradients from all GPUs and broadcast the result back to every GPU
    with torch.no_grad():
        for i in range(len(device_params[0])):
            allreduce([device_params[c][i].grad for c in range(len(devices))])
    # Update model parameters on each GPU separately
    for params in device_params:
        # Here the full minibatch size is used
        sgd(params, lr, X.shape[0])


def train(num_gpus, batch_size, lr):
    train_iter, test_iter = load_data_fashion_mnist(batch_size)
    devices = [try_gpu(i) for i in range(num_gpus)]
    # Copy the model parameters to each of the num_gpus GPUs
    device_params = [get_params(params, d) for d in devices]
    num_epochs = 10
    animator = Animator("epoch", "test acc", xlim=[1, num_epochs])
    timer = Timer()
    for epoch in range(num_epochs):
        timer.start()
        for X, y in train_iter:
            # Perform multi-GPU training for a single mini-batch
            train_batch(X, y, device_params, devices, lr)
            # Synchronize so that all CUDA streams on every device have finished before moving on
            torch.cuda.synchronize()
            break  # stop after a single minibatch: this script only checks the gradients
        timer.stop()
        # Evaluate the model on GPU0
        animator.add(
            epoch + 1,
            (
                evaluate_accuracy_gpu(
                    lambda x: lenet(x, device_params[0]), test_iter, devices[0]
                ),
            ),
        )
        break  # one epoch is enough for this gradient test
    print(f"Test accuracy: {animator.Y[0][-1]:.2f}, {timer.avg():.1f} seconds/round, in {str(devices)}")


# train(num_gpus=1, batch_size=256, lr=0.2)
# plt.savefig(script_dir + f"/TrainChart1.png")

train(num_gpus=2, batch_size=256, lr=0.2)
plt.savefig(script_dir + f"/TrainChart1.png")

After uncommenting params_cpu = get_params(params.copy(), try_gpu(10)), the following error is reported:

/home/qlf/anaconda3/envs/d2l/lib/python3.9/site-packages/torch/_tensor.py:1083: UserWarning: The .grad attribute of a Tensor that is not a leaf Tensor is being accessed. Its .grad attribute won't be populated during autograd.backward(). If you indeed want the .grad field to be populated for a non-leaf Tensor, use .retain_grad() on the non-leaf Tensor. If you access the non-leaf Tensor by mistake, make sure you access the leaf Tensor instead. See github.com/pytorch/pytorch/pull/30531 for more informations. (Triggered internally at aten/src/ATen/core/TensorBody.h:482.)
  return self._grad
Traceback (most recent call last):
  File "/home/qlf/d2l/chapter12/12_5_Multi-GPU training/test small batch gradients of different GPUs.py", line 342, in <module>
    train(num_gpus=2, batch_size=256, lr=0.2)
  File "/home/qlf/d2l/chapter12/12_5_Multi-GPU training/test small batch gradients of different GPUs.py", line 321, in train
    train_batch(X, y, device_params, devices, lr)
  File "/home/qlf/d2l/chapter12/12_5_Multi-GPU training/test small batch gradients of different GPUs.py", line 302, in train_batch
    allreduce([device_params[c][i].grad for c in range(len(devices))])
  File "/home/qlf/d2l/chapter12/12_5_Multi-GPU training/test small batch gradients of different GPUs.py", line 58, in allreduce
    data[0][:] += data[i].to(data[0].device)
TypeError: 'NoneType' object is not subscriptable