VGG16 cat vs. dog image classification with a similarity loss function

1.dataset

import random

from PIL import Image
from torchvision import datasets, transforms
from torchvision.transforms import functional_pil as F_pil
from torchvision.transforms import functional as F
from torch.utils.data import DataLoader
import torch.nn as nn

IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif', '.tiff', '.webp')

# noinspection PyTypeChecker
class MergeTwoImageTransformer(nn.Module):
    def __init__(self, image_paths):
        super(MergeTwoImageTransformer, self).__init__()
        self.image_paths = image_paths

    def forward(self, img):
        # noinspection PyProtectedMember
        if not F_pil._is_pil_image(img):
            raise ValueError("图像合并暂时仅支持Pillow对象操作")
        img_w, img_h = F._get_image_size(img)
        # 1. Randomly pick the path of another image
        img_path = random.choice(self.image_paths)
        # 2. Load that image
        other_img = datasets.folder.pil_loader(img_path)
        # Random crop + resize to match the current image size
        i, j, h, w = transforms.RandomResizedCrop.get_params(other_img, scale=(0.5, 1.0), ratio=(3. / 4., 4. / 3.))
        other_img = F.resized_crop(other_img, i, j, h, w, (img_h, img_w), transforms.InterpolationMode.BILINEAR)
        # 3. Merge the two images (watermark-style alpha blend)
        img = Image.blend(img, other_img, 0.15)
        return img

class DataSet:
    def __init__(self, root_dir, batch_size=8, num_workers=0, train=True, shuffle=None):
        if shuffle is None:
            shuffle = train
        self.root_dir = root_dir
        # Load all image paths (used as the noise-image pool for MergeTwoImageTransformer)
        _, class_to_idx = datasets.folder.find_classes(self.root_dir)
        image_paths = datasets.ImageFolder.make_dataset(
            self.root_dir,
            class_to_idx,
            IMG_EXTENSIONS
        )
        self.image_paths = [s[0] for s in image_paths]
        # Build the transform
        transform = self.get_train_transform() if train else self.get_valid_transform()
        # Build the dataset and loader
        self.dataset = datasets.ImageFolder(
            root=self.root_dir,
            transform=transform,
        )
        self.loader = DataLoader(
            dataset=self.dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            # prefetch_factor only takes effect when num_workers > 0; older PyTorch versions require
            # it to stay at the default (2) when num_workers == 0, which is what this conditional does.
            prefetch_factor=2 if num_workers == 0 else batch_size * num_workers
        )

    def __len__(self):
        return len(self.dataset.imgs)

    def __iter__(self):
        for data in self.loader:
            yield data

    def get_train_transform(self):
        """
        Build the feature transform for the training data:
        0. Randomly pick another image as "noise" and blend it into the current image;
        1. Random horizontal flip;
        2. Random crop + resize;
        3. Random color jitter;
        4. Convert to a Tensor;
        5. Normalize.
        :return:
        """
        return transforms.Compose([
            MergeTwoImageTransformer(self.image_paths),
            transforms.RandomHorizontalFlip(p=0.4),
            transforms.RandomResizedCrop(size=224, scale=(0.6, 1.0)),
            transforms.ColorJitter(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ideally computed on the current dataset
            # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # default ImageNet mean and std
        ])

    @staticmethod
    def get_valid_transform(online=False):
        """
        Build the feature transform for the test/validation data
        :param online: whether this is online single-sample prediction
        :return:
        """
        if online:
            return transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ideally computed on the current dataset
            ])
        else:
            return transforms.Compose([
                transforms.RandomResizedCrop(size=224, scale=(0.95, 1.0)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ideally computed on the current dataset
            ])
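
A minimal usage sketch of the DataSet wrapper above (the directory layout 'data/train/cat', 'data/train/dog' is hypothetical; any ImageFolder-style layout works, and the module is assumed to be saved as dataset.py). On newer PyTorch versions the prefetch_factor argument may need to be None when num_workers == 0.

from dataset import DataSet  # assumed module name

trainset = DataSet(root_dir='data/train', batch_size=8, num_workers=0, train=True)
print(len(trainset))  # total number of images
for images, labels in trainset:
    # images: [8, 3, 224, 224] float tensor, labels: [8] class indices
    print(images.shape, labels.shape)
    break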

2.metric

import torch
import torch.nn as nn

class AccuracyScore(nn.Module):
    def __init__(self):
        super(AccuracyScore, self).__init__()

    # noinspection PyMethodMayBeStatic
    def forward(self, y_pred, y_true):
        y_pred_dim = y_pred.dim()
        y_true_dim = y_true.dim()
        if y_pred_dim == y_true_dim:
            pass
        elif y_pred_dim == y_true_dim + 1:
            y_pred = torch.argmax(y_pred, dim=1)
        else:
            raise ValueError("格式异常!")
        y_pred = y_pred.to(y_true.dtype)
        correct = (y_pred == y_true)
        # noinspection PyUnresolvedReferences
        return torch.mean(correct.to(torch.float32))
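
A quick sanity check of AccuracyScore with made-up tensors; it accepts either raw logits of shape [N, C] or already-argmaxed class indices as y_pred:

acc_fn = AccuracyScore()
logits = torch.tensor([[2.0, 0.1], [0.3, 1.5], [0.9, 0.2]])  # [N=3, C=2]
labels = torch.tensor([0, 1, 1])
print(acc_fn(logits, labels))                    # tensor(0.6667), argmax -> [0, 1, 0]
print(acc_fn(torch.tensor([0, 1, 1]), labels))   # tensor(1.), inputs already class indices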

3.network

import copy

import torch.nn as nn
from torchvision import models

class NetworkV1(nn.Module):
    def __init__(self, num_classes=2, **kwargs):
        super(NetworkV1, self).__init__()
        self.batch_sim_vectors = None
        self.handle = None
        vgg16 = models.vgg16_bn(pretrained=True)
        # Transfer learning: reuse a model trained on another task and share the common parameters.
        # Option 1: use the pretrained weights only as initial values, i.e. all parameters keep updating.
        # Option 2: freeze the shared backbone parameters (no backprop updates) and only train part of the model.
        for param in vgg16.parameters():
            param.requires_grad = False
        # Adjust the last two fully connected layers to fit the current task
        vgg16.classifier[3] = nn.Linear(in_features=4096, out_features=64)
        vgg16.classifier[6] = nn.Linear(in_features=64, out_features=num_classes)
        self.vgg16 = vgg16
        self.add_hook_fn()

    def forward(self, x):
        z = self.vgg16(x)
        return z, self.batch_sim_vectors

    def create_sim_hook_fn(self):
        def hook_fn(_model, _input, _output):
            self.batch_sim_vectors = _output

        return hook_fn

    def add_hook_fn(self):
        """
        Register a forward hook that captures the 64-d similarity vectors
        :return:
        """
        self.remove_hook_fn()
        self.handle = self.vgg16.classifier[5].register_forward_hook(self.create_sim_hook_fn())

    def remove_hook_fn(self):
        """
        Remove the hook; must be called before persisting (saving) the model
        :return:
        """
        if self.handle is None:
            return
        self.handle.remove()

class NetworkV2(nn.Module):
    def __init__(self, num_classes=2, **kwargs):
        super(NetworkV2, self).__init__()
        vgg16 = models.vgg16_bn(pretrained=True)
        # Transfer learning: reuse a model trained on another task and share the common parameters.
        # Option 1: use the pretrained weights only as initial values, i.e. all parameters keep updating.
        # Option 2: freeze the shared backbone parameters (no backprop updates) and only train part of the model.
        for param in vgg16.parameters():
            param.requires_grad = False
        # Adjust the last two fully connected layers to fit the current task
        vgg16.classifier[3] = nn.Linear(in_features=4096, out_features=64)
        vgg16.classifier[6] = nn.Linear(in_features=64, out_features=num_classes)

        # Extract the individual sub-modules
        self.features = copy.deepcopy(vgg16.features)
        self.avgpool = copy.deepcopy(vgg16.avgpool)
        self.classifier = copy.deepcopy(vgg16.classifier)
        self.simer = nn.Sequential(
            self.classifier[0],
            self.classifier[1],
            self.classifier[3],
            self.classifier[4]
        )
        del vgg16

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = x.view(-1, 512 * 7 * 7)
        return self.classifier(x), self.simer(x)

    def add_hook_fn(self):
        pass

    def remove_hook_fn(self):
        pass

Network = NetworkV1
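
A small shape check for the networks above (a sketch; the pretrained vgg16_bn weights are downloaded on first use): both variants return a tuple of class logits and the 64-dimensional vectors used by the similarity loss.

import torch

net = Network(num_classes=2)  # Network = NetworkV1
net.eval()
with torch.no_grad():
    logits, sim_vectors = net(torch.randn(4, 3, 224, 224))
print(logits.shape)       # torch.Size([4, 2])
print(sim_vectors.shape)  # torch.Size([4, 64])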

4.model

import os

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
# noinspection PyPep8Naming
import torchvision.transforms.functional_pil as F_pil

from . import dataset
from .metric import AccuracyScore
from .network import Network

# If multiple GPUs exist, select a specific one (check with nvidia-smi)
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '1'

class ImageClassifiModel(object):
    def __init__(self, num_classes=2, batch_size=16, num_workers=0, model_dir=None, summary_dir=None, lr=0.005,
                 momentum=0.5, weight_decay=0.05, nesterov=True, use_gpu=False, class_names=None, is_online=False):
        super(ImageClassifiModel, self).__init__()
        if class_names is None:
            class_names = ['猫', '狗']
        self.model_dir = model_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.class_names = np.asarray(class_names)
        self.device = torch.device('cuda' if use_gpu and torch.cuda.is_available() else 'cpu')

        if is_online:
            _models = list(os.listdir(self.model_dir))
            _models.sort()
            self.net = torch.load(os.path.join(self.model_dir, _models[-1]))
            self.net.eval()
            self.net.add_hook_fn()
            self.transform = dataset.DataSet.get_valid_transform(online=True)
        else:
            # Build the network
            self.net = Network(num_classes=num_classes)
            self.net.to(device=self.device)
            self.loss_fn = nn.CrossEntropyLoss()
            self.sim_loss_fn = nn.CosineEmbeddingLoss()
            self.metrics = {
                'acc': AccuracyScore()
            }
            self.train_optim = optim.SGD(params=[p for p in self.net.parameters() if p.requires_grad],
                                         lr=lr, momentum=momentum,
                                         weight_decay=weight_decay,
                                         nesterov=nesterov
                                         )
            # Visualize the network structure in TensorBoard
            if not os.path.exists(summary_dir):
                os.makedirs(summary_dir)
            writer = SummaryWriter(log_dir=summary_dir)
            # The dummy input must live on the same device as the model
            writer.add_graph(self.net, torch.zeros(self.batch_size, 3, 224, 224, device=self.device))
            writer.close()
            self.summary_dir = summary_dir

    def training(self, train_data_dir, test_data_dir, total_epoch, summary_step_interval=200, eval_epoch_interval=1,
                 save_epoch_interval=10):
        # 1. Data loading
        trainset = dataset.DataSet(
            root_dir=train_data_dir,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            train=True,
            shuffle=True
        )
        testset = dataset.DataSet(
            root_dir=test_data_dir,
            batch_size=self.batch_size,
            num_workers=0,
            train=False,
            shuffle=False
        )

        # All sample-pair combinations within the current batch (includes self-pairs i == j)
        def _get_pairs(_size):
            pairs = []
            for i in range(_size):
                for j in range(i, _size):
                    pairs.append([i, j])
            pairs = np.asarray(pairs)
            return pairs

        # 2. Summary writer
        writer = SummaryWriter(log_dir=os.path.join(self.summary_dir, "training"))

        # 3. Start training
        train_step = 0
        test_step = 0
        _p = 0.85
        for epoch in range(total_epoch):
            # Training phase
            self.net.train(True)
            train_loss = []
            for data in trainset:
                inputs, labels = data
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                # Forward pass
                outputs, vectors = self.net(inputs)
                _pairs = _get_pairs(vectors.shape[0])
                _loss, _ce_loss, _sim_loss = self._calc_loss(outputs, vectors, labels, _pairs, _p)
                _metrics = {}
                for _key in self.metrics:
                    _metrics[_key] = self.metrics[_key](outputs, labels).cpu().numpy()

                # Backward pass
                self.train_optim.zero_grad()
                _loss.backward()
                self.train_optim.step()

                train_loss.append(_loss.item())
                if train_step % summary_step_interval == 0:
                    # Summary output
                    writer.add_scalar('train_loss', _loss, train_step)
                    writer.add_scalar('train_ce_loss', _ce_loss, train_step)
                    writer.add_scalar('train_sim_loss', _sim_loss, train_step)
                    writer.add_scalars('train_metrics', _metrics, train_step)
                    print(f"Train {epoch + 1}/{total_epoch} {train_step} "
                          f"loss:{_loss.item():.3f} accuracy:{_metrics.get('acc', -0.0):.3f}")
                train_step += 1

            # Evaluation phase
            if epoch % eval_epoch_interval == 0:
                self.net.eval()
                test_loss = []
                for data in testset:
                    inputs, labels = data
                    inputs = inputs.to(self.device)
                    labels = labels.to(self.device)

                    # Forward pass
                    outputs, vectors = self.net(inputs)
                    _pairs = _get_pairs(vectors.shape[0])
                    _loss, _ce_loss, _sim_loss = self._calc_loss(outputs, vectors, labels, _pairs, _p)
                    _metrics = {}
                    for _key in self.metrics:
                        _metrics[_key] = self.metrics[_key](outputs, labels).cpu().numpy()

                    test_loss.append(_loss.item())
                    if test_step % summary_step_interval == 0:
                        # Summary output
                        writer.add_scalar('test_loss', _loss, test_step)
                        writer.add_scalar('test_ce_loss', _ce_loss, test_step)
                        writer.add_scalar('test_sim_loss', _sim_loss, test_step)
                        writer.add_scalars('test_metrics', _metrics, test_step)
                        print(f"Test {epoch + 1}/{total_epoch} {test_step} "
                              f"loss:{_loss.item():.3f} accuracy:{_metrics.get('acc', -0.0):.3f}")
                    test_step += 1

                # Per-epoch average loss
                writer.add_scalars('epoch_loss', {'train': np.mean(train_loss), 'test': np.mean(test_loss)}, epoch)
            else:
                # Per-epoch average loss
                writer.add_scalars('epoch_loss', {'train': np.mean(train_loss)}, epoch)

            # Save a checkpoint
            if epoch % save_epoch_interval == 0:
                # Save the whole network (structure + parameters)
                self.net.remove_hook_fn()
                model_path = os.path.join(self.model_dir, f'{epoch:04d}_model.pt')
                if not os.path.exists(os.path.dirname(model_path)):
                    os.makedirs(os.path.dirname(model_path))
                torch.save(self.net, model_path)
                # torch.save(self.net.state_dict(), model_path)
                self.net.add_hook_fn()

        # 4. Save the final model
        self.net.remove_hook_fn()
        model_path = os.path.join(self.model_dir, f'{total_epoch:04d}_model.pt')
        if not os.path.exists(os.path.dirname(model_path)):
            os.makedirs(os.path.dirname(model_path))
        # torch.save(self.net.state_dict(), model_path)
        torch.save(self.net, model_path)

        # 5. Release resources
        writer.close()

    def eval(self):
        # TODO: implement here, as a standalone method, the evaluation metrics used during training
        pass

    @torch.no_grad()
    def predict(self, img):
        # noinspection PyProtectedMember
        if not F_pil._is_pil_image(img):
            raise ValueError("仅支持PIL图像对象!")
        # Feature processing & transform
        img = self.transform(img)
        img = img[None, :, :, :]  # [C,H,W] --> [1,C,H,W]

        # Model prediction
        y_, v_ = self.net(img)
        # Convert the result for output
        return self.class_names[torch.argmax(y_, dim=-1)], v_.detach().cpu().numpy()[0]

    def _calc_loss(self, outputs, vectors, labels, pairs, p):
        input1 = vectors[pairs[:, 0]]
        input2 = vectors[pairs[:, 1]]
        # noinspection PyUnresolvedReferences
        target = 2 * ((labels[pairs[:, 0]] == labels[pairs[:, 1]]).to(torch.int)) - 1
        _ce_loss = self.loss_fn(outputs, labels)
        _sim_loss = self.sim_loss_fn(input1, input2, target)
        _loss = p * _ce_loss + (1 - p) * _sim_loss
        # _loss = _ce_loss
        return _loss, _ce_loss, _sim_loss
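
To make the loss combination explicit: _calc_loss mixes the usual cross-entropy with a pairwise CosineEmbeddingLoss as loss = p * ce_loss + (1 - p) * sim_loss, where the cosine target is +1 for same-label pairs and -1 otherwise. A standalone sketch with made-up tensors (for illustration only, outside the class):

import torch
import torch.nn as nn

outputs = torch.randn(4, 2)     # class logits for a batch of 4
vectors = torch.randn(4, 64)    # the 64-d similarity vectors
labels = torch.tensor([0, 0, 1, 1])
pairs = torch.tensor([[0, 1], [0, 2], [1, 3], [2, 3]])
target = 2 * (labels[pairs[:, 0]] == labels[pairs[:, 1]]).to(torch.int) - 1  # +1 same class, -1 otherwise

ce_loss = nn.CrossEntropyLoss()(outputs, labels)
sim_loss = nn.CosineEmbeddingLoss()(vectors[pairs[:, 0]], vectors[pairs[:, 1]], target)
p = 0.85
loss = p * ce_loss + (1 - p) * sim_loss
print(ce_loss.item(), sim_loss.item(), loss.item())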

5.test

from image_classifi.src.image_classifi_model.model import ImageClassifiModel
from PIL import Image
import os
import numpy as np

root_dir = r'D:\pythonProject\image_classifi'
m = ImageClassifiModel(
    model_dir=os.path.join(root_dir, 'output03', 'model_cpu'),
    class_names=['cat', 'dog'],
    is_online=True
)
root_dir = r'D:\pythonProject\dogcat\test'
img = Image.open('IMG_2770(20220810-144822).JPG')
r1, r2 = m.predict(img)
print(r1)
print(r2)

img = Image.open('IMG_2768(20220810-143936).JPG')
r3, r4 = m.predict(img)
print(r3)
print(r4)

print(np.sum(r2 * r4) / (np.sqrt(np.sum(r2 ** 2)) * np.sqrt(np.sum(r4 ** 2))))  # cosine similarity between the two embedding vectors

Training main:

from image_classifi_model import model
import os
root_dir = r'D:\pythonProject\image_classifi'
m = model.ImageClassifiModel(
    num_classes=2,
    batch_size=16,
    num_workers=0,
    model_dir=os.path.join(root_dir, 'output03', 'model_cpu'),
    summary_dir=os.path.join(root_dir, 'output03', 'summary'),
    lr=0.005,
    momentum=0.5,
    weight_decay=0.05,
    nesterov=True,
    use_gpu=False,
    class_names=['猫','狗']
)

root_dir = r'D:\pythonProject\dogcat'
m.training(
    train_data_dir=os.path.join(root_dir, 'train'),
    test_data_dir=os.path.join(root_dir, 'test'),
    total_epoch=10,
    summary_step_interval=1,
    eval_epoch_interval=2,
    save_epoch_interval=1,
)

Original: https://blog.csdn.net/weixin_42804612/article/details/126267178
Author: 童星萌宝屋
Title: VGG16 cat vs. dog image classification with a similarity loss function
