paper:Towards Total Recall in Industrial Anomaly Detection
code:GitHub – amazon-science/patchcore-inspection
存在的问题
目前无监督缺陷检测常用的一种方法是直接利用在ImageNet上预训练的模型中的表示,而不专门进行目标分布的迁移和适配,比如PaDiM。由于这类方法是non-adaptive的,因此网络在更深更抽象层上的适配置信度有限,因为从ImageNet上学习到的高级抽象特征和工业环境中所需要的抽象特征相关度不高。此外,由于可提取的高维特征表示较少,这类方法在测试时可用的nominal context也受到了限制。
本文的创新点
针对上述问题,本文提出了一种新的无监督缺陷检测算法PatchCore,它具有以下特点
- 最大化测试阶段可用的nominal information
- 减少对ImageNet数据的bias
- 保持高推理速度、
具体包括
- 使用局部聚合的,mid-level的特征patch
a. 特征的抽象语义信息太少,深层特征对ImageNet数据的bias又太大,采用mid-level的特征可以在细节信息、抽象语义信息、对ImageNet的bias之间获得很好的平衡。
b. 在局部邻域上进行特征聚合可以保留足够的spatial context - 引入greedy coreset subsampling
显著降低存储内存,提高推理速度
方法与实现
Locally aware patch features
首先为了保留足够的细节信息同时又不会让提取的抽象信息太过于偏向ImageNet的数据,作者选用中间层的特征表示,对于ResNet模型选用第2、3层的特征图。
作者通过在局部邻域上进行特征聚合的方式来提取特征
这里 (\mathcal{N}^{(h,w)}_{p} ) 表示特征图上位置 ((h,w)) 处大小为 (p\times p) 的一块patch,文中取p=3。则位置 ((h,w)) 处的locally aware features如下所示
其中 (f_{agg}) 是邻域特征向量的聚合函数,文中采用adaptive average pooling。
提取邻域特征向量的代码如下,首先提取预训练模型中的layer2、layer3。对于原始输入经过预处理后送入模型的大小为 224 x 224,假设 batch_size=2,则输入大小为 (2, 3, 224, 224),layer2、layer3的输出大小分别为(2, 512, 28, 28)、(2, 1024, 14, 14)。然后通过patchfiy函数提取局部邻域内的特征,这里通过torch.nn.Unfold实现,这个函数的用法见torch.nn.functional.unfold 用法解读_00000cj的博客-CSDN博客。这里和PaDiM中不一样的是,PaDiM中的stride=patchsize,也就是每个patch之间是互不重合的,对于28×28的feature map,patch_size=2,stride=2,padding=0,输出14×14。而这里patch_size=3,stride=1,padding=1,输出28×28。layer2、layer3经过patchify提出的邻域特征表示维度分别为(2, 784, 512, 3, 3)、(2, 196, 1024, 3, 3),其中784=28×28, 196=14×14。
features = [features[layer] for layer in self.layers_to_extract_from]
{'layer2': torch.Size([2, 512, 28, 28])
'layer3': torch.Size([2, 1024, 14, 14])}
features = [
self.patch_maker.patchify(x, return_spatial_info=True) for x in features
]
class PatchMaker:
def __init__(self, patchsize, stride=None):
self.patchsize = patchsize # 3
self.stride = stride # 1
def patchify(self, features, return_spatial_info=False):
"""Convert a tensor into a tensor of respective patches.
Args:
x: [torch.Tensor, bs x c x w x h]
Returns:
x: [torch.Tensor, bs * w//stride * h//stride, c, patchsize,
patchsize]
"""
padding = int((self.patchsize - 1) / 2) # 1
unfolder = torch.nn.Unfold(
kernel_size=self.patchsize, stride=self.stride, padding=padding, dilation=1
)
unfolded_features = unfolder(features) # (2,512,28,28)->(2,4608,784)
number_of_total_patches = []
for s in features.shape[-2:]: # [28,28]
n_patches = (
s + 2 * padding - 1 * (self.patchsize - 1) - 1
) / self.stride + 1
number_of_total_patches.append(int(n_patches)) # [28,28]
unfolded_features = unfolded_features.reshape(
*features.shape[:2], self.patchsize, self.patchsize, -1
) # (2,512,3,3,784)
unfolded_features = unfolded_features.permute(0, 4, 1, 2, 3) # (2,784,512,3,3)
if return_spatial_info: # True
return unfolded_features, number_of_total_patches
return unfolded_features
然后对layer3的输出进行bilinear插值使之与layer2匹配,得到features如下,其中1568=2x28x28,将batch_size维度和spatial维度合并到了一起。
features = [x.reshape(-1, *x.shape[-3:]) for x in features] # [(1568,512,3,3),(1568,1024,3,3)]
然后通过自适应平均池化进行特征聚合,即上面提到的 (f_{agg}),这样对于预训练模型输出feature map上的每个位置(h, w),都得到一个预先设定维度 (d) 的单一表示,文中 (d=1024)。
代码如下
调用 features = self.forward_modules“preprocessing” # (1568,2,1024)
class MeanMapper(torch.nn.Module):
def __init__(self, preprocessing_dim):
super(MeanMapper, self).__init__()
self.preprocessing_dim = preprocessing_dim
def forward(self, features):
features = features.reshape(len(features), 1, -1) # (1568,512,3,3)->(1568,1,4608)
return F.adaptive_avg_pool1d(features, self.preprocessing_dim).squeeze(1) # (1568,1,4608)->(1568,1024)
class Preprocessing(torch.nn.Module):
def __init__(self, input_dims, output_dim):
super(Preprocessing, self).__init__()
self.input_dims = input_dims # [512,1024]
self.output_dim = output_dim # 1024
self.preprocessing_modules = torch.nn.ModuleList()
for input_dim in input_dims:
module = MeanMapper(output_dim)
self.preprocessing_modules.append(module)
def forward(self, features): # [(1568,512,3,3),(1568,1024,3,3)]
_features = []
for module, feature in zip(self.preprocessing_modules, features):
_features.append(module(feature)) # [(1568,1024),(1568,1024)]
return torch.stack(_features, dim=1) # (1568,2,1024)
这样layer2、layer3的聚合特征[(1568, 512, 3, 3), (1568, 1024, 3, 3)]经过预处理,即分别经过自适应均值池化然后stack一起得到 (1568,2,1024)的输出特征。
然后再进一步进行聚合,得到(1568, 1024)的输出。
features = self.forward_modules“preadapt_aggregator” # (1568,1024)
class Aggregator(torch.nn.Module):
def __init__(self, target_dim):
super(Aggregator, self).__init__()
self.target_dim = target_dim # 1024
def forward(self, features): # (1568,2,1024)
"""Returns reshaped and average pooled features."""
# batchsize x number_of_layers x input_dim -> batchsize x target_dim
features = features.reshape(len(features), 1, -1) # (1568,1,2048)
features = F.adaptive_avg_pool1d(features, self.target_dim) # (1568,1,1024)
return features.reshape(len(features), -1) # (1568,1024)
Coreset-reduced patch-feature memory bank
上面的代码中batch_size=2,一个batch的输出为(1568, 1024),其中1568=2×784=2x28x28,MVTec数据集中的bottle类别训练集共209张,因此整个训练集最终得到的memory bank (\mathcal{M} ) 的维度为(163856, 1024),其中163856=28x28x209,随着训练集 (\mathcal{X}{N} ) size的增大,(\mathcal{M} ) 也变得越来越大,最终的推理时间和存储空间也随之增大,因此通常需要对 (\mathcal{M} ) 进行降维,且尽可能保存 (\mathcal{M} ) 中编码的nominal feature。随机下采样会丢失 (\mathcal{M} ) 中的有用信息,本文使用coreset subsampling方法来减小 (\mathcal{M} ),coreset selection旨在找到一个子集 (\mathcal{S}\subset \mathcal{A}),对于通过 (\mathcal{A}) 得到的解,通过 (\mathcal{S}) 可以快速得到最近似解。根据不同的问题,coreset selection的目标也不同,因为PatchCore采用的是nearest neighbour computation,因此本文选用 _minmax facility location coreset selection来寻找子集 (\mathcal{M}_{C}),为了减少coreset selection的时间,本文通过random linear projection (\psi :\mathbb{R} ^{d}\to\mathbb{R} ^{d^{}},d^{}
Original: https://blog.csdn.net/ooooocj/article/details/127834029
Author: 00000cj
Title: PatchCore原理与代码解读
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/787215/
转载文章受原作者版权保护。转载请注明原作者出处!