# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import pickle
import shutil
import tempfile
import warnings
from typing import Optional, Sequence

import torch
from mmengine.dist import (barrier, broadcast, broadcast_object_list,
                           get_dist_info, is_main_process)
from mmengine.evaluator import BaseMetric
from mmengine.utils import mkdir_or_exist


class BaseVideoMetric(BaseMetric):
    """Base class for a metric in video tasks.

    The metric first processes each batch of data_samples and predictions,
    and appends the processed results to the results list. Then it
    collects all results together from all ranks if distributed training
    is used. Finally, it computes the metrics of the entire dataset.

    A subclass of :class:`BaseVideoMetric` should assign a meaningful value
    to the class attribute `default_prefix`. See the argument `prefix` for
    details.
    """
    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions.

        The processed results should be stored in ``self.results``, which will
        be used to compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        for track_data_sample in data_samples:
            video_data_samples = track_data_sample['video_data_samples']
            ori_video_len = video_data_samples[0].ori_video_length
            if ori_video_len == len(video_data_samples):
                # The whole video is in this batch; process at video level.
                self.process_video(video_data_samples)
            else:
                # Only part of the video is in this batch; process the
                # frames image by image.
                self.process_image(video_data_samples, ori_video_len)

    def evaluate(self, size: int = 1) -> dict:
        """Evaluate the model performance of the whole dataset after
        processing all batches.

        Args:
            size (int): Length of the entire validation dataset.
                Defaults to 1.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are
                the names of the metrics, and the values are corresponding
                results.
        """
        if len(self.results) == 0:
            warnings.warn(
                f'{self.__class__.__name__} got empty `self.results`. Please '
                'ensure that the processed results are properly added into '
                '`self.results` in `process` method.')

        results = collect_tracking_results(self.results, self.collect_device)

        if is_main_process():
            _metrics = self.compute_metrics(results)  # type: ignore
            # Add prefix to metric names
            if self.prefix:
                _metrics = {
                    '/'.join((self.prefix, k)): v
                    for k, v in _metrics.items()
                }
            metrics = [_metrics]
        else:
            metrics = [None]  # type: ignore
        broadcast_object_list(metrics)

        # reset the results list
        self.results.clear()
        return metrics[0]
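

# A minimal sketch of a concrete subclass, for illustration only. The base
# class's ``process`` dispatches to ``process_video`` and ``process_image``
# hooks that subclasses are expected to provide; the class name and metric
# logic below are hypothetical, not part of this module's public API.
class _ToyVideoMetric(BaseVideoMetric):
    """Counts processed frames per video (illustrative sketch)."""
    default_prefix = 'toy'

    def process_video(self, data_samples):
        # The whole video arrived in one batch; count all its frames.
        self.results.append(len(data_samples))

    def process_image(self, data_samples, ori_video_len):
        # Frames of one video arrive split across several batches.
        self.results.append(len(data_samples))

    def compute_metrics(self, results: list) -> dict:
        return {'num_frames': sum(results)}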


def collect_tracking_results(results: list,
                             device: str = 'cpu',
                             tmpdir: Optional[str] = None) -> Optional[list]:
    """Collect results in distributed environments.

    Different from ``mmengine.dist.collect_results``, this function does not
    take a ``size`` parameter (the length of the entire validation dataset),
    because for tracking that length equals the number of videos, while
    computing metrics requires the number of images.

    Args:
        results (list): Result list containing result parts to be
            collected. Each item of ``results`` should be a picklable
            object.
        device (str): Device name. Optional values are 'cpu' and 'gpu'.
            Defaults to 'cpu'.
        tmpdir (str | None): Temporary directory for collected results to
            store. If set to None, it will create a temporary directory
            for it. ``tmpdir`` should be None when device is 'gpu'.
            Defaults to None.

    Returns:
        list or None: The collected results.
    """
    if device not in ['gpu', 'cpu']:
        raise NotImplementedError(
            f"device must be 'cpu' or 'gpu', but got {device}")

    if device == 'gpu':
        assert tmpdir is None, 'tmpdir should be None when device is "gpu"'
        raise NotImplementedError('GPU collecting has not been supported yet')
    else:
        return collect_tracking_results_cpu(results, tmpdir)
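

# Hedged usage sketch (assumed setup, not part of the module): in a
# non-distributed run ``get_dist_info()`` reports a world size of 1, so CPU
# collection returns the input list unchanged.
def _demo_collect_cpu() -> None:
    part = [{'video_id': 0, 'score': 0.9}]  # fabricated sample result
    collected = collect_tracking_results(part, device='cpu')
    assert collected == part  # single process: passed through as-is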


def collect_tracking_results_cpu(result_part: list,
                                 tmpdir: Optional[str] = None
                                 ) -> Optional[list]:
    """Collect results in CPU mode.

    Saves the results on different GPUs to ``tmpdir`` and collects them on
    the rank 0 worker.

    Args:
        result_part (list): The part of prediction results.
        tmpdir (str): Path of the directory to save the temporary results
            from different GPUs in CPU mode. If None, a temporary path is
            created with ``tempfile.mkdtemp()``. Defaults to None.

    Returns:
        list or None: The collected results.
    """
    rank, world_size = get_dist_info()
    if world_size == 1:
        return result_part

    # create a tmp dir if it is not specified
    if tmpdir is None:
        MAX_LEN = 512
        # 32 is the ASCII code of whitespace, used as padding
        dir_tensor = torch.full((MAX_LEN, ), 32, dtype=torch.uint8)
        if rank == 0:
            mkdir_or_exist('.dist_test')
            tmpdir = tempfile.mkdtemp(dir='.dist_test')
            tmpdir = torch.tensor(
                bytearray(tmpdir.encode()), dtype=torch.uint8)
            dir_tensor[:len(tmpdir)] = tmpdir
        broadcast(dir_tensor, 0)
        tmpdir = dir_tensor.numpy().tobytes().decode().rstrip()
    else:
        mkdir_or_exist(tmpdir)

    # dump the part result to the dir
    with open(osp.join(tmpdir, f'part_{rank}.pkl'), 'wb') as f:  # type: ignore
        pickle.dump(result_part, f, protocol=2)

    barrier()

    # collect all parts
    if rank != 0:
        return None
    else:
        # load results of all parts from tmp dir
        part_list = []
        for i in range(world_size):
            path = osp.join(tmpdir, f'part_{i}.pkl')  # type: ignore
            with open(path, 'rb') as f:
                part_list.extend(pickle.load(f))
        shutil.rmtree(tmpdir)
        return part_list
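

# The rendezvous on ``tmpdir`` above works by encoding the directory name
# into a fixed-length uint8 tensor padded with spaces (ASCII 32) so it can
# be broadcast from rank 0. A single-process sketch of that round-trip
# (illustrative; no distributed setup required):
if __name__ == '__main__':
    name = tempfile.mkdtemp()
    buf = torch.full((512, ), 32, dtype=torch.uint8)
    encoded = torch.tensor(bytearray(name.encode()), dtype=torch.uint8)
    buf[:len(encoded)] = encoded
    # In the distributed case, ranks other than 0 receive ``buf`` via
    # ``broadcast``; decoding strips the whitespace padding.
    assert buf.numpy().tobytes().decode().rstrip() == name
    shutil.rmtree(name)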