# ===== 1. dataset (dataset.py) =====
import random
from PIL import Image
from torchvision import datasets, transforms
from torchvision.transforms import functional_pil as F_pil
from torchvision.transforms import functional as F
from torch.utils.data import DataLoader
import torch.nn as nn
# File extensions accepted when scanning image folders for samples.
IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif', '.tiff', '.webp')
# noinspection PyTypeChecker
class MergeTwoImageTransformer(nn.Module):
    """Watermark-style augmentation: blend a randomly chosen second image onto the input.

    A path is sampled from ``image_paths``, the image is loaded, randomly
    cropped, resized to the input's size, and alpha-blended onto the input
    with weight 0.15.
    """

    def __init__(self, image_paths):
        super(MergeTwoImageTransformer, self).__init__()
        # Pool of candidate overlay image paths.
        self.image_paths = image_paths

    def forward(self, img):
        # Only PIL images are supported by this merge operation.
        # noinspection PyProtectedMember
        if not F_pil._is_pil_image(img):
            raise ValueError("图像合并暂时仅支持Pillow对象操作")
        width, height = F._get_image_size(img)
        # 1. Sample one overlay image path at random.
        overlay_path = random.choice(self.image_paths)
        # 2. Load the overlay image as PIL.
        overlay = datasets.folder.pil_loader(overlay_path)
        # Random crop of the overlay, then resize to the input's dimensions.
        top, left, crop_h, crop_w = transforms.RandomResizedCrop.get_params(
            overlay, scale=(0.5, 1.0), ratio=(3. / 4., 4. / 3.))
        overlay = F.resized_crop(overlay, top, left, crop_h, crop_w,
                                 (height, width), transforms.InterpolationMode.BILINEAR)
        # 3. Merge (watermark style): 85% original + 15% overlay.
        return Image.blend(img, overlay, 0.15)
