123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- from __future__ import absolute_import
- import numpy as np
- # from sklearn.utils.linear_assignment_ import linear_assignment
- from scipy.optimize import linear_sum_assignment as linear_assignment
- from yolox.deepsort_tracker import kalman_filter
-
-
- INFTY_COST = 1e+5
-
-
- def min_cost_matching(
- distance_metric, max_distance, tracks, detections, track_indices=None,
- detection_indices=None):
- """Solve linear assignment problem.
- Parameters
- ----------
- distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
- The distance metric is given a list of tracks and detections as well as
- a list of N track indices and M detection indices. The metric should
- return the NxM dimensional cost matrix, where element (i, j) is the
- association cost between the i-th track in the given track indices and
- the j-th detection in the given detection_indices.
- max_distance : float
- Gating threshold. Associations with cost larger than this value are
- disregarded.
- tracks : List[track.Track]
- A list of predicted tracks at the current time step.
- detections : List[detection.Detection]
- A list of detections at the current time step.
- track_indices : List[int]
- List of track indices that maps rows in `cost_matrix` to tracks in
- `tracks` (see description above).
- detection_indices : List[int]
- List of detection indices that maps columns in `cost_matrix` to
- detections in `detections` (see description above).
- Returns
- -------
- (List[(int, int)], List[int], List[int])
- Returns a tuple with the following three entries:
- * A list of matched track and detection indices.
- * A list of unmatched track indices.
- * A list of unmatched detection indices.
- """
- if track_indices is None:
- track_indices = np.arange(len(tracks))
- if detection_indices is None:
- detection_indices = np.arange(len(detections))
-
- if len(detection_indices) == 0 or len(track_indices) == 0:
- return [], track_indices, detection_indices # Nothing to match.
-
- cost_matrix = distance_metric(
- tracks, detections, track_indices, detection_indices)
- cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
-
- row_indices, col_indices = linear_assignment(cost_matrix)
-
- matches, unmatched_tracks, unmatched_detections = [], [], []
- for col, detection_idx in enumerate(detection_indices):
- if col not in col_indices:
- unmatched_detections.append(detection_idx)
- for row, track_idx in enumerate(track_indices):
- if row not in row_indices:
- unmatched_tracks.append(track_idx)
- for row, col in zip(row_indices, col_indices):
- track_idx = track_indices[row]
- detection_idx = detection_indices[col]
- if cost_matrix[row, col] > max_distance:
- unmatched_tracks.append(track_idx)
- unmatched_detections.append(detection_idx)
- else:
- matches.append((track_idx, detection_idx))
- return matches, unmatched_tracks, unmatched_detections
-
-
- def matching_cascade(
- distance_metric, max_distance, cascade_depth, tracks, detections,
- track_indices=None, detection_indices=None):
- """Run matching cascade.
- Parameters
- ----------
- distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
- The distance metric is given a list of tracks and detections as well as
- a list of N track indices and M detection indices. The metric should
- return the NxM dimensional cost matrix, where element (i, j) is the
- association cost between the i-th track in the given track indices and
- the j-th detection in the given detection indices.
- max_distance : float
- Gating threshold. Associations with cost larger than this value are
- disregarded.
- cascade_depth: int
- The cascade depth, should be se to the maximum track age.
- tracks : List[track.Track]
- A list of predicted tracks at the current time step.
- detections : List[detection.Detection]
- A list of detections at the current time step.
- track_indices : Optional[List[int]]
- List of track indices that maps rows in `cost_matrix` to tracks in
- `tracks` (see description above). Defaults to all tracks.
- detection_indices : Optional[List[int]]
- List of detection indices that maps columns in `cost_matrix` to
- detections in `detections` (see description above). Defaults to all
- detections.
- Returns
- -------
- (List[(int, int)], List[int], List[int])
- Returns a tuple with the following three entries:
- * A list of matched track and detection indices.
- * A list of unmatched track indices.
- * A list of unmatched detection indices.
- """
- if track_indices is None:
- track_indices = list(range(len(tracks)))
- if detection_indices is None:
- detection_indices = list(range(len(detections)))
-
- unmatched_detections = detection_indices
- matches = []
- for level in range(cascade_depth):
- if len(unmatched_detections) == 0: # No detections left
- break
-
- track_indices_l = [
- k for k in track_indices
- if tracks[k].time_since_update == 1 + level
- ]
- if len(track_indices_l) == 0: # Nothing to match at this level
- continue
-
- matches_l, _, unmatched_detections = \
- min_cost_matching(
- distance_metric, max_distance, tracks, detections,
- track_indices_l, unmatched_detections)
- matches += matches_l
- unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
- return matches, unmatched_tracks, unmatched_detections
-
-
- def gate_cost_matrix(
- kf, cost_matrix, tracks, detections, track_indices, detection_indices,
- gated_cost=INFTY_COST, only_position=False):
- """Invalidate infeasible entries in cost matrix based on the state
- distributions obtained by Kalman filtering.
- Parameters
- ----------
- kf : The Kalman filter.
- cost_matrix : ndarray
- The NxM dimensional cost matrix, where N is the number of track indices
- and M is the number of detection indices, such that entry (i, j) is the
- association cost between `tracks[track_indices[i]]` and
- `detections[detection_indices[j]]`.
- tracks : List[track.Track]
- A list of predicted tracks at the current time step.
- detections : List[detection.Detection]
- A list of detections at the current time step.
- track_indices : List[int]
- List of track indices that maps rows in `cost_matrix` to tracks in
- `tracks` (see description above).
- detection_indices : List[int]
- List of detection indices that maps columns in `cost_matrix` to
- detections in `detections` (see description above).
- gated_cost : Optional[float]
- Entries in the cost matrix corresponding to infeasible associations are
- set this value. Defaults to a very large value.
- only_position : Optional[bool]
- If True, only the x, y position of the state distribution is considered
- during gating. Defaults to False.
- Returns
- -------
- ndarray
- Returns the modified cost matrix.
- """
- gating_dim = 2 if only_position else 4
- gating_threshold = kalman_filter.chi2inv95[gating_dim]
- measurements = np.asarray(
- [detections[i].to_xyah() for i in detection_indices])
- for row, track_idx in enumerate(track_indices):
- track = tracks[track_idx]
- gating_distance = kf.gating_distance(
- track.mean, track.covariance, measurements, only_position)
- cost_matrix[row, gating_distance > gating_threshold] = gated_cost
- return cost_matrix
|