Image classification based on Caltech101

Task description

The task is image classification on the Caltech101 data set. Caltech101 contains 101 object categories with roughly 40 to 800 images each; 16 categories are selected here. An algorithm must identify which category an image belongs to based on its visual features.

Data description

The image data set used in this task contains 1567 pictures divided into 16 categories, with more than 80 pictures per category. The 16 categories are: ak47, binoculars, boom-box, calculator, cannon, computer-keyboard, computer-monitor, computer-mouse, doorknob, dumb-bell, flashlight, head-phones, joy-stick, palm-pilot, video-projector, washing-machine.

1. Separate the data set

Sort the data set images into train, validation, and test folders according to the file lists given in Train.txt, Eval.txt, and Test.txt; an example of the assumed list format follows.
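
Judging from how the script below parses these files, each line of Train.txt and Eval.txt is assumed to hold a relative image path followed by its class label, while Test.txt lines hold just the path; the exact file names below are invented for illustration:

056.dumb-bell/056_0001.jpg dumb-bell
056.dumb-bell/056_0002.jpg dumb-bell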

import os, random, shutil


def moveFile(fileDir, tarDir_train, tarDir_val, tarDir_test):
    # Parse the three split files: each line holds a relative image path,
    # followed by a class label for the train and validation splits
    with open(r"Desktop\shuju\Train.txt", encoding='utf-8') as a:
        train_image = []
        train_truth = []
        for i in a:
            train_image.append(i.split()[0])
            train_truth.append(i.split()[1])
        print(train_image)
        print(train_truth)
    with open(r"Desktop\shuju\Eval.txt", encoding='utf-8') as a:
        val_image = []
        val_truth = []
        for i in a:
            val_image.append(i.split()[0])
            val_truth.append(i.split()[1])
        print(val_image)
        print(val_truth)
    with open(r"Desktop\shuju\Test.txt", encoding='utf-8') as a:
        test_image = []
        test_image1 = []
        for i in a:
            test_image.append(i.split()[0])
            test_image1.append(i.split()[0].split('/')[1])
        print(test_image)
        print(test_image1)
    num = 0
    for name in test_image:
        # Move test images into a flat test folder under anonymized names;
        # copying instead would leave the category folders non-empty for the
        # cleanup step in main()
        shutil.move(fileDir + name, tarDir_test + 'a' + str(num) + '.jpg')
        num += 1
    for name in train_image:
        shutil.move(fileDir + name, tarDir_train + name)
    for name in val_image:
        shutil.move(fileDir + name, tarDir_val + name)



if __name__ == '__main__':
    filePath = "Desktop\shuju\Images\" # Source image folder path
    train_fileDir = filePath + "train\"
    val_fileDir = filePath + "val\"
    test_fileDir = filePath + "test\"
    print(filePath)
    print(train_fileDir)
    print(val_fileDir, end="\n\n")
    pathls = []
    for name in os.listdir(filePath):
        # Determine whether the folder exists, create it if it does not exist
        pathls.append(name)
        if not os.path.exists(train_fileDir + name):
            os.makedirs(train_fileDir + name)
        if not os.path.exists(val_fileDir + name):
            os.makedirs(val_fileDir + name)
    if not os.path.exists(test_fileDir):
        os.makedirs(test_fileDir)

    moveFile(filePath, train_fileDir, val_fileDir,test_fileDir)
    # Delete the original folder (the folder should be empty at this time)
    for name in pathls:
        os.removedirs(filePath + name)

After the separation is complete, the images are organized into train, val, and test folders.
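
As an optional sanity check (a minimal sketch, assuming the folder layout created above), the number of images in each split can be counted:

import os

base = "Desktop\\shuju\\Images\\"
for split in ("train", "val"):
    # walk the per-category subfolders and count the image files
    total = sum(len(files) for _, _, files in os.walk(base + split))
    print(split, total)
print("test", len(os.listdir(base + "test")))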

2. Define model ResNet34

import torch.nn as nn
import torch


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity
        out = self.relu(out)

        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                 groups=1, width_per_group=64):
        super(Bottleneck, self).__init__()

        width = int(out_channel * (width_per_group / 64.)) * groups

        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,
                               kernel_size=1, stride=1, bias=False) # squeeze channels
        self.bn1 = nn.BatchNorm2d(width)
        #------------------------------------------------
        self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,
                               kernel_size=3, stride=stride, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(width)
        #------------------------------------------------
        self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel*self.expansion,
                               kernel_size=1, stride=1, bias=False) # unsqueeze channels
        self.bn3 = nn.BatchNorm2d(out_channel*self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        out += identity
        out = self.relu(out)

        return out


class ResNet(nn.Module):

    def __init__(self,
                 block,
                 blocks_num,
                 num_classes=1000,
                 include_top=True,
                 groups=1,
                 width_per_group=64):
        super(ResNet, self).__init__()
        self.include_top = include_top
        self.in_channel = 64

        self.groups = groups
        self.width_per_group = width_per_group

        self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) # output size = (1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channel * block.expansion))

        layers = []
        layers.append(block(self.in_channel,
                            channel,
                            downsample=downsample,
                            stride=stride,
                            groups=self.groups,
                            width_per_group=self.width_per_group))
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(block(self.in_channel,
                                channel,
                                groups=self.groups,
                                width_per_group=self.width_per_group))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)

        return x


def resnet34(num_classes=1000, include_top=True):
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)
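
A quick shape check (a minimal sketch, not part of the original pipeline) confirms that the network builds and emits one logit per class:

if __name__ == '__main__':
    net = resnet34(num_classes=16)
    x = torch.randn(1, 3, 224, 224)  # dummy batch: one 224x224 RGB image
    print(net(x).shape)              # expected: torch.Size([1, 16])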

3. Training

import os
import sys
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm
from model import resnet34


def main():
    # Use CPU
    device = torch.device("cpu")
    print("using {} device.".format(device))

    # Image Processing
    data_transform = {
        "train": transforms.Compose([transforms.Resize(256),
                                     transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
        "val": transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

    image_path = "Desktop\\shuju\\Images\\"  # data set path (train/ and val/ subfolders)
    assert os.path.exists(image_path), "{} path does not exist.".format(image_path)

    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                         transform=data_transform["train"])
    train_num = len(train_dataset)
    class_to_idx = train_dataset.class_to_idx
    cla_dict = dict((val, key) for key, val in class_to_idx.items())
    json_str = json.dumps(cla_dict, indent=4)  # index-to-class-name mapping
    with open('class_indices.json', 'w') as json_file:
        json_file.write(json_str)
    batch_size = 32
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers
    print('Using {} dataloader workers per process'.format(nw))

    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size, shuffle=True,
                                               num_workers=nw)

    validate_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
                                            transform=data_transform["val"])

    val_num = len(validate_dataset)
    validate_loader = torch.utils.data.DataLoader(validate_dataset,
                                                  batch_size=batch_size, shuffle=False,
                                                  num_workers=nw)

    print("using {} images for training, {} images for validation.".format(train_num,
                                                                           val_num))

    net = resnet34()
    model_weight_path = r"ResNet\resnet34-333f7ec4.pth"
    assert os.path.exists(model_weight_path), "file {} does not exist.".format(model_weight_path)
    net.load_state_dict(torch.load(model_weight_path, map_location='cpu'))

    # Transfer learning: replace the final fully connected layer
    # with a new 16-class head
    in_channel = net.fc.in_features
    net.fc = nn.Linear(in_channel, 16)
    net.to(device)

    loss_function = nn.CrossEntropyLoss()

    params = [p for p in net.parameters() if p.requires_grad]
    optimizer = optim.Adam(params, lr=0.00001)
    epochs = 8
    save_path = 'resNet34.pth'
    best_acc = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        #train
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            images, labels = data
            optimizer.zero_grad()
            logits = net(images.to(device))
            loss = loss_function(logits, labels.to(device))
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()

            train_bar.desc = "train epoch[{}/{}] loss:{:.3f}".format(epoch + 1,
                                                                     epochs,
                                                                     loss)

        #validate
        net.eval()
        acc = 0.0 #accumulate accurate number / epoch
        with torch.no_grad():
            val_bar = tqdm(validate_loader, file=sys.stdout)
            for val_data in val_bar:
                val_images, val_labels = val_data
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()

                val_bar.desc = "valid epoch[{}/{}]".format(epoch + 1,
                                                           epochs)

        val_accurate = acc / val_num
        print('[epoch %d] train_loss: %.3f val_accuracy: %.3f' %
              (epoch + 1, running_loss / train_steps, val_accurate))

        if val_accurate > best_acc:
            best_acc = val_accurate
            torch.save(net.state_dict(), save_path)

    print('Finished Training')


if __name__ == '__main__':
    main()

Accuracy after 8 epochs of training

4. Prediction

import os
import json
import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from model import resnet34
plt.rcParams['font.family'] = 'SimHei'  # SimHei, an installed Chinese font, so non-ASCII titles render correctly


def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    data_transform = transforms.Compose(
        [transforms.Resize(256),
         transforms.CenterCrop(224),
         transforms.ToTensor(),
         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
    # load image
    img_path = r"Desktop\shuju\Images\test\058_0046.jpg"
    img = Image.open(img_path).convert('RGB')  # ensure 3 channels; some Caltech images are grayscale
    plt.imshow(img)
    # [N, C, H, W]
    img = data_transform(img)
    # expand batch dimension
    img = torch.unsqueeze(img, dim=0)

    # read class_indict
    json_path = './class_indices.json'

    with open(json_path, "r") as f:
        class_indict = json.load(f)
    # create model
    model = resnet34(num_classes=16).to(device)

    # load model weights
    weights_path = "./resNet34.pth"
    model.load_state_dict(torch.load(weights_path, map_location=device))

    # prediction
    model.eval()
    with torch.no_grad():
        # predict class
        output = torch.squeeze(model(img.to(device))).cpu()
        predict = torch.softmax(output, dim=0)
        predict_cla = torch.argmax(predict).numpy()

    print_res = "class: {} prob: {:.10f}".format(class_indict[str(predict_cla)],
                                                   predict[predict_cla].numpy())
    plt.title(print_res)
    for i in range(len(predict)):
        print("class: {:10} prob: {:.10f}".format(class_indict[str(i)],
                                                    predict[i].numpy()))
    plt.show()


if __name__ == '__main__':
    main()

The computer-mouse test image is predicted correctly.
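
To label the whole test split rather than a single image, the same model can be run over the flat test folder produced in step 1. This is a minimal sketch; the folder path and the output file name predictions.txt are assumptions, not part of the original code:

import os
import json
import torch
from PIL import Image
from torchvision import transforms
from model import resnet34

data_transform = transforms.Compose(
    [transforms.Resize(256),
     transforms.CenterCrop(224),
     transforms.ToTensor(),
     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

with open('./class_indices.json') as f:
    class_indict = json.load(f)

model = resnet34(num_classes=16)
model.load_state_dict(torch.load('./resNet34.pth', map_location='cpu'))
model.eval()

test_dir = "Desktop\\shuju\\Images\\test\\"  # flat test folder from step 1 (assumed)
with torch.no_grad(), open('predictions.txt', 'w') as out:
    for fname in sorted(os.listdir(test_dir)):
        img = data_transform(Image.open(test_dir + fname).convert('RGB')).unsqueeze(0)
        pred = torch.argmax(model(img), dim=1).item()
        out.write("{} {}\n".format(fname, class_indict[str(pred)]))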