class DataSet:
    """Wraps a torchvision ImageFolder plus a DataLoader for one data split.

    :param root_dir: directory laid out as root/<class_name>/<image files>
    :param batch_size: mini-batch size for the DataLoader
    :param num_workers: number of loader worker processes (0 = main process)
    :param train: True builds the augmenting train transform, else the valid one
    :param shuffle: whether to shuffle batches; defaults to ``train`` when None
    """

    def __init__(self, root_dir, batch_size=8, num_workers=0, train=True, shuffle=None):
        if shuffle is None:
            shuffle = train  # shuffle training data by default
        self.root_dir = root_dir
        # Collect every image path; MergeTwoImageTransformer samples overlays from these.
        _, class_to_idx = datasets.folder.find_classes(self.root_dir)
        image_paths = datasets.ImageFolder.make_dataset(
            self.root_dir,
            class_to_idx,
            IMG_EXTENSIONS
        )
        self.image_paths = [s[0] for s in image_paths]
        # Build the per-sample transform pipeline.
        transform = self.get_train_transform() if train else self.get_valid_transform()
        # Build the dataset.
        self.dataset = datasets.ImageFolder(
            root=self.root_dir,
            transform=transform,
        )
        # BUG FIX: DataLoader only accepts `prefetch_factor` together with
        # num_workers > 0; the old code passed it unconditionally (with the
        # condition inverted), which raises ValueError when num_workers == 0.
        loader_kwargs = {}
        if num_workers > 0:
            loader_kwargs['prefetch_factor'] = batch_size * num_workers
        self.loader = DataLoader(
            dataset=self.dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            **loader_kwargs
        )

    def __len__(self):
        # Number of samples in the split.
        return len(self.dataset.imgs)

    def __iter__(self):
        # Yield mini-batches of (images, labels) from the DataLoader.
        for data in self.loader:
            yield data

    def get_train_transform(self):
        """
        Build the training-time feature transform:
          0. merge a randomly drawn image into the sample as watermark noise;
          1. random horizontal flip;
          2. random crop + resize;
          3. random color jitter;
          4. convert to a Tensor;
          5. normalize.
        :return: a transforms.Compose pipeline
        """
        return transforms.Compose([
            MergeTwoImageTransformer(self.image_paths),
            transforms.RandomHorizontalFlip(p=0.4),
            transforms.RandomResizedCrop(size=224, scale=(0.6, 1.0)),
            transforms.ColorJitter(),
            transforms.ToTensor(),
            # NOTE(review): these are the ImageNet defaults; the original comment
            # says mean/std should be recomputed on the current dataset — confirm.
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    @staticmethod
    def get_valid_transform(online=False):
        """
        Build the validation/test-time feature transform.
        :param online: True for single-sample online prediction (no random crop)
        :return: a transforms.Compose pipeline
        """
        if online:
            return transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
            ])
        else:
            return transforms.Compose([
                transforms.RandomResizedCrop(size=224, scale=(0.95, 1.0)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
            ])
# ===== 2. metric (metric.py) =====
import torch
import torch.nn as nn
class AccuracyScore(nn.Module):
    """Classification accuracy as a scalar float32 tensor."""

    def __init__(self):
        super(AccuracyScore, self).__init__()

    # noinspection PyMethodMayBeStatic
    def forward(self, y_pred, y_true):
        """Compare predictions against labels and return the fraction correct.

        ``y_pred`` may either be label indices (same rank as ``y_true``) or
        per-class scores with one extra leading-class dimension, in which case
        argmax over dim 1 recovers the labels. Any other rank mismatch raises.
        """
        extra_dims = y_pred.dim() - y_true.dim()
        if extra_dims == 1:
            # Score matrix -> predicted class indices.
            y_pred = torch.argmax(y_pred, dim=1)
        elif extra_dims != 0:
            raise ValueError("格式异常!")
        matches = y_pred.to(y_true.dtype) == y_true
        # noinspection PyUnresolvedReferences
        return matches.to(torch.float32).mean()
# ===== 3. network (network.py) =====
import copy
import torch.nn as nn
from torchvision import models
class NetworkV1(nn.Module):
    """VGG16-BN classifier whose forward also returns 64-d similarity vectors.

    The similarity vectors are captured by a forward hook registered on
    ``classifier[5]``; ``forward`` returns ``(logits, batch_sim_vectors)``.
    Hooks must be removed before ``torch.save`` (see ``remove_hook_fn``).
    """

    def __init__(self, num_classes=2, **kwargs):
        super(NetworkV1, self).__init__()
        # Latest batch of similarity vectors captured by the forward hook.
        self.batch_sim_vectors = None
        # Handle of the registered forward hook; None when no hook is attached.
        self.handle = None
        vgg16 = models.vgg16_bn(pretrained=True)
        # Transfer learning: reuse parameters trained for another task.
        # Option 1: treat pretrained weights as initialization, update everything.
        # Option 2 (used here): freeze the shared base (no gradient updates)
        # and train only the replaced layers.
        for param in vgg16.parameters():
            param.requires_grad = False
        # Resize the last two fully-connected layers for this task.
        vgg16.classifier[3] = nn.Linear(in_features=4096, out_features=64)
        vgg16.classifier[6] = nn.Linear(in_features=64, out_features=num_classes)
        self.vgg16 = vgg16
        self.add_hook_fn()

    def forward(self, x):
        z = self.vgg16(x)
        # batch_sim_vectors was just refreshed by the hook during the call above.
        return z, self.batch_sim_vectors

    def create_sim_hook_fn(self):
        # Closure that stores classifier[5]'s output on the module instance.
        def hook_fn(_model, _input, _output):
            self.batch_sim_vectors = _output
        return hook_fn

    def add_hook_fn(self):
        """
        Attach the similarity-capturing forward hook (idempotent).
        :return:
        """
        self.remove_hook_fn()
        self.handle = self.vgg16.classifier[5].register_forward_hook(self.create_sim_hook_fn())

    def remove_hook_fn(self):
        """
        Detach the forward hook; must be called before persisting the model.
        :return:
        """
        if self.handle is None:
            return
        self.handle.remove()
        # BUG FIX: clear the stale handle so the attached/detached state is
        # tracked correctly across repeated remove/add and save cycles.
        self.handle = None
class NetworkV2(nn.Module):
    """VGG16-BN classifier returning (logits, similarity vectors) without hooks.

    The similarity branch reuses the first classifier layers directly, so no
    forward hook is required; ``add_hook_fn``/``remove_hook_fn`` are no-ops
    kept for interface parity with :class:`NetworkV1`.
    """

    def __init__(self, num_classes=2, **kwargs):
        super(NetworkV2, self).__init__()
        backbone = models.vgg16_bn(pretrained=True)
        # Transfer learning: reuse parameters trained for another task.
        # Option 1: treat pretrained weights as initialization, update everything.
        # Option 2 (used here): freeze the shared base (no gradient updates)
        # and train only the replaced layers.
        for p in backbone.parameters():
            p.requires_grad = False
        # Resize the last two fully-connected layers for this task.
        backbone.classifier[3] = nn.Linear(in_features=4096, out_features=64)
        backbone.classifier[6] = nn.Linear(in_features=64, out_features=num_classes)
        # Take over the individual sub-modules.
        self.features = copy.deepcopy(backbone.features)
        self.avgpool = copy.deepcopy(backbone.avgpool)
        self.classifier = copy.deepcopy(backbone.classifier)
        # Similarity branch: Linear -> ReLU -> Linear(4096, 64) -> ReLU
        # (classifier layers up to the 64-d output, dropouts skipped).
        self.simer = nn.Sequential(
            self.classifier[0],
            self.classifier[1],
            self.classifier[3],
            self.classifier[4]
        )
        del backbone

    def forward(self, x):
        feats = self.features(x)
        feats = self.avgpool(feats)
        feats = feats.view(-1, 512 * 7 * 7)
        return self.classifier(feats), self.simer(feats)

    def add_hook_fn(self):
        # No hook needed in this variant.
        pass

    def remove_hook_fn(self):
        # No hook needed in this variant.
        pass
# Default network implementation used by the model module.
Network = NetworkV1
# ===== 4. model (model.py) =====
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
# noinspection PyPep8Naming
import torchvision.transforms.functional_pil as F_pil
from . import dataset
from .metric import AccuracyScore
from .network import Network
# If multiple GPUs exist, pin a specific one (inspect ids via nvidia-smi).
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
class ImageClassifiModel(object):
    """Cat/dog image classifier: network construction, training, persistence
    and online prediction.

    Training optimizes a combined loss: cross-entropy on the class logits plus
    a cosine-embedding loss on 64-d similarity vectors (same-label sample
    pairs should be similar, different-label pairs dissimilar).
    """

    def __init__(self, num_classes=2, batch_size=16, num_workers=0, model_dir=None, summary_dir=None, lr=0.005,
                 momentum=0.5, weight_decay=0.05, nesterov=True, use_gpu=False, class_names=None, is_online=False):
        super(ImageClassifiModel, self).__init__()
        if class_names is None:
            class_names = ['猫', '狗']
        self.model_dir = model_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.class_names = np.asarray(class_names)
        self.device = torch.device('cuda' if use_gpu and torch.cuda.is_available() else 'cpu')
        if is_online:
            # Online mode: restore the most recently saved model for prediction.
            _models = list(os.listdir(self.model_dir))
            _models.sort()
            self.net = torch.load(os.path.join(self.model_dir, _models[-1]))
            self.net.eval()
            self.net.add_hook_fn()
            self.transform = dataset.DataSet.get_valid_transform(online=True)
        else:
            # Training mode: build the network, losses, metrics and optimizer.
            self.net = Network(num_classes=num_classes)
            self.net.to(device=self.device)
            self.loss_fn = nn.CrossEntropyLoss()
            self.sim_loss_fn = nn.CosineEmbeddingLoss()
            self.metrics = {
                'acc': AccuracyScore()
            }
            # BUG FIX: the learning rate was hard-coded to 0.005 and the `lr`
            # argument was silently ignored; pass it through to the optimizer.
            self.train_optim = optim.SGD(params=[p for p in self.net.parameters() if p.requires_grad],
                                         lr=lr, momentum=momentum,
                                         weight_decay=weight_decay,
                                         nesterov=nesterov
                                         )
            # Write the model graph for TensorBoard visualization.
            if not os.path.exists(summary_dir):
                os.makedirs(summary_dir)
            writer = SummaryWriter(log_dir=summary_dir)
            writer.add_graph(self.net, torch.empty(self.batch_size, 3, 224, 224))
            writer.close()
            self.summary_dir = summary_dir

    def training(self, train_data_dir, test_data_dir, total_epoch, summary_step_interval=200, eval_epoch_interval=1,
                 save_epoch_interval=10):
        """Run the train/eval/checkpoint loop.

        :param train_data_dir: ImageFolder-style directory of training images
        :param test_data_dir: ImageFolder-style directory of test images
        :param total_epoch: number of epochs to train
        :param summary_step_interval: step interval for TensorBoard scalars/prints
        :param eval_epoch_interval: evaluate every this many epochs
        :param save_epoch_interval: checkpoint every this many epochs
        """
        # 1. Data loading
        trainset = dataset.DataSet(
            root_dir=train_data_dir,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            train=True,
            shuffle=True
        )
        testset = dataset.DataSet(
            root_dir=test_data_dir,
            batch_size=self.batch_size,
            num_workers=0,
            train=False,
            shuffle=False
        )

        # All (i, j) index pairs with i <= j within the current batch, used to
        # form sample pairs for the cosine-embedding similarity loss.
        def _get_pairs(_size):
            pairs = []
            for i in range(_size):
                for j in range(i, _size):
                    pairs.append([i, j])
            pairs = np.asarray(pairs)
            return pairs

        # 2. Summary output
        writer = SummaryWriter(log_dir=os.path.join(self.summary_dir, "training"))
        # 3. Training loop
        train_step = 0
        test_step = 0
        _p = 0.85  # weight of the cross-entropy term in the combined loss
        for epoch in range(total_epoch):
            # ---- train phase ----
            self.net.train(True)
            train_loss = []
            for data in trainset:
                inputs, labels = data
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                # Forward pass
                outputs, vectors = self.net(inputs)
                _pairs = _get_pairs(vectors.shape[0])
                _loss, _ce_loss, _sim_loss = self._calc_loss(outputs, vectors, labels, _pairs, _p)
                _metrics = {}
                for _key in self.metrics:
                    _metrics[_key] = self.metrics[_key](outputs, labels).cpu().numpy()
                # Backward pass
                self.train_optim.zero_grad()
                _loss.backward()
                self.train_optim.step()
                train_loss.append(_loss.item())
                if train_step % summary_step_interval == 0:
                    # TensorBoard output
                    writer.add_scalar('train_loss', _loss, train_step)
                    writer.add_scalar('train_ce_loss', _ce_loss, train_step)
                    writer.add_scalar('train_sim_loss', _sim_loss, train_step)
                    writer.add_scalars('train_metrics', _metrics, train_step)
                    print(f"Train {epoch + 1}/{total_epoch} {train_step} "
                          f"loss:{_loss.item():.3f} accuracy:{_metrics.get('acc', -0.0):.3f}")
                train_step += 1
            # ---- evaluation phase ----
            if epoch % eval_epoch_interval == 0:
                self.net.eval()
                test_loss = []
                for data in testset:
                    inputs, labels = data
                    inputs = inputs.to(self.device)
                    labels = labels.to(self.device)
                    # Forward pass only (no optimizer step on the test split)
                    outputs, vectors = self.net(inputs)
                    _pairs = _get_pairs(vectors.shape[0])
                    _loss, _ce_loss, _sim_loss = self._calc_loss(outputs, vectors, labels, _pairs, _p)
                    _metrics = {}
                    for _key in self.metrics:
                        _metrics[_key] = self.metrics[_key](outputs, labels).cpu().numpy()
                    test_loss.append(_loss.item())
                    if test_step % summary_step_interval == 0:
                        # TensorBoard output
                        writer.add_scalar('test_loss', _loss, test_step)
                        writer.add_scalar('test_ce_loss', _ce_loss, test_step)
                        writer.add_scalar('test_sim_loss', _sim_loss, test_step)
                        writer.add_scalars('test_metrics', _metrics, test_step)
                        print(f"Test {epoch + 1}/{total_epoch} {test_step} "
                              f"loss:{_loss.item():.3f} accuracy:{_metrics.get('acc', -0.0):.3f}")
                    test_step += 1
                # Per-epoch mean losses
                writer.add_scalars('epoch_loss', {'train': np.mean(train_loss), 'test': np.mean(test_loss)}, epoch)
            else:
                # Per-epoch mean training loss only
                writer.add_scalars('epoch_loss', {'train': np.mean(train_loss)}, epoch)
            # ---- checkpoint ----
            if epoch % save_epoch_interval == 0:
                # Save the whole network (structure + parameters); hooks must be
                # detached before serialization and re-attached afterwards.
                self.net.remove_hook_fn()
                model_path = os.path.join(self.model_dir, f'{epoch:04d}_model.pt')
                if not os.path.exists(os.path.dirname(model_path)):
                    os.makedirs(os.path.dirname(model_path))
                torch.save(self.net, model_path)
                self.net.add_hook_fn()
        # 4. Final model save
        self.net.remove_hook_fn()
        model_path = os.path.join(self.model_dir, f'{total_epoch:04d}_model.pt')
        if not os.path.exists(os.path.dirname(model_path)):
            os.makedirs(os.path.dirname(model_path))
        torch.save(self.net, model_path)
        # 5. Release resources
        writer.close()

    def eval(self):
        # TODO: factor the evaluation logic out of `training` into this method.
        pass

    @torch.no_grad()
    def predict(self, img):
        """Predict one PIL image; returns (class_name, 64-d similarity vector).

        :raises ValueError: if ``img`` is not a PIL image
        """
        # noinspection PyProtectedMember
        if not F_pil._is_pil_image(img):
            raise ValueError("仅支持PIL图像对象!")
        # Feature transform + batch dimension
        img = self.transform(img)
        img = img[None, :, :, :]  # [C,H,W] --> [1,C,H,W]
        # Model forward
        y_, v_ = self.net(img)
        # Convert to class name + numpy vector
        return self.class_names[torch.argmax(y_, dim=-1)], v_.detach().cpu().numpy()[0]

    def _calc_loss(self, outputs, vectors, labels, pairs, p):
        """Combined loss: ``p * cross_entropy + (1 - p) * cosine_embedding``."""
        input1 = vectors[pairs[:, 0]]
        input2 = vectors[pairs[:, 1]]
        # Target is +1 for same-label pairs, -1 otherwise.
        # noinspection PyUnresolvedReferences
        target = 2 * ((labels[pairs[:, 0]] == labels[pairs[:, 1]]).to(torch.int)) - 1
        _ce_loss = self.loss_fn(outputs, labels)
        _sim_loss = self.sim_loss_fn(input1, input2, target)
        _loss = p * _ce_loss + (1 - p) * _sim_loss
        return _loss, _ce_loss, _sim_loss
# ===== 5. 测试 (prediction test script) =====
from image_classifi.src.image_classifi_model.model import ImageClassifiModel
import torch
from PIL import Image
from torch import nn
import os
import numpy as np
# Restore the newest checkpoint in online mode.
root_dir = r'D:\pythonProject\image_classifi'
m = ImageClassifiModel(
    model_dir=os.path.join(root_dir, 'output03', 'model_cpu'),
    class_names=['cat', 'dog'],
    is_online=True
)
# NOTE(review): this path is assigned but never used below — confirm intent.
root_dir = r'D:\pythonProject\dogcat\test'
# Predict two images and print their labels and embedding vectors.
img = Image.open('IMG_2770(20220810-144822).JPG')
r1, r2 = m.predict(img)
print(r1)
print(r2)
img = Image.open('IMG_2768(20220810-143936).JPG')
r3, r4 = m.predict(img)
print(r3)
print(r4)
# Cosine similarity between the two embedding vectors.
numerator = np.sum(r2 * r4)
denominator = np.sqrt(np.sum(r2 ** 2)) * np.sqrt(np.sum(r4 ** 2))
print(numerator / denominator)
# ===== 训练 main (training entry point) =====
from image_classifi_model import model
import os
# Project root that holds model checkpoints and TensorBoard summaries.
root_dir = r'D:\pythonProject\image_classifi'
m = model.ImageClassifiModel(
    num_classes=2,
    batch_size=16,
    num_workers=0,
    model_dir=os.path.join(root_dir, 'output03', 'model_cpu'),
    summary_dir=os.path.join(root_dir, 'output03', 'summary'),
    lr=0.005,
    momentum=0.5,
    weight_decay=0.05,
    nesterov=True,
    use_gpu=False,
    class_names=['猫','狗']
)
# Data root containing the train/ and test/ image folders.
root_dir = r'D:\pythonProject\dogcat'
m.training(
    train_data_dir=os.path.join(root_dir, 'train'),
    test_data_dir=os.path.join(root_dir, 'test'),
    total_epoch=10,
    summary_step_interval=1,
    eval_epoch_interval=2,
    save_epoch_interval=1,
)
# Original: https://blog.csdn.net/weixin_42804612/article/details/126267178
# Author: 童星萌宝屋
# Title: vgg16利用相似性损失函数进行猫狗图像分类
# 原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/667333/
# 转载文章受原作者版权保护。转载请注明原作者出处!