Meta Byte Track
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

matching.py 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import cv2
  2. import numpy as np
  3. import scipy
  4. import lap
  5. from scipy.spatial.distance import cdist
  6. from cython_bbox import bbox_overlaps as bbox_ious
  7. from yolox.tracker import kalman_filter
  8. import time
  9. def merge_matches(m1, m2, shape):
  10. O,P,Q = shape
  11. m1 = np.asarray(m1)
  12. m2 = np.asarray(m2)
  13. M1 = scipy.sparse.coo_matrix((np.ones(len(m1)), (m1[:, 0], m1[:, 1])), shape=(O, P))
  14. M2 = scipy.sparse.coo_matrix((np.ones(len(m2)), (m2[:, 0], m2[:, 1])), shape=(P, Q))
  15. mask = M1*M2
  16. match = mask.nonzero()
  17. match = list(zip(match[0], match[1]))
  18. unmatched_O = tuple(set(range(O)) - set([i for i, j in match]))
  19. unmatched_Q = tuple(set(range(Q)) - set([j for i, j in match]))
  20. return match, unmatched_O, unmatched_Q
  21. def _indices_to_matches(cost_matrix, indices, thresh):
  22. matched_cost = cost_matrix[tuple(zip(*indices))]
  23. matched_mask = (matched_cost <= thresh)
  24. matches = indices[matched_mask]
  25. unmatched_a = tuple(set(range(cost_matrix.shape[0])) - set(matches[:, 0]))
  26. unmatched_b = tuple(set(range(cost_matrix.shape[1])) - set(matches[:, 1]))
  27. return matches, unmatched_a, unmatched_b
  28. def linear_assignment(cost_matrix, thresh):
  29. if cost_matrix.size == 0:
  30. return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
  31. matches, unmatched_a, unmatched_b = [], [], []
  32. cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
  33. for ix, mx in enumerate(x):
  34. if mx >= 0:
  35. matches.append([ix, mx])
  36. unmatched_a = np.where(x < 0)[0]
  37. unmatched_b = np.where(y < 0)[0]
  38. matches = np.asarray(matches)
  39. return matches, unmatched_a, unmatched_b
  40. def ious(atlbrs, btlbrs):
  41. """
  42. Compute cost based on IoU
  43. :type atlbrs: list[tlbr] | np.ndarray
  44. :type atlbrs: list[tlbr] | np.ndarray
  45. :rtype ious np.ndarray
  46. """
  47. ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=np.float)
  48. if ious.size == 0:
  49. return ious
  50. ious = bbox_ious(
  51. np.ascontiguousarray(atlbrs, dtype=np.float),
  52. np.ascontiguousarray(btlbrs, dtype=np.float)
  53. )
  54. return ious
  55. def iou_distance(atracks, btracks):
  56. """
  57. Compute cost based on IoU
  58. :type atracks: list[STrack]
  59. :type btracks: list[STrack]
  60. :rtype cost_matrix np.ndarray
  61. """
  62. if (len(atracks)>0 and isinstance(atracks[0], np.ndarray)) or (len(btracks) > 0 and isinstance(btracks[0], np.ndarray)):
  63. atlbrs = atracks
  64. btlbrs = btracks
  65. else:
  66. atlbrs = [track.tlbr for track in atracks]
  67. btlbrs = [track.tlbr for track in btracks]
  68. _ious = ious(atlbrs, btlbrs)
  69. cost_matrix = 1 - _ious
  70. return cost_matrix
  71. def v_iou_distance(atracks, btracks):
  72. """
  73. Compute cost based on IoU
  74. :type atracks: list[STrack]
  75. :type btracks: list[STrack]
  76. :rtype cost_matrix np.ndarray
  77. """
  78. if (len(atracks)>0 and isinstance(atracks[0], np.ndarray)) or (len(btracks) > 0 and isinstance(btracks[0], np.ndarray)):
  79. atlbrs = atracks
  80. btlbrs = btracks
  81. else:
  82. atlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in atracks]
  83. btlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in btracks]
  84. _ious = ious(atlbrs, btlbrs)
  85. cost_matrix = 1 - _ious
  86. return cost_matrix
  87. def embedding_distance(tracks, detections, metric='cosine'):
  88. """
  89. :param tracks: list[STrack]
  90. :param detections: list[BaseTrack]
  91. :param metric:
  92. :return: cost_matrix np.ndarray
  93. """
  94. cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float)
  95. if cost_matrix.size == 0:
  96. return cost_matrix
  97. det_features = np.asarray([track.curr_feat for track in detections], dtype=np.float)
  98. #for i, track in enumerate(tracks):
  99. #cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
  100. track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float)
  101. cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric)) # Nomalized features
  102. return cost_matrix
  103. def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
  104. if cost_matrix.size == 0:
  105. return cost_matrix
  106. gating_dim = 2 if only_position else 4
  107. gating_threshold = kalman_filter.chi2inv95[gating_dim]
  108. measurements = np.asarray([det.to_xyah() for det in detections])
  109. for row, track in enumerate(tracks):
  110. gating_distance = kf.gating_distance(
  111. track.mean, track.covariance, measurements, only_position)
  112. cost_matrix[row, gating_distance > gating_threshold] = np.inf
  113. return cost_matrix
  114. def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
  115. if cost_matrix.size == 0:
  116. return cost_matrix
  117. gating_dim = 2 if only_position else 4
  118. gating_threshold = kalman_filter.chi2inv95[gating_dim]
  119. measurements = np.asarray([det.to_xyah() for det in detections])
  120. for row, track in enumerate(tracks):
  121. gating_distance = kf.gating_distance(
  122. track.mean, track.covariance, measurements, only_position, metric='maha')
  123. cost_matrix[row, gating_distance > gating_threshold] = np.inf
  124. cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
  125. return cost_matrix
  126. def fuse_iou(cost_matrix, tracks, detections):
  127. if cost_matrix.size == 0:
  128. return cost_matrix
  129. reid_sim = 1 - cost_matrix
  130. iou_dist = iou_distance(tracks, detections)
  131. iou_sim = 1 - iou_dist
  132. fuse_sim = reid_sim * (1 + iou_sim) / 2
  133. det_scores = np.array([det.score for det in detections])
  134. det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
  135. #fuse_sim = fuse_sim * (1 + det_scores) / 2
  136. fuse_cost = 1 - fuse_sim
  137. return fuse_cost
  138. def fuse_score(cost_matrix, detections):
  139. if cost_matrix.size == 0:
  140. return cost_matrix
  141. iou_sim = 1 - cost_matrix
  142. det_scores = np.array([det.score for det in detections])
  143. det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
  144. fuse_sim = iou_sim * det_scores
  145. fuse_cost = 1 - fuse_sim
  146. return fuse_cost