import numpy as np
import pytest
import torch

from mmdet.utils import AvoidOOM
from mmdet.utils.memory import cast_tensor_type


def test_avoidoom():
    tensor = torch.from_numpy(np.random.random((20, 20)))
    if torch.cuda.is_available():
        tensor = tensor.cuda()
        # get default result
        default_result = torch.mm(tensor, tensor.transpose(1, 0))

        # when not occurred OOM error
        AvoidCudaOOM = AvoidOOM()
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert default_result.device == result.device and \
               default_result.dtype == result.dtype and \
               torch.equal(default_result, result)

        # calculate with fp16 and convert back to source type
        AvoidCudaOOM = AvoidOOM(test=True)
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert default_result.device == result.device and \
               default_result.dtype == result.dtype and \
               torch.allclose(default_result, result, 1e-3)

        # calculate on cpu and convert back to source device
        AvoidCudaOOM = AvoidOOM(test=True)
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert result.dtype == default_result.dtype and \
               result.device == default_result.device and \
               torch.allclose(default_result, result)

        # do not calculate on cpu and the outputs will be same as input
        AvoidCudaOOM = AvoidOOM(test=True, to_cpu=False)
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert result.dtype == default_result.dtype and \
               result.device == default_result.device

    else:
        default_result = torch.mm(tensor, tensor.transpose(1, 0))
        AvoidCudaOOM = AvoidOOM()
        result = AvoidCudaOOM.retry_if_cuda_oom(torch.mm)(tensor,
                                                          tensor.transpose(
                                                              1, 0))
        assert default_result.device == result.device and \
               default_result.dtype == result.dtype and \
               torch.equal(default_result, result)


def test_cast_tensor_type():
    inputs = torch.rand(10)
    if torch.cuda.is_available():
        inputs = inputs.cuda()
    with pytest.raises(AssertionError):
        cast_tensor_type(inputs, src_type=None, dst_type=None)
    # input is a float
    out = cast_tensor_type(10., dst_type=torch.half)
    assert out == 10. and isinstance(out, float)
    # convert Tensor to fp16 and re-convert to fp32
    fp16_out = cast_tensor_type(inputs, dst_type=torch.half)
    assert fp16_out.dtype == torch.half
    fp32_out = cast_tensor_type(fp16_out, dst_type=torch.float32)
    assert fp32_out.dtype == torch.float32

    # input is a list
    list_input = [inputs, inputs]
    list_outs = cast_tensor_type(list_input, dst_type=torch.half)
    assert len(list_outs) == len(list_input) and \
           isinstance(list_outs, list)
    for out in list_outs:
        assert out.dtype == torch.half
    # input is a dict
    dict_input = {'test1': inputs, 'test2': inputs}
    dict_outs = cast_tensor_type(dict_input, dst_type=torch.half)
    assert len(dict_outs) == len(dict_input) and \
           isinstance(dict_outs, dict)

    # convert the input tensor to CPU and re-convert to GPU
    if torch.cuda.is_available():
        cpu_device = torch.empty(0).device
        gpu_device = inputs.device
        cpu_out = cast_tensor_type(inputs, dst_type=cpu_device)
        assert cpu_out.device == cpu_device

        gpu_out = cast_tensor_type(inputs, dst_type=gpu_device)
        assert gpu_out.device == gpu_device