Task description
This task is image classification on a subset of the Caltech101 data set. Caltech101 contains 101 object categories, each with roughly 40 to 800 images; 16 of these categories are selected here. The goal is to use an algorithm that identifies which category an image belongs to based on its features.
Data description
The image data set used in the task contains 1567 pictures, divided into 16 categories, with more than 80 pictures in each category. The 16 categories are: ak47, binoculars, boom-box, calculator, cannon, computer-keyboard, computer-monitor, computer-mouse, doorknob, dumb-bell, flashlight, head-phones, joy-stick, palm-pilot, video-projector, washing-machine.
1. Separate data sets
Split the data set images into training, validation, and test sets according to the image lists given in the Train, Eval, and Test text files (Train.txt, Eval.txt, Test.txt).
import os
import random
import shutil


def _read_split(list_path):
    """Parse one split list file.

    Each non-blank line is expected to hold an image path, optionally
    followed by a whitespace-separated label.

    Args:
        list_path: Path of the text file to read (UTF-8).

    Returns:
        A ``(images, labels)`` pair of equally long lists; a label is
        ``None`` when the line carries no second field.
    """
    images = []
    labels = []
    with open(list_path, encoding='utf-8') as list_file:
        for line in list_file:
            fields = line.split()
            if not fields:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            images.append(fields[0])
            labels.append(fields[1] if len(fields) > 1 else None)
    return images, labels


def moveFile(fileDir, tarDir_train, tarDir_val, tarDir_test, splitDir=r"Desktop\shuju"):
    """Distribute images into train / val / test folders according to list files.

    Reads ``Train.txt``, ``Eval.txt`` and ``Test.txt`` from ``splitDir``.
    Test images are *copied* and renamed ``a0.jpg``, ``a1.jpg``, ... in list
    order; train and validation images are *moved*, keeping their relative
    ``<class>/<file>`` path.

    Args:
        fileDir: Folder that holds the class sub-folders with the images.
        tarDir_train: Destination root for training images; the class
            sub-folders must already exist.
        tarDir_val: Destination root for validation images; the class
            sub-folders must already exist.
        tarDir_test: Destination folder for the flattened test images.
        splitDir: Folder containing the three list files.
    """
    train_image, train_truth = _read_split(os.path.join(splitDir, "Train.txt"))
    print(train_image)
    print(train_truth)

    val_image, val_truth = _read_split(os.path.join(splitDir, "Eval.txt"))
    print(val_image)
    print(val_truth)

    # Only the image path is used for the test list.
    test_image, _ = _read_split(os.path.join(splitDir, "Test.txt"))
    test_image1 = [name.split('/')[-1] for name in test_image]
    print(test_image)
    print(test_image1)

    # NOTE(review): test images are read from an extra 'Images' sub-folder of
    # fileDir, exactly as the original script did -- confirm this matches the
    # actual on-disk layout.
    for num, name in enumerate(test_image):
        shutil.copy(os.path.join(fileDir, 'Images', name),
                    os.path.join(tarDir_test, 'a' + str(num) + '.jpg'))

    for name in train_image:
        shutil.move(os.path.join(fileDir, name), os.path.join(tarDir_train, name))

    for name in val_image:
        shutil.move(os.path.join(fileDir, name), os.path.join(tarDir_val, name))


if __name__ == '__main__':
    filePath = "Desktop\\shuju\\Images\\"  # Source image folder path
    train_fileDir = filePath + "train\\"
    val_fileDir = filePath + "val\\"
    test_fileDir = filePath + "test\\"
    print(filePath)
    print(train_fileDir)
    print(val_fileDir, end="\\ \\ ")

    # Folders produced by a previous run must not be treated as classes.
    split_folders = {"train", "val", "test"}
    pathls = []
    for name in os.listdir(filePath):
        if name in split_folders:
            continue
        pathls.append(name)
        # Create the per-class destination folders if they do not exist yet.
        os.makedirs(train_fileDir + name, exist_ok=True)
        os.makedirs(val_fileDir + name, exist_ok=True)
    os.makedirs(test_fileDir, exist_ok=True)

    moveFile(filePath, train_fileDir, val_fileDir, test_fileDir)

    # Delete the original folders (they should be empty at this point).
    for name in pathls:
        try:
            os.removedirs(filePath + name)
        except OSError as err:
            # A folder that still holds files is reported and kept, not fatal.
            print("could not remove {}: {}".format(filePath + name, err))
