# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import shutil
import tempfile
from collections import defaultdict
from typing import List, Optional, Union

import numpy as np
import torch

try:
    import trackeval
except ImportError:
    trackeval = None

from mmengine.dist import (all_gather_object, barrier, broadcast,
                           broadcast_object_list, get_dist_info,
                           is_main_process)
from mmengine.logging import MMLogger

from mmdet.registry import METRICS, TASK_UTILS
from .base_video_metric import BaseVideoMetric


def get_tmpdir() -> str:
    """return the same tmpdir for all processes."""
    rank, world_size = get_dist_info()
    MAX_LEN = 512
    # 32 is whitespace
    dir_tensor = torch.full((MAX_LEN, ), 32, dtype=torch.uint8)
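    # Rank 0 creates the temporary directory and writes its encoded path into
    # the whitespace-padded tensor; the broadcast below lets every rank decode
    # the same directory name after stripping the padding.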
    if rank == 0:
        tmpdir = tempfile.mkdtemp()
        tmpdir = torch.tensor(bytearray(tmpdir.encode()), dtype=torch.uint8)
        dir_tensor[:len(tmpdir)] = tmpdir
    broadcast(dir_tensor, 0)
    tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip()
    return tmpdir


@METRICS.register_module()
class MOTChallengeMetric(BaseVideoMetric):
- """Evaluation metrics for MOT Challenge.
- Args:
- metric (str | list[str]): Metrics to be evaluated. Options are
- 'HOTA', 'CLEAR', 'Identity'.
- Defaults to ['HOTA', 'CLEAR', 'Identity'].
- outfile_prefix (str, optional): Path to save the formatted results.
- Defaults to None.
- track_iou_thr (float): IoU threshold for tracking evaluation.
- Defaults to 0.5.
- benchmark (str): Benchmark to be evaluated. Defaults to 'MOT17'.
- format_only (bool): If True, only formatting the results to the
- official format and not performing evaluation. Defaults to False.
- postprocess_tracklet_cfg (List[dict], optional): configs for tracklets
- postprocessing methods. `InterpolateTracklets` is supported.
- Defaults to []
- - InterpolateTracklets:
- - min_num_frames (int, optional): The minimum length of a
- track that will be interpolated. Defaults to 5.
- - max_num_frames (int, optional): The maximum disconnected
- length in a track. Defaults to 20.
- - use_gsi (bool, optional): Whether to use the GSI (Gaussian-
- smoothed interpolation) method. Defaults to False.
- - smooth_tau (int, optional): smoothing parameter in GSI.
- Defaults to 10.
- collect_device (str): Device name used for collecting results from
- different ranks during distributed training. Must be 'cpu' or
- 'gpu'. Defaults to 'cpu'.
- prefix (str, optional): The prefix that will be added in the metric
- names to disambiguate homonymous metrics of different evaluators.
- If prefix is not provided in the argument, self.default_prefix
- will be used instead. Default: None
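
    Examples:
        An illustrative evaluator config; every value except ``type`` is a
        placeholder rather than a required setting::

            val_evaluator = dict(
                type='MOTChallengeMetric',
                metric=['HOTA', 'CLEAR', 'Identity'],
                benchmark='MOT17',
                postprocess_tracklet_cfg=[
                    dict(
                        type='InterpolateTracklets',
                        min_num_frames=5,
                        max_num_frames=20)
                ])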
    """
    TRACKER = 'default-tracker'
    allowed_metrics = ['HOTA', 'CLEAR', 'Identity']
    allowed_benchmarks = ['MOT15', 'MOT16', 'MOT17', 'MOT20', 'DanceTrack']
    default_prefix: Optional[str] = 'motchallenge-metric'

    def __init__(self,
                 metric: Union[str, List[str]] = ['HOTA', 'CLEAR', 'Identity'],
                 outfile_prefix: Optional[str] = None,
                 track_iou_thr: float = 0.5,
                 benchmark: str = 'MOT17',
                 format_only: bool = False,
                 use_postprocess: bool = False,
                 postprocess_tracklet_cfg: Optional[List[dict]] = [],
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None) -> None:
        super().__init__(collect_device=collect_device, prefix=prefix)
        if trackeval is None:
            raise RuntimeError(
                'trackeval is not installed, please install it by: pip '
                'install git+https://github.com/JonathonLuiten/TrackEval.git. '
                'trackeval needs an older version of numpy, please install '
                'it by: pip install -U numpy==1.23.5')
        if isinstance(metric, list):
            metrics = metric
        elif isinstance(metric, str):
            metrics = [metric]
        else:
            raise TypeError('metric must be a list or a str.')
        for metric in metrics:
            if metric not in self.allowed_metrics:
                raise KeyError(f'metric {metric} is not supported.')
        self.metrics = metrics
        self.format_only = format_only
        if self.format_only:
            assert outfile_prefix is not None, \
                'outfile_prefix must not be None when format_only is True, ' \
                'otherwise the result files will be saved to a temp ' \
                'directory which will be cleaned up at the end.'
        self.use_postprocess = use_postprocess
        self.postprocess_tracklet_cfg = postprocess_tracklet_cfg.copy()
        self.postprocess_tracklet_methods = [
            TASK_UTILS.build(cfg) for cfg in self.postprocess_tracklet_cfg
        ]
        assert benchmark in self.allowed_benchmarks
        self.benchmark = benchmark
        self.track_iou_thr = track_iou_thr
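        # Keep a TemporaryDirectory handle for automatic cleanup in __del__,
        # but point its ``name`` at the rank-shared directory returned by
        # ``get_tmpdir`` so every process reads and writes the same location.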
        self.tmp_dir = tempfile.TemporaryDirectory()
        self.tmp_dir.name = get_tmpdir()
        self.seq_info = defaultdict(
            lambda: dict(seq_length=-1, gt_tracks=[], pred_tracks=[]))
        self.gt_dir = self._get_gt_dir()
        self.pred_dir = self._get_pred_dir(outfile_prefix)
        self.seqmap = osp.join(self.pred_dir, 'videoseq.txt')
        with open(self.seqmap, 'w') as f:
            f.write('name\n')

    def __del__(self):
        # Clean the tmpdir only when the metric object is destroyed: across
        # consecutive ValLoops `self.tmp_dir.name` stays the same, so calling
        # `tmp_dir.cleanup()` in compute_metrics would break later loops.
        self.tmp_dir.cleanup()

    def _get_pred_dir(self, outfile_prefix):
        """Get directory to save the prediction results."""
        logger: MMLogger = MMLogger.get_current_instance()

        if outfile_prefix is None:
            outfile_prefix = self.tmp_dir.name
        else:
            if osp.exists(outfile_prefix) and is_main_process():
                logger.info('remove previous results.')
                shutil.rmtree(outfile_prefix)
        pred_dir = osp.join(outfile_prefix, self.TRACKER)
        os.makedirs(pred_dir, exist_ok=True)
        return pred_dir

    def _get_gt_dir(self):
        """Get directory to save the gt files."""
        output_dir = osp.join(self.tmp_dir.name, 'gt')
        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    def transform_gt_and_pred(self, img_data_sample, video, frame_id):
        """Convert one frame's gt and predicted instances into MOT rows."""
        video = img_data_sample['img_path'].split(os.sep)[-3]
        # load gts
        if 'instances' in img_data_sample:
            gt_instances = img_data_sample['instances']
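            # Each gt row: [frame, instance_id, x1, y1, w, h, mot_conf,
            # category_id, visibility]; frames are 1-based in MOTChallenge.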
            gt_tracks = [
                np.array([
                    frame_id + 1, gt_instances[i]['instance_id'],
                    gt_instances[i]['bbox'][0], gt_instances[i]['bbox'][1],
                    gt_instances[i]['bbox'][2] - gt_instances[i]['bbox'][0],
                    gt_instances[i]['bbox'][3] - gt_instances[i]['bbox'][1],
                    gt_instances[i]['mot_conf'],
                    gt_instances[i]['category_id'],
                    gt_instances[i]['visibility']
                ]) for i in range(len(gt_instances))
            ]
            self.seq_info[video]['gt_tracks'].extend(gt_tracks)

        # load predictions
        assert 'pred_track_instances' in img_data_sample
        if self.use_postprocess:
            pred_instances = img_data_sample['pred_track_instances']
            pred_tracks = [
                pred_instances['bboxes'][i]
                for i in range(len(pred_instances['bboxes']))
            ]
        else:
            pred_instances = img_data_sample['pred_track_instances']
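            # Each prediction row: [frame, track_id, x1, y1, w, h, score];
            # bbox corners are converted to (x, y, w, h) as MOTChallenge
            # expects.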
            pred_tracks = [
                np.array([
                    frame_id + 1, pred_instances['instances_id'][i].cpu(),
                    pred_instances['bboxes'][i][0].cpu(),
                    pred_instances['bboxes'][i][1].cpu(),
                    (pred_instances['bboxes'][i][2] -
                     pred_instances['bboxes'][i][0]).cpu(),
                    (pred_instances['bboxes'][i][3] -
                     pred_instances['bboxes'][i][1]).cpu(),
                    pred_instances['scores'][i].cpu()
                ]) for i in range(len(pred_instances['instances_id']))
            ]
        self.seq_info[video]['pred_tracks'].extend(pred_tracks)

    def process_image(self, data_samples, video_len):
        """Process the data samples of a single frame."""
        img_data_sample = data_samples[0].to_dict()
        video = img_data_sample['img_path'].split(os.sep)[-3]
        frame_id = img_data_sample['frame_id']
        if self.seq_info[video]['seq_length'] == -1:
            self.seq_info[video]['seq_length'] = video_len
        self.transform_gt_and_pred(img_data_sample, video, frame_id)

        if frame_id == video_len - 1:
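            # Reaching the last frame means all data of this sequence has
            # been collected.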
            # postprocessing
            if self.postprocess_tracklet_cfg:
                info = self.seq_info[video]
                pred_tracks = np.array(info['pred_tracks'])
                for postprocess_tracklet_method in \
                        self.postprocess_tracklet_methods:
                    pred_tracks = postprocess_tracklet_method.forward(
                        pred_tracks)
                info['pred_tracks'] = pred_tracks
            self._save_one_video_gts_preds(video)

    def process_video(self, data_samples):
        """Process all frames of one video at once."""
        video_len = len(data_samples)
        for frame_id in range(video_len):
            img_data_sample = data_samples[frame_id].to_dict()
            # load basic info
            video = img_data_sample['img_path'].split(os.sep)[-3]
            if self.seq_info[video]['seq_length'] == -1:
                self.seq_info[video]['seq_length'] = video_len
            self.transform_gt_and_pred(img_data_sample, video, frame_id)

        if self.postprocess_tracklet_cfg:
            info = self.seq_info[video]
            pred_tracks = np.array(info['pred_tracks'])
            for postprocess_tracklet_method in \
                    self.postprocess_tracklet_methods:
                pred_tracks = postprocess_tracklet_method.forward(pred_tracks)
            info['pred_tracks'] = pred_tracks
        self._save_one_video_gts_preds(video)

    def _save_one_video_gts_preds(self, seq: str) -> None:
        """Save the gt and prediction results."""
        info = self.seq_info[seq]
        # save predictions
        pred_file = osp.join(self.pred_dir, seq + '.txt')
        pred_tracks = np.array(info['pred_tracks'])
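        # MOTChallenge result format per line:
        # <frame>,<id>,<x>,<y>,<w>,<h>,<conf>,-1,-1,-1
        # The trailing three fields (world coordinates) are unused for 2D
        # tracking and written as -1.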
        with open(pred_file, 'wt') as f:
            for tracks in pred_tracks:
                line = '%d,%d,%.3f,%.3f,%.3f,%.3f,%.3f,-1,-1,-1\n' % (
                    tracks[0], tracks[1], tracks[2], tracks[3], tracks[4],
                    tracks[5], tracks[6])
                f.write(line)
        info['pred_tracks'] = []

        # save gts
        if info['gt_tracks']:
            gt_file = osp.join(self.gt_dir, seq + '.txt')
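            # Gt rows follow the MOTChallenge gt.txt layout:
            # <frame>,<id>,<x>,<y>,<w>,<h>,<conf>,<class>,<visibility>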
            with open(gt_file, 'wt') as f:
                for tracks in info['gt_tracks']:
                    line = '%d,%d,%d,%d,%d,%d,%d,%d,%.5f\n' % (
                        tracks[0], tracks[1], tracks[2], tracks[3], tracks[4],
                        tracks[5], tracks[6], tracks[7], tracks[8])
                    f.write(line)
            info['gt_tracks'].clear()

        # save seq info
        with open(self.seqmap, 'a') as f:
            f.write(seq + '\n')

    def compute_metrics(self, results: Optional[list] = None) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list, optional): The processed results of each batch.
                Defaults to None.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()
        # NOTICE: don't access `self.results` from the method.
        eval_results = dict()

        if self.format_only:
            return eval_results

        eval_config = trackeval.Evaluator.get_default_eval_config()

        # TrackEval expects the parent of the tracker folder, so strip the
        # trailing tracker name from the prediction directory.
        pred_dir_tmp = self.pred_dir.rsplit(osp.sep, 1)[0]
        dataset_config = self.get_dataset_cfg(self.gt_dir, pred_dir_tmp)

        evaluator = trackeval.Evaluator(eval_config)
        dataset = [trackeval.datasets.MotChallenge2DBox(dataset_config)]
        metrics = [
            getattr(trackeval.metrics,
                    metric)(dict(METRICS=[metric], THRESHOLD=0.5))
            for metric in self.metrics
        ]
        output_res, _ = evaluator.evaluate(dataset, metrics)
        output_res = output_res['MotChallenge2DBox'][
            self.TRACKER]['COMBINED_SEQ']['pedestrian']

        if 'HOTA' in self.metrics:
            logger.info('Evaluating HOTA Metrics...')
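            # trackeval reports HOTA/AssA/DetA as arrays over localization
            # thresholds (alpha); averaging them yields the final scalar
            # scores.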
            eval_results['HOTA'] = np.average(output_res['HOTA']['HOTA'])
            eval_results['AssA'] = np.average(output_res['HOTA']['AssA'])
            eval_results['DetA'] = np.average(output_res['HOTA']['DetA'])

        if 'CLEAR' in self.metrics:
            logger.info('Evaluating CLEAR Metrics...')
            eval_results['MOTA'] = np.average(output_res['CLEAR']['MOTA'])
            eval_results['MOTP'] = np.average(output_res['CLEAR']['MOTP'])
            eval_results['IDSW'] = np.average(output_res['CLEAR']['IDSW'])
            eval_results['TP'] = np.average(output_res['CLEAR']['CLR_TP'])
            eval_results['FP'] = np.average(output_res['CLEAR']['CLR_FP'])
            eval_results['FN'] = np.average(output_res['CLEAR']['CLR_FN'])
            eval_results['Frag'] = np.average(output_res['CLEAR']['Frag'])
            eval_results['MT'] = np.average(output_res['CLEAR']['MT'])
            eval_results['ML'] = np.average(output_res['CLEAR']['ML'])

        if 'Identity' in self.metrics:
            logger.info('Evaluating Identity Metrics...')
            eval_results['IDF1'] = np.average(output_res['Identity']['IDF1'])
            eval_results['IDTP'] = np.average(output_res['Identity']['IDTP'])
            eval_results['IDFN'] = np.average(output_res['Identity']['IDFN'])
            eval_results['IDFP'] = np.average(output_res['Identity']['IDFP'])
            eval_results['IDP'] = np.average(output_res['Identity']['IDP'])
            eval_results['IDR'] = np.average(output_res['Identity']['IDR'])

        return eval_results

    def evaluate(self, size: int = 1) -> dict:
        """Evaluate the model performance of the whole dataset after
        processing all batches.

        Args:
            size (int): Length of the entire validation dataset.
                Defaults to 1.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are the
            names of the metrics, and the values are corresponding results.
        """
        # wait for all processes to complete prediction.
        barrier()

        # gather seq_info and convert the list of dicts to a dict.
        # convert self.seq_info to dict first to make it picklable.
        gathered_seq_info = all_gather_object(dict(self.seq_info))
        all_seq_info = dict()
        for _seq_info in gathered_seq_info:
            all_seq_info.update(_seq_info)
        self.seq_info = all_seq_info

        if is_main_process():
            _metrics = self.compute_metrics()  # type: ignore
            # Add prefix to metric names
            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]  # type: ignore
        broadcast_object_list(metrics)

        # reset the results list
        self.results.clear()
        return metrics[0]

    def get_dataset_cfg(self, gt_folder: str, tracker_folder: str):
        """Get default configs for trackeval.datasets.MotChallenge2DBox.

        Args:
            gt_folder (str): Path to the folder that holds the gt files.
            tracker_folder (str): Path to the folder that holds the tracker
                result files.

        Returns:
            dict: Dataset config for MotChallenge2DBox.
        """
        dataset_config = dict(
            # Location of GT data
            GT_FOLDER=gt_folder,
            # Trackers location
            TRACKERS_FOLDER=tracker_folder,
            # Where to save eval results
            # (if None, same as TRACKERS_FOLDER)
            OUTPUT_FOLDER=None,
            # Use self.TRACKER as the default tracker
            TRACKERS_TO_EVAL=[self.TRACKER],
            # Option values: ['pedestrian']
            CLASSES_TO_EVAL=['pedestrian'],
            # Option Values: 'MOT15', 'MOT16', 'MOT17', 'MOT20', 'DanceTrack'
            BENCHMARK=self.benchmark,
            # Option Values: 'train', 'test' ('val' for DanceTrack)
            SPLIT_TO_EVAL='val' if self.benchmark == 'DanceTrack' else 'train',
            # Whether tracker input files are zipped
            INPUT_AS_ZIP=False,
            # Whether to print current config
            PRINT_CONFIG=True,
            # Whether to perform preprocessing
            # (never done for MOT15)
            DO_PREPROC=False if self.benchmark == 'MOT15' else True,
            # Tracker files are in
            # TRACKER_FOLDER/tracker_name/TRACKER_SUB_FOLDER
            TRACKER_SUB_FOLDER='',
            # Output files are saved in
            # OUTPUT_FOLDER/tracker_name/OUTPUT_SUB_FOLDER
            OUTPUT_SUB_FOLDER='',
            # Names of trackers to display
            # (if None: TRACKERS_TO_EVAL)
            TRACKER_DISPLAY_NAMES=None,
            # Where seqmaps are found
            # (if None: GT_FOLDER/seqmaps)
            SEQMAP_FOLDER=None,
            # Directly specify seqmap file
            # (if none use seqmap_folder/benchmark-split_to_eval)
            SEQMAP_FILE=self.seqmap,
            # If not None, specify sequences to eval
            # and their number of timesteps
            SEQ_INFO={
                seq: info['seq_length']
                for seq, info in self.seq_info.items()
            },
            # '{gt_folder}/{seq}.txt'
            GT_LOC_FORMAT='{gt_folder}/{seq}.txt',
            # If False, data is in GT_FOLDER/BENCHMARK-SPLIT_TO_EVAL/ and in
            # TRACKERS_FOLDER/BENCHMARK-SPLIT_TO_EVAL/tracker/
            # If True, the middle 'benchmark-split' folder is skipped for both.
            SKIP_SPLIT_FOL=True,
        )

        return dataset_config