DeepSort
*
– 1. MOT(Multi-Object Tracking)简介
– 2. DeepSort前身:Sort(Simple Online And Realtime Tracking)
– 3. DeepSort总体框架(流程)
–
+ 3.1 框架概要
+ 3.2 流程分析
– 4. DeepSort各模块讲解
–
+ 4.1 目标检测模块
+
* 4.1.1 目标检测模型概述
* 4.1.2 Detection类解析
+ 4.2 轨迹跟踪模块
+
* 4.2.1 Track类
* 4.2.2 Tracker类
* 4.2.3 卡尔曼滤波
+ 4.3 数据匹配模块
+
* 4.3.1 ReID模块
* 4.3.2 级联匹配和IOU匹配
* 4.3.3 匈牙利算法
1. MOT(Multi-Object Tracking)简介
多目标跟踪是 检测视频影像中出现的多个目标(如行人,汽车等)并对每个目标进行 轨迹跟踪及 ID分配。其中视频前后出现的同一目标应分配同一ID,不同目标分配不同ID。
目前,MOT问题的 处理框架主要有(1) 先检测后跟踪(Tracking by detection),如Sort/DeepSort;(2) 检测和跟踪联合,如JDE,CenterTrack等;(3) 基于注意力机制,如TransTrack,TrackFormer等
其中 处理过程主要可分为四个步骤:
1.按帧数解析输入视频
2.通过目标检测网络获取视频原始帧的目标检测框
3.对检测出的目标框进行特征提取(运动或语义特征)并对前后视频帧进行相似度计算
4.数据关联,匹配目标框和对应轨迹及ID
2. DeepSort前身:Sort(Simple Online And Realtime Tracking)
Sort 主要思路:首先通过检测器Detections(如Faster RCNN)检测出每一帧的目标;其次通过卡尔曼滤波预测目标在下一帧的位置,将预测的位置与检测器实际检测的位置做相似度计算(IOU);最后通过匈牙利算法匹配对应轨迹和目标框,匹配结果分为三种:(1)未匹配的轨迹,直接删除,(2)未匹配的目标框,初始化为新的轨迹,(3)匹配成功的轨迹与目标框,通过卡尔曼滤波更新检测框位置,获取最优估计。
Sort 存在问题:由于只考虑检测框与预测框的重叠面积(IOU),当两个目标发生遮挡后导致ID-Switch增大(同一目标的ID发生变化);同时未考虑对于遮挡后重新出现目标,进一步导致ID-Switch过大。
; 3. DeepSort总体框架(流程)
3.1 框架概要
对比Sort的 主要改进:利用ReID模型提取外观语义特征,加入 外观信息;增加了 级联匹配(Matching Cascade)和 轨迹确认(Confirmed、Tentative、Deleted)
DeepSort 主要模块:
(1) 目标检测模块:通过目标检测网络,获取输入每一帧图片中的目标框
(2) 轨迹跟踪模块:通过卡尔曼滤波进行轨迹预测和更新,获取新的轨迹集合
(3) 数据匹配模块:通过级联匹配和IOU匹配将轨迹和目标框关联
DeepSort 主要流程:检测器获取视频当前帧中目标框 → \rightarrow → 卡尔曼滤波根据当前帧的轨迹集合预测下一帧轨迹集合 → \rightarrow → 预测轨迹与下一帧检测目标框进行匹配 → \rightarrow → 卡尔曼滤波更新匹配成功的轨迹
; 3.2 流程分析
整个流程概括如下:
(1)将第一帧检测目标框初始化对应轨迹进行卡尔曼滤波预测下一时刻轨迹,其中初始化轨迹状态为不确定态
(2)将上一时刻确认态轨迹与当前时刻检测目标框进行级联匹配,级联匹配结果中匹配失败轨迹和匹配失败目标框用于后续IOU匹配,匹配成功轨迹和目标框进行卡尔曼滤波预测和更新
(3)将级联匹配结果中匹配失败轨迹和匹配失败目标框以及上一帧不确定态轨迹进行IOU匹配,匹配结果中匹配失败轨迹若仍为不确定态或为确定态但连续匹配失败次数超标则删除该轨迹;匹配失败轨迹为确定态且连续匹配失败次数未超标进行卡尔曼滤波预测;匹配失败目标框则初始化对应轨迹进行卡尔曼滤波预测;匹配成功轨迹和目标框进行卡尔曼滤波预测和更新
(4)重复(2)和(3),直到结束
4. DeepSort各模块讲解
4.1 目标检测模块
目标检测模块通过对应模型检测视频每一帧图片中的目标,其精度直接影响多目标跟踪的最终效果。
4.1.1 目标检测模型概述
目前整理了有关基于深度学习的目标检测模型的 发展路线:
首先以 Anchor-based为基础的Two-stage模型因其检测速度慢的问题逐渐发展为One-stage模型。然而由于Anchor的设计存在超参数,难以覆盖所有尺寸目标;同时为了保证一定的精度,选择了大量的Anchor用来检测占用计算量和内存消耗。
后来以 Anchor-free为基础的模型提出以检测关键点(中心点)的方式解决Anchor存在的问题,大大优化了模型超参数的数量。此外大多数目标检测模型需要对预测结果进行非极大值抑制(NMS)后处理,对此提出了 NMS-free模型皆在取消NMS后处理,提高检测速度,实现真正意义上的端到端检测。
目前 Transformer架构在视觉领域(ViT,Swin Transformer)取得了一系列成功。自此以Transformer为基础的目标检测模型展现出了优越的性能并且基于Transformer的模型能够天然的消除人工设置的Anchor-based以及NMS后处理是一个优良的端到端检测器。
有关目标检测模型解析,后续整理发布
; 4.1.2 Detection类解析
通过相应目标检测模型将视频中每一帧图片的检测结果(边框坐标及置信度得分)封装于 Detection类,此外该类包含了ReID模块提取的外观语义特征以及不同边框坐标形式转换方法, 具体代码如下:
class Detection(object):
def __init__(self, tlwh, confidence, feature):
'''
tlwh:目标框左上角横纵坐标x, y; 宽w; 高h
confindnce:目标类别置信度得分
feature:ReID模块提取目标框的外观语义特征
'''
self.tlwh = np.asarray(tlwh, dtype=np.float)
self.confidence = float(confidence)
self.feature = np.asarray(feature, dtype=np.float32)
def to_tlbr(self):
'''将目标框坐标tlwh转换为左上角横纵坐标x,y; 右下角横纵坐标x, y'''
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
def to_xyah(self):
'''将目标框坐标tlwh转换为中心点横纵坐标x,y; 宽高比a; 高h'''
ret = self.tlwh.copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret
4.2 轨迹跟踪模块
轨迹跟踪模块主要通过Track类和Tracker类完成,具体功能如下:
Track类:存储单个轨迹的状态和信息以及负责单个轨迹的预测和更新
Tracker类:存储所有轨迹(集合)的状态和信息,负责轨迹的初始化以及所有轨迹(集合)的预测和更新同时封装了数据匹配模块
4.2.1 Track类
Track类的 核心代码及注释如下:
class TrackState:
'''
单个轨迹的三种状态
'''
Tentative = 1
Confirmed = 2
Deleted = 3
class Track:
def __init__(self, mean, covariance, track_id, class_id, conf, n_init, max_age,
feature=None):
'''
mean:位置、速度状态分布均值向量,维度(8×1)
convariance:位置、速度状态分布方差矩阵,维度(8×8)
track_id:轨迹ID
class_id:轨迹所属类别
hits:轨迹更新次数(初始化为1),即轨迹与目标连续匹配成功次数
age:轨迹连续存在的帧数(初始化为1),即轨迹出现到被删除的连续总帧数
time_since_update:轨迹距离上次更新后的连续帧数(初始化为0),即轨迹与目标连续匹配失败次数
state:轨迹状态
features:轨迹所属目标的外观语义特征,轨迹匹配成功时添加当前帧的新外观语义特征
conf:轨迹所属目标的置信度得分
_n_init:轨迹状态由不确定态到确定态所需连续匹配成功的次数
_max_age:轨迹状态由不确定态到删除态所需连续匹配失败的次数
'''
self.mean = mean
self.covariance = covariance
self.track_id = track_id
self.class_id = int(class_id)
self.hits = 1
self.age = 1
self.time_since_update = 0
self.state = TrackState.Tentative
self.features = []
if feature is not None:
self.features.append(feature)
self.conf = conf
self._n_init = n_init
self._max_age = max_age
def increment_age(self):
'''
预测下一帧轨迹时调用
'''
self.age += 1
self.time_since_update += 1
def predict(self, kf):
'''
预测下一帧轨迹信息
'''
self.mean, self.covariance = kf.predict(self.mean, self.covariance)
self.increment_age()
def update(self, kf, detection, class_id, conf):
'''
更新匹配成功的轨迹信息
'''
self.conf = conf
self.mean, self.covariance = kf.update(
self.mean, self.covariance, detection.to_xyah())
self.features.append(detection.feature)
self.class_id = class_id.int()
self.hits += 1
self.time_since_update = 0
if self.state == TrackState.Tentative and self.hits >= self._n_init:
self.state = TrackState.Confirmed
def mark_missed(self):
'''
将轨迹状态转为删除态
'''
if self.state == TrackState.Tentative:
self.state = TrackState.Deleted
elif self.time_since_update > self._max_age:
self.state = TrackState.Deleted
'''
该部分还存在一些轨迹坐标转化及状态判定函数,具体可参考代码来源
'''
Track类是轨迹的一个基本单位,通过mean、covariance存储轨迹的 位置和速度信息,利用 卡尔曼滤波对轨迹信息进行 预测和 更新;同时定义了轨迹的三种状态(Tentaitve、Confirmed、Deleted)及 三种状态之间的转换关系。其中三种状态如下:
Tentaitive态:代表轨迹初始(默认)状态,该状态轨迹更新时连续匹配成功次数(hits)+1,达到指定匹配成功次数(_n_init默认为3)转换为Confirmed态;经过级联匹配和IOU匹配后该轨迹状态仍为Tentaiva,则认为轨迹没有匹配上任何目标,转换为Deleted态。
Confirmed态:代表轨迹匹配成功状态。该状态轨迹预测而不更新时连续匹配失败次数(time_since_update)+1,达到连续匹配失败次数(_max_age默认为70)转换为Deleted态。
Deleted态:代表轨迹失效状态。该轨迹从所有轨迹(集合)中删除。
4.2.2 Tracker类
Tracker类的 核心代码及注释如下:
其中只列出了用于轨迹预测、更新及初始化部分,对于其中的数据匹配模块在4.3.2中讲解
class Tracker:
GATING_THRESHOLD = np.sqrt(kalman_filter.chi2inv95[4])
def __init__(self, metric, max_iou_distance=0.9, max_age=30, n_init=3, _lambda=0):
'''
metric:用于关联轨迹与目标框的距离度量函数,是NearestNeighborDistanceMetric类
max_iou_distance:IOU匹配中代价矩阵的阈值
max_age:轨迹状态由不确定态到删除所需连续匹配失败的次数
n_init:轨迹状态由不确定态到确定态所需连续匹配成功的次数
_lambda:门控距离矩阵和外观语义特征相似度矩阵之间的权重
kf:卡尔曼滤波模块
tracks:所有轨迹状态和信息集合
_next_id:下一个轨迹ID
'''
self.metric = metric
self.max_iou_distance = max_iou_distance
self.max_age = max_age
self.n_init = n_init
self._lambda = _lambda
self.kf = kalman_filter.KalmanFilter()
self.tracks = []
self._next_id = 1
def predict(self):
'''
预测轨迹集合中所有轨迹的下一帧信息
'''
for track in self.tracks:
track.predict(self.kf)
def increment_ages(self):
'''
当视频帧中未检测到目标时调用
因无法进行卡尔曼滤波更新,故不再额外进行卡尔曼滤波预测,但需要更新相应轨迹信息
'''
for track in self.tracks:
track.increment_age()
track.mark_missed()
def update(self, detections, classes, confidences):
'''
更新轨迹集合中所有轨迹状态和信息
'''
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
for track_idx, detection_idx in matches:
self.tracks[track_idx].update(self.kf, detections[detection_idx], classes[detection_idx],
confidences[detection_idx])
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx], classes[detection_idx].item(),
confidences[detection_idx].item())
self.tracks = [t for t in self.tracks if not t.is_deleted()]
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
continue
features += track.features
targets += [track.track_id for _ in track.features]
track.features = []
self.metric.partial_fit(np.asarray(features), np.asarray(targets),
active_targets)
def _initiate_track(self, detection, class_id, conf):
'''
初始化轨迹状态和信息
'''
mean, covariance = self.kf.initiate(detection.to_xyah())
self.tracks.append(Track(
mean, covariance, self._next_id, class_id, conf, self.n_init, self.max_age,
detection.feature))
self._next_id += 1
'''
该部分还存在_full_cost_metric以及_match函数代码用于轨迹和目标框的匹配,在4.3.2中讲解
'''
Tracker类是整个DeepSort框架的 核心部分, 整合了 所有轨迹的信息和状态并 逐一通过 卡尔曼滤波对轨迹信息进行 预测。由于在轨迹集合更新过程中涉及数据匹配模块,Tracker类的update函数封装了 级联匹配和IOU匹配,根据对应 匹配结果结合卡尔曼滤波更新轨迹集合中所有轨迹的状态和信息并且针对未匹配的目标(以及第一帧目标框) 初始化轨迹状态和信息。该部分结合4.3.2级联匹配和IOU匹配以及3.1DeepSort框架图阅读。
4.2.3 卡尔曼滤波
卡尔曼滤波主要包含预测和更新。其中 预测是根据系统 当前状态对系统的 下一步状态做出 有根据的预测; 更新是通过 测量值与 预测值综合 计算出系统状态的 最优估计值。具体可参考详解卡尔曼滤波原理博客,其中引用表达式:
预测:
x ⃗ k = F k x ⃗ k − 1 + B k u ⃗ k \vec x_k = F_k \vec x_{k – 1} + B_k\vec u_k x k =F k x k −1 +B k u k
P k = F k P k − 1 F k T + Q k P_k = F_kP_{k – 1}F_k^T + Q_k P k =F k P k −1 F k T +Q k
x ⃗ \vec x x: 状态向量(均值)
u ⃗ \vec u u: 外部控制向量
P P P:状态矩阵(协方差)
F F F:状态转移矩阵
B B B: 外部控制矩阵
Q Q Q: 外部干扰矩阵(噪声)
更新:
x ⃗ k ′ = x ⃗ k + K ( z ⃗ k − H k x ⃗ k ) \vec x’_k = \vec x_k + K(\vec z_k – H_k\vec x_k)x k ′=x k +K (z k −H k x k )
P k ′ = P k − K H k P k P’_k = P_k – KH_kP_k P k ′=P k −K H k P k
K = P k H k T ( H k P k H k T + R k ) − 1 K = P_kH_k^T(H_kP_kH_k^T + R_k)^{-1}K =P k H k T (H k P k H k T +R k )−1
x ⃗ ′ \vec x’x ′:最优估计向量(均值)
z ⃗ \vec z z: 测量向量(均值)
H H H:状态空间向测量空间转移矩阵
P ′ P’P ′:最优估计矩阵(协方差)
R R R: 测量矩阵(协方差)
K K K:增益矩阵
其中k − 1 k-1 k −1代表当前状态(k − 1 k-1 k −1时刻);k k k代表下一状态(k k k时刻)
卡尔曼滤波类 实现代码及注释如下:
chi2inv95 = {
1: 3.8415,
2: 5.9915,
3: 7.8147,
4: 9.4877,
5: 11.070,
6: 12.592,
7: 14.067,
8: 15.507,
9: 16.919}
class KalmanFilter(object):
def __init__(self):
'''
_motion_mat:状态转移矩阵F,维度(8×8)
_update_mat:状态空间向测量空间转移矩阵H,(维度4×8)
_std_weight_position:控制位置方差权重
_std_weight_velocity:控制速度方差权重
'''
ndim, dt = 4, 1.
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
self._update_mat = np.eye(ndim, 2 * ndim)
self._std_weight_position = 1. / 20
self._std_weight_velocity = 1. / 160
def initiate(self, measurement):
'''
根据目标框检测值初始化轨迹
measurement:目标框测量向量(x, y, a, h),中心点横纵坐标x, y,宽高比a,高h
'''
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[0],
2 * self._std_weight_position * measurement[1],
1 * measurement[2],
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[0],
10 * self._std_weight_velocity * measurement[1],
0.1 * measurement[2],
10 * self._std_weight_velocity * measurement[3]]
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean, covariance):
'''
卡尔曼滤波根据当前时刻状态预测下一时刻状态
mean:当前时刻位置、速度状态分布向量(均值),维度(8×1)
covariance:当前时刻位置、速度状态分布矩阵(方差),维度(8×8)
'''
std_pos = [
self._std_weight_position * mean[0],
self._std_weight_position * mean[1],
1 * mean[2],
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[0],
self._std_weight_velocity * mean[1],
0.1 * mean[2],
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(self._motion_mat, mean)
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T))
+ motion_cov
return mean, covariance
def project(self, mean, covariance):
'''
卡尔曼滤波将状态分布映射到测量分布上
mean:下一时刻位置、速度状态分布向量(均值),维度(8×1)
covariance:下一时刻位置、速度状态分布矩阵(方差),维度(8×8)
'''
std = [
self._std_weight_position * mean[0],
self._std_weight_position * mean[1],
0.1 * mean[2],
self._std_weight_position * mean[3]]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((self._update_mat, covariance,
self._update_mat.T))
return mean, covariance + innovation_cov
def update(self, mean, covariance, measurement):
'''
卡尔曼滤波根据测量值和预测值更新最优估计值
mean:下一时刻位置、速度状态分布向量(均值),维度(8×1)
covariance:下一时刻位置、速度状态分布矩阵(方差),维度(8×8)
measurement:下一时刻目标框测量向量(x, y, a, h),中心点横纵坐标x, y,宽高比a,高h
'''
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor(
projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
check_finite=False).T
innovation = measurement - projected_mean
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((
kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
def gating_distance(self, mean, covariance, measurements,
only_position=False):
'''
计算状态分布和测量值之间的门控距离
mean:位置、速度状态分布向量(均值),维度(8×1)
covariance:位置、速度状态分布矩阵(方差),维度(8×8)
measurement:目标框测量向量(x, y, a, h),中心点横纵坐标x, y,宽高比a,高h
only_position:是否只考虑位置分量
'''
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
cholesky_factor = np.linalg.cholesky(covariance)
d = measurements - mean
g = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False,
overwrite_b=True)
squared_maha = np.sum(g * g, agxis=0)
return squared_maha
KalmanFilter类对轨迹的状态分布信息进行 预测和 更新,以及对目标框 初始化轨迹信息;同时包含了计算测量分布和状态分布之间的 门控距离。需要注意:
(1)代码中考虑的是线性系统,故不包含外部控制向量,即u ⃗ \vec u u=0
(2)代码中的状态分布向量(均值)x ⃗ = ( x , y , a , h , v x , v y , v a , v h ) T \vec x=(x, y, a, h, v_x, v_y, v_a, v_h)^T x =(x ,y ,a ,h ,v x ,v y ,v a ,v h )T,包含位置状态( x , y , a , h ) (x, y, a, h)(x ,y ,a ,h )即检测框中心点横纵坐标x , y x, y x ,y,宽高比a a a,高h h h及其对应速度状态( v x , v y , v a , v h ) (v_x, v_y, v_a, v_h)(v x ,v y ,v a ,v h ),其余变量的设置详见代码中注释
(3)代码中出现的有关numpy和scipy的函数可查阅官方文档numpy;scipy
(4)代码中注明了有关计算过程的矩阵关系和维度变化,结合表达式阅读,其中涉及有关线性代数和数值分析知识感兴趣可查阅有关文档
4.3 数据匹配模块
数据匹配模块主要通过 级联匹配和 IOU匹配以及 匈牙利算法来完成对轨迹和目标框的关联匹配,具体功能如下:
级联匹配:使用(ReID模块提取) 外观特征结合运动特征计算相似度,通过 匈牙利算法进行 关联匹配
IOU匹配:使用 轨迹和目标框面积 计算IOU,通过 匈牙利算法进行 关联匹配
4.3.1 ReID模块
重识别简称为ReID,是利用对应模型 判断不同时间段出现的目标是否属于同一对象。多目标跟踪问题为 减少ID-Switch引入ReID模块 提取对应目标框的外观语义特征,供后续级联匹配中 计算相似度使用。其中ReID模块是 独立于目标检测和轨迹跟踪模块,源代码中为满足多目标跟踪的实时性采用 轻量化网络模型(OsNet等)。
源代码中通过torchreid下的utils包中 FeatureExtractor类构建相应模型提取目标框的外观语义特征,其代码如下:
class FeatureExtractor(object):
'''
构建ReID模型提取外观语义特征
'''
def __init__(
self,
model_name='',
model_path='',
image_size=(256, 128),
pixel_mean=[0.485, 0.456, 0.406],
pixel_std=[0.229, 0.224, 0.225],
pixel_norm=True,
device='cuda',
verbose=True
):
'''
mode_name:模型名称
model:ReID模型
preprocess:输入数据前处理
mode_path:模型权重路径
image_size:输入图片的尺寸
pixel_mean:图片正则化均值
pixel_std:图片正则化标准差
pixel_norm:图片是否正则化
device:模型推理设备
verbose:是否展示模型信息
'''
model = build_model(
model_name,
num_classes=1,
pretrained=not (model_path and check_isfile(model_path)),
use_gpu=device.startswith('cuda')
)
model.eval()
if verbose:
num_params, flops = compute_model_complexity(
model, (1, 3, image_size[0], image_size[1])
)
print('Model: {}'.format(model_name))
print('- params: {:,}'.format(num_params))
print('- flops: {:,}'.format(flops))
if model_path and check_isfile(model_path):
load_pretrained_weights(model, model_path)
transforms = []
transforms += [T.Resize(image_size)]
transforms += [T.ToTensor()]
if pixel_norm:
transforms += [T.Normalize(mean=pixel_mean, std=pixel_std)]
preprocess = T.Compose(transforms)
to_pil = T.ToPILImage()
device = torch.device(device)
model.to(device)
self.model = model
self.preprocess = preprocess
self.to_pil = to_pil
self.device = device
def __call__(self, input):
'''
内置方法,赋予FeatureExtractor实例对象函数调用特性,即允许实例对象像函数一样被调用
'''
if isinstance(input, list):
images = []
for element in input:
if isinstance(element, str):
image = Image.open(element).convert('RGB')
elif isinstance(element, np.ndarray):
image = self.to_pil(element)
else:
raise TypeError(
'Type of each element must belong to [str | numpy.ndarray]'
)
image = self.preprocess(image)
images.append(image)
images = torch.stack(images, dim=0)
images = images.to(self.device)
elif isinstance(input, str):
image = Image.open(input).convert('RGB')
image = self.preprocess(image)
images = image.unsqueeze(0).to(self.device)
elif isinstance(input, np.ndarray):
image = self.to_pil(input)
image = self.preprocess(image)
images = image.unsqueeze(0).to(self.device)
elif isinstance(input, torch.Tensor):
if input.dim() == 3:
input = input.unsqueeze(0)
images = input.to(self.device)
else:
raise NotImplementedError
with torch.no_grad():
features = self.model(images)
return features
其中ReID模块为保证多目标跟踪的实时性,多采用轻量化网络模型。轻量化网络模型的核心是在 保持精度的前提下,从 模型大小和 推理速度两方面对网络进行 轻量化改造。目前整理了有关轻量化网络模型有:SqueezeNet、Xception、MobileNet、ShuffleNet、OsNet以及GHostNet。
有关轻量化模型解析,点击查看:
《轻量化网络总结[1]–SqueezeNet,Xception,MobileNetv1~v3》
《轻量化网络总结[2]–ShuffleNetv1/v2,OSNet,GhostNet》
; 4.3.2 级联匹配和IOU匹配
级联匹配和IOU匹配用于 轨迹和目标框的关联, 封装于Tracker类中的_match函数中,具体 代码如下:
def _full_cost_metric(self, tracks, dets, track_indices, detection_indices):
'''
使用外观特征和运动特征(门控距离)并结合相关阈值计算相似度矩阵
tracks:轨迹集合
dets:目标框集合
track_indices:轨迹索引集合
detction_indeces:目标框索引集合
'''
pos_cost = np.empty([len(track_indices), len(detection_indices)])
msrs = np.asarray([dets[i].to_xyah() for i in detection_indices])
for row, track_idx in enumerate(track_indices):
pos_cost[row, :] = np.sqrt(
self.kf.gating_distance(tracks[track_idx].mean, tracks[track_idx].covariance, msrs, False))
/ self.GATING_THRESHOLD
pos_gate = pos_cost > 1.0
app_cost = self.metric.distance(
np.array([dets[i].feature for i in detection_indices]),
np.array([tracks[i].track_id for i in track_indices]),
)
app_gate = app_cost > self.metric.matching_threshold
cost_matrix = self._lambda * pos_cost + (1 - self._lambda) * app_cost
cost_matrix[np.logical_or(pos_gate, app_gate)] = linear_assignment.INFTY_COST
return cost_matrix
def _match(self, detections):
'''
进行级联匹配和IOU匹配,获取匹配成功轨迹和目标框、匹配失败轨迹以及匹配失败目标框
'''
confirmed_tracks = [i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
matches_a, unmatched_tracks_a, unmatched_detections = linear_assignment.matching_cascade(
self._full_cost_metric,
linear_assignment.INFTY_COST - 1,
self.max_age,
self.tracks,
detections,
confirmed_tracks,
)
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a if self.tracks[k].time_since_update == 1
]
unmatched_tracks_a = [
k for k in unmatched_tracks_a if self.tracks[k].time_since_update != 1
]
matches_b, unmatched_tracks_b, unmatched_detections = linear_assignment.min_cost_matching(
iou_matching.iou_cost,
self.max_iou_distance,
self.tracks,
detections,
iou_track_candidates,
unmatched_detections,
)
matches = matches_a + matches_b
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
级联匹配和IOU匹配封装于Tracker类中用于轨迹和目标框的关联,结果为 匹配成功轨迹和目标框、 匹配失败轨迹以及 匹配失败目标框,依据对应关联结果进行相应的后处理,结合4.2.2Tracker类核心代码一起阅读。
其中级联匹配和IOU匹配的 具体实现代码如下:
INFTY_COST = 1e+5
def min_cost_matching(distance_metric, max_distance, tracks, detections,
track_indices=None, detection_indices=None):
'''
进行轨迹与目标框的匹配关联
distance_metric:计算用于(级联/IOU)匹配代价矩阵的函数
max_distance:阈值,大于该阈值被认为是无效的
tracks:当前时刻(卡尔曼滤波预测后)的轨迹集合
detections:当前时刻的目标框集合
track_indices:用于匹配的轨迹索引集合
detection_indices:用于匹配的目标框索引集合
'''
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
if len(detection_indices) == 0 or len(track_indices) == 0:
return [], track_indices, detection_indices
cost_matrix = distance_metric(
tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
row_indices, col_indices = linear_sum_assignment(cost_matrix)
matches, unmatched_tracks, unmatched_detections = [], [], []
for col, detection_idx in enumerate(detection_indices):
if col not in col_indices:
unmatched_detections.append(detection_idx)
for row, track_idx in enumerate(track_indices):
if row not in row_indices:
unmatched_tracks.append(track_idx)
for row, col in zip(row_indices, col_indices):
track_idx = track_indices[row]
detection_idx = detection_indices[col]
if cost_matrix[row, col] > max_distance:
unmatched_tracks.append(track_idx)
unmatched_detections.append(detection_idx)
else:
matches.append((track_idx, detection_idx))
return matches, unmatched_tracks, unmatched_detections
def matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
'''
进行级联匹配
distance_metric:计算用于级联匹配相似度矩阵的函数
max_distance:阈值,大于该阈值被认为是无效的
cascade_depth:级联匹配的深度,与轨迹状态由不确定态到删除态所需连续匹配失败的次数(_max_age)相同
tracks:当前时刻(卡尔曼滤波预测后)的轨迹集合
detections:当前时刻的目标框集合
track_indices:用于匹配的轨迹索引集合
detection_indices:用于匹配的目标框索引集合
'''
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
for level in range(cascade_depth):
if len(unmatched_detections) == 0:
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
if len(track_indices_l) == 0:
continue
matches_l, _, unmatched_detections = \
min_cost_matching(
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
def gate_cost_matrix(
kf, cost_matrix, tracks, detections, track_indices, detection_indices,
gated_cost=INFTY_COST, only_position=False):
'''
通过门控距离对外观语义特征相似度矩阵进行限制,代码中将该部分转换并整合至_full_cost_metric函数中,故没用到该函数
kf:卡尔曼滤波
cost_matrix:外观语义特征相似度矩阵
tracks:当前时刻(卡尔曼滤波预测后)的轨迹集合
detections:当前时刻的目标框集合
track_indices:用于匹配的轨迹索引集合
detection_indices:用于匹配的目标框索引集合
gated_cost:无效值,大于阈值则设定为无效值
only_position:是否只考虑位置分量
'''
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray(
[detections[i].to_xyah() for i in detection_indices])
for row, track_idx in enumerate(track_indices):
track = tracks[track_idx]
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position)
cost_matrix[row, gating_distance > gating_threshold] = gated_cost
return cost_matrix
级联匹配中关联轨迹和目标框的 距离度量函数(metric)封装在NearestNeighborDistanceMetric类, 具体代码如下:
def _pdist(a, b):
'''
计算对应矩阵的成对平方距离(欧式距离)
a:维度N×M,N代表样本数量,M代表特征维数
b:维度L×M,L代表样本数量,M代表特征维数
'''
a, b = np.asarray(a), np.asarray(b)
if len(a) == 0 or len(b) == 0:
return np.zeros((len(a), len(b)))
a2, b2 = np.square(a).sum(axis=1), np.square(b).sum(axis=1)
r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :]
r2 = np.clip(r2, 0., float(np.inf))
return r2
def _cosine_distance(a, b, data_is_normalized=False):
'''
计算对应矩阵的成对余弦距离
a:维度N×M,N代表样本数量,M代表特征维数
b:维度L×M,L代表样本数量,M代表特征维数
data_is_normalized:a、b矩阵的特征是否归一化
'''
if not data_is_normalized:
a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
return 1. - np.dot(a, b.T)
def _nn_euclidean_distance(x, y):
'''
用于NearestNeighborDistanceMetric类中的欧式距离函数
x:维度N×M,N代表样本数量,M代表特征维数
y:维度L×M,L代表样本数量,M代表特征维数
'''
x_ = torch.from_numpy(np.asarray(x) / np.linalg.norm(x, axis=1, keepdims=True))
y_ = torch.from_numpy(np.asarray(y) / np.linalg.norm(y, axis=1, keepdims=True))
distances = compute_distance_matrix(x_, y_, metric='euclidean')
return np.maximum(0.0, torch.min(distances, axis=0)[0].numpy())
def _nn_cosine_distance(x, y):
'''
用于NearestNeighborDistanceMetric类中的余弦距离函数
x:维度N×M,N代表样本数量,M代表特征维数
y:维度L×M,L代表样本数量,M代表特征维数
'''
x_ = torch.from_numpy(np.asarray(x))
y_ = torch.from_numpy(np.asarray(y))
distances = compute_distance_matrix(x_, y_, metric='cosine')
distances = distances.cpu().detach().numpy()
return distances.min(axis=0)
class NearestNeighborDistanceMetric(object):
'''
度量轨迹和目标框最近距离,即根据对应确认态轨迹(同一ID)以往所有时刻的外观语义特征与当前目标框的外观语义特征,
计算轨迹和目标框的关联距离(相似度)
'''
def __init__(self, metric, matching_threshold, budget=None):
'''
metric:选择度量距离的方式,欧式距离或余弦距离
matching_threshold:阈值,大于该阈值被认为是无效值
budget:存储确认态轨迹(同一ID)以往所有时刻的外观语义特征的容量,如果为None,则认为不限制容量
sample:存储确认态轨迹(同一ID)以往所有时刻的外观语义特征字典
'''
if metric == "euclidean":
self._metric = _nn_euclidean_distance
elif metric == "cosine":
self._metric = _nn_cosine_distance
else:
raise ValueError(
"Invalid metric; must be either 'euclidean' or 'cosine'")
self.matching_threshold = matching_threshold
self.budget = budget
self.samples = {}
def partial_fit(self, features, targets, active_targets):
'''
更新确认态轨迹(同一ID)以往所有时刻的外观语义特征字典
feautures:确认态轨迹的外观语义特征矩阵
targets:外观语义特征对应的确认态轨迹ID集合
active_targets:确定态轨迹ID集合
'''
for feature, target in zip(features, targets):
self.samples.setdefault(target, []).append(feature)
if self.budget is not None:
self.samples[target] = self.samples[target][-self.budget:]
self.samples = {k: self.samples[k] for k in active_targets}
er
def distance(self, features, targets):
'''
计算目标框的外观语义特征与对应确认态轨迹(同一ID)以往所有的外观语义特征(欧式或余弦)距离矩阵
feautures:目标框的外观语义特征矩阵
targets:确认态轨迹ID集合
'''
cost_matrix = np.zeros((len(targets), len(features)))
for i, target in enumerate(targets):
cost_matrix[i, :] = self._metric(self.samples[target], features)
return cost_matrix
级联匹配使用 门控距离矩阵(运动特征)和 外观语义特征距离矩阵(外观特征)加权计算代价矩阵,其中门控距离和外观语义特征距离都通过对应的 阈值限制过大的值。匹配过程根据 最大级联匹配深度(跟_max_age相同) 逐层进行目标框与轨迹的 关联,即根据连续匹配失败次数(time_since_update)与匹配深度对应,实现匹配失败次数少的轨迹优先匹配,失败次数多的轨迹靠后匹配。通过级联匹配,可以 重新将被遮挡后重现的目标找回, 降低ID-Switch。
IOU匹配使用IOU矩阵当作代价矩阵,其中 的IOU计算就是通过轨迹和目标框的 重叠面积除以总面积来完成,具体实现在 iou_matching模块中,可参考源码。匹配过程根据对应轨迹和目标框 直接进行关联。
4.3.3 匈牙利算法
匈牙利算法基于 代价矩阵找到 最小代价的分配方法,是解决 分配问题中最优匹配(最小代价)的算法。
其中 依据定理:
代价矩阵的行或列同时加或减一个数,得到新的代价矩阵的最优匹配与原代价矩阵相同。
算法步骤:
(1)若代价矩阵不为方阵,则在相应位置补0转换为方阵
(2)代价矩阵每一行减去此行最小值后每一列减去此列最小值
(3)用最少的横线或竖线覆盖代价矩阵中的所有0元素
(4)找出(3)中未被覆盖元素的最小值,且未被覆盖元素减去该最小值;对覆盖直线交叉点元素加上该最小值
(5)重复(3)和(4),直到覆盖线的数量等于对应方阵的维度数
过程演示(假设代价矩阵维度4×3):
匈牙利算法将 确认态轨迹tracks与 当前时刻的检测目标框detections 进行关联,其中存在对应代价矩阵 不为方阵的情况需进行 填充操作(如det’列),对应的最小代价匹配结果 需忽略填充项(如图中虚线部分)。针对算法步骤(3)(4)的具体代码实现有多种版本,源代码通过scipy库中的 linear_sum_assignment函数实现匈牙利算法。
Original: https://blog.csdn.net/qq_56749449/article/details/125090313
Author: Gthan学算法
Title: 多目标跟踪(MOT)–DeepSort原理及代码详解
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/784992/
转载文章受原作者版权保护。转载请注明原作者出处!