After the separation is complete
2. Define model ResNet34
import torch.nn as nn
import torch


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut.

    Args:
        in_channel: Number of input channels.
        out_channel: Number of channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input to build the
            shortcut when its shape differs from the main branch.
        **kwargs: Ignored; accepted so the block can be constructed with the
            same keyword arguments as ``Bottleneck``.
    """

    # Output channels = out_channel * expansion.
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity
        out = self.relu(out)

        return out


class Bottleneck(nn.Module):
    """Residual block made of 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut.

    Args:
        in_channel: Number of input channels.
        out_channel: Base channel count; the block outputs
            ``out_channel * expansion`` channels.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the input to build the
            shortcut when its shape differs from the main branch.
        groups: Number of groups of the 3x3 convolution.
        width_per_group: Scales the inner width relative to the base of 64.
    """

    # Output channels = out_channel * expansion.
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None,
                 groups=1, width_per_group=64):
        super(Bottleneck, self).__init__()

        width = int(out_channel * (width_per_group / 64.)) * groups

        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,
                               kernel_size=1, stride=1, bias=False)  # squeeze channels
        self.bn1 = nn.BatchNorm2d(width)
        # ------------------------------------------------
        self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,
                               kernel_size=3, stride=stride, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(width)
        # ------------------------------------------------
        self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel * self.expansion,
                               kernel_size=1, stride=1, bias=False)  # unsqueeze channels
        self.bn3 = nn.BatchNorm2d(out_channel * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        out += identity
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    """ResNet backbone with an optional pooling + linear classification head.

    Args:
        block: Residual block class (``BasicBlock`` or ``Bottleneck``).
        blocks_num: Four integers, the number of blocks in each stage.
        num_classes: Output size of the final linear layer.
        include_top: When false, the pooling and linear head are not built
            and ``forward`` returns the feature map of the last stage.
        groups: Passed to every block.
        width_per_group: Passed to every block.
    """

    def __init__(self, block, blocks_num, num_classes=1000, include_top=True,
                 groups=1, width_per_group=64):
        super(ResNet, self).__init__()
        self.include_top = include_top
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channel = 64

        self.groups = groups
        self.width_per_group = width_per_group

        self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=2)
        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # output size = (1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        """Build one stage of ``block_num`` blocks and advance ``self.in_channel``.

        Only the first block of the stage receives ``stride`` and, when the
        shortcut shape would not match, a 1x1 conv + batch-norm downsample.
        """
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, channel * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channel * block.expansion))

        layers = []
        layers.append(block(self.in_channel,
                            channel,
                            downsample=downsample,
                            stride=stride,
                            groups=self.groups,
                            width_per_group=self.width_per_group))
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(block(self.in_channel,
                                channel,
                                groups=self.groups,
                                width_per_group=self.width_per_group))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the stem and the four stages, then the head if it was built."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)

        return x


def resnet34(num_classes=1000, include_top=True):
    """Build a ``ResNet`` of ``BasicBlock`` stages with depths ``[3, 4, 6, 3]``."""
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes, include_top=include_top)
3. Training
import os
import sys
import json

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm

from model import resnet34


def main():
    """Fine-tune ResNet-34 on the 16-class image folders.

    Loads weights from ``ResNet\\resnet34-333f7ec4.pth``, replaces the final
    linear layer with a 16-way one, trains for 8 epochs on the ``train``
    folder and evaluates on the ``val`` folder after every epoch. The
    class-index mapping is written to ``class_indices.json`` and the weights
    of the best-scoring epoch to ``resNet34.pth``.
    """
    # Use CPU
    device = torch.device("cpu")
    print("using {} device.".format(device))

    # Image processing: random crop/flip for training, deterministic
    # resize + center crop for validation.
    data_transform = {
        "train": transforms.Compose([transforms.Resize(256),
                                     transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
        "val": transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

    image_path = os.path.join("Desktop\\shuju\\Images\\")  # image data set path
    assert os.path.exists(image_path), "{} path does not exist.".format(image_path)
    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                         transform=data_transform["train"])
    train_num = len(train_dataset)

    # Invert {class_name: index} so predictions can be mapped back to names.
    class_to_idx = train_dataset.class_to_idx
    cla_dict = dict((val, key) for key, val in class_to_idx.items())
    json_str = json.dumps(cla_dict, indent=16)
    with open('class_indices.json', 'w') as json_file:
        json_file.write(json_str)

    batch_size = 32
    # os.cpu_count() may return None, which min() cannot compare with ints.
    nw = min([os.cpu_count() or 1, batch_size if batch_size > 1 else 0, 8])  # number of workers
    print('Using {} dataloader workers every process'.format(nw))

    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size, shuffle=True,
                                               num_workers=nw)

    validate_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
                                            transform=data_transform["val"])
    val_num = len(validate_dataset)
    validate_loader = torch.utils.data.DataLoader(validate_dataset,
                                                  batch_size=batch_size, shuffle=False,
                                                  num_workers=nw)

    print("using {} images for training, {} images for validation.".format(train_num,
                                                                           val_num))

    # Build the default 1000-class network so the weight file loads as-is,
    # then swap the classifier for a 16-class head.
    net = resnet34()
    model_weight_path = r"ResNet\resnet34-333f7ec4.pth"
    assert os.path.exists(model_weight_path), "file {} does not exist.".format(model_weight_path)
    net.load_state_dict(torch.load(model_weight_path, map_location='cpu'))
    in_channel = net.fc.in_features
    net.fc = nn.Linear(in_channel, 16)
    net.to(device)

    loss_function = nn.CrossEntropyLoss()

    params = [p for p in net.parameters() if p.requires_grad]
    optimizer = optim.Adam(params, lr=0.00001)

    epochs = 8
    save_path = 'resNet34.pth'
    # Best validation accuracy seen so far; a checkpoint is written only when
    # an epoch beats it.
    best_acc = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        # train
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            images, labels = data
            optimizer.zero_grad()
            logits = net(images.to(device))
            loss = loss_function(logits, labels.to(device))
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()

            train_bar.desc = "train epoch[{}/{}] loss:{:.3f}".format(epoch + 1,
                                                                     epochs,
                                                                     loss)

        # validate
        net.eval()
        acc = 0.0  # accumulate accurate number / epoch
        with torch.no_grad():
            val_bar = tqdm(validate_loader, file=sys.stdout)
            for val_data in val_bar:
                val_images, val_labels = val_data
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()

                val_bar.desc = "valid epoch[{}/{}]".format(epoch + 1,
                                                           epochs)

        val_accurate = acc / val_num
        print('[epoch %d] train_loss: %.3f val_accuracy: %.3f' %
              (epoch + 1, running_loss / train_steps, val_accurate))

        if val_accurate > best_acc:
            best_acc = val_accurate
            torch.save(net.state_dict(), save_path)

    print('Finished Training')


if __name__ == '__main__':
    main()
Accuracy after 8 epochs of training
4. Prediction
import os
import json

import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt

from model import resnet34

# Use the SimHei font for plot text (the font must be installed).
plt.rcParams['font.family'] = 'SimHei'


def main():
    """Classify one test image with the trained ResNet-34 and show the result.

    Reads the class-index mapping from ``./class_indices.json`` and the
    weights from ``./resNet34.pth``, prints the probability of every class
    and displays the image titled with the top prediction.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    data_transform = transforms.Compose(
        [transforms.Resize(256),
         transforms.CenterCrop(224),
         transforms.ToTensor(),
         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

    # load image
    img_path = r"Desktop\shuju\Images\test\058_0046.jpg"
    # Force 3 channels: Normalize and the network's first convolution both
    # expect RGB input, so grayscale or RGBA files would otherwise fail.
    img = Image.open(img_path).convert("RGB")
    plt.imshow(img)
    # [C, H, W] after the transform
    img = data_transform(img)
    # expand batch dimension -> [N, C, H, W]
    img = torch.unsqueeze(img, dim=0)

    # read class_indict
    json_path = './class_indices.json'
    with open(json_path, "r") as f:
        class_indict = json.load(f)

    # create model
    model = resnet34(num_classes=16).to(device)

    # load model weights
    weights_path = "./resNet34.pth"
    model.load_state_dict(torch.load(weights_path, map_location=device))

    # prediction
    model.eval()
    with torch.no_grad():
        # predict class
        output = torch.squeeze(model(img.to(device))).cpu()
        predict = torch.softmax(output, dim=0)
        predict_cla = torch.argmax(predict).item()

    # JSON object keys are strings, hence the str() on the class index.
    print_res = "class: {} prob: {:.10f}".format(class_indict[str(predict_cla)],
                                                  predict[predict_cla].item())
    plt.title(print_res)
    for i, prob in enumerate(predict):
        print("class: {:10} prob: {:.10f}".format(class_indict[str(i)],
                                                   prob.item()))
    plt.show()


if __name__ == '__main__':
    main()
The computer-mouse test image is predicted correctly.