# barcode_detect.py
import collections
import os.path as osp
import threading

import cv2
import numpy as np
import openvino as ov
from qtpy import QtGui

from labelme.utils import img_qt_to_arr

from . import _utils
from ..logger import logger
  11. class Normalize:
  12. def __init__(self, mean=(0., 0., 0.), std=(0.1, 0.1, 0.1)):
  13. if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
  14. raise ValueError("mean and std should be of type list or tuple.")
  15. self.mean = np.array(mean, dtype=np.float32)
  16. self.std = np.array(std, dtype=np.float32)
  17. self.mean = self.mean.reshape((1, 1, 3))
  18. self.std = self.std.reshape((1, 1, 3))
  19. def __call__(self, img):
  20. img = img.astype(np.float32) / 255.0
  21. img = (img - self.mean) / self.std
  22. return img
  23. class BarcodeDetectModel:
  24. def __init__(self, detection_model_path, segmentation_model_path=None):
  25. self.ie = ov.Core()
  26. self.detection_net = self.ie.read_model(model=detection_model_path)
  27. self.detection_sess = self.ie.compile_model(self.detection_net, "CPU")
  28. self.detection_request = self.detection_sess.create_infer_request()
  29. self.segmentation_net = None
  30. self.segmentation_sess = None
  31. if segmentation_model_path:
  32. self.segmentation_net = self.ie.read_model(model=segmentation_model_path)
  33. self.segmentation_sess = self.ie.compile_model(self.segmentation_net, "CPU")
  34. self._lock = threading.Lock()
  35. self.input_height = 1024
  36. self.input_width = 1024
  37. self.segmentation_input_shape = (1, 1, 192, 320)
  38. self._image_embedding_cache = collections.OrderedDict()
  39. self._max_cache_size = 10
  40. self.normalize = Normalize()
  41. def set_image(self, image: np.ndarray):
  42. with self._lock:
  43. self.raw_width = image.shape[1]
  44. self.raw_height = image.shape[0]
  45. input_tensor = self.preprocess_image(image)
  46. self._image = input_tensor
  47. self._thread = threading.Thread(
  48. target=self._compute_and_cache_image_embedding
  49. )
  50. self._thread.start()
  51. def preprocess_image(self, image, for_segmentation=False):
  52. if for_segmentation:
  53. logger.debug(f"Preprocessing image for segmentation: {image.shape}")
  54. norm = Normalize(mean=(0.5,0.5,0.5), std=(0.5,0.5,0.5))
  55. resized_image = cv2.resize(
  56. image,
  57. (self.segmentation_input_shape[3], self.segmentation_input_shape[2])
  58. )
  59. resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
  60. resized_image = norm(resized_image)
  61. else:
  62. logger.debug(f"Preprocessing image for detection: {image.shape}")
  63. h, w = image.shape[:2]
  64. r = min(self.input_height / h, self.input_width / w)
  65. nh, nw = int(round(h * r)), int(round(w * r))
  66. image_resized = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_LINEAR)
  67. top = (self.input_height - nh) // 2
  68. bottom = self.input_height - nh - top
  69. left = (self.input_width - nw) // 2
  70. right = self.input_width - nw - left
  71. image_padded = cv2.copyMakeBorder(
  72. image_resized,
  73. top, bottom, left, right,
  74. cv2.BORDER_CONSTANT,
  75. value=(114,114,114)
  76. )
  77. # store for scaling
  78. self._letterbox_gain = r
  79. self._letterbox_pad = (left, top)
  80. resized_image = cv2.cvtColor(image_padded, cv2.COLOR_BGR2RGB)
  81. resized_image = resized_image.astype(np.float32) / 255.0
  82. input_tensor = resized_image.transpose(2, 0, 1)
  83. input_tensor = np.expand_dims(input_tensor, 0)
  84. return input_tensor
  85. def _compute_and_cache_image_embedding(self):
  86. with self._lock:
  87. inputs = [self._image]
  88. self._result = self.detection_request.infer(inputs)
  89. def _get_image_embedding(self):
  90. if self._thread is not None:
  91. self._thread.join()
  92. self._thread = None
  93. with self._lock:
  94. return self._result
  95. def predict_mask_from_points(self,points=None,point_labels=None):
  96. return self._collect_result_from_output(
  97. outputs=self._get_image_embedding(),
  98. raw_width=self.raw_width,
  99. raw_height=self.raw_height,
  100. )
  101. def predict_polygon_from_points(self,points=None,point_labels=None):
  102. result_list=self.predict_mask_from_points(points,point_labels)
  103. return result_list
  104. def _collect_result_from_output(self, outputs, raw_width, raw_height):
  105. output_array = list(outputs.values())[0]
  106. outputs = output_array[0] # (300,6)
  107. point_list = []
  108. thresh_hold = 0.5
  109. gain = self._letterbox_gain
  110. pad_x, pad_y = self._letterbox_pad
  111. for bbox_info in outputs:
  112. score = float(bbox_info[4])
  113. if score > thresh_hold:
  114. x1_raw = bbox_info[0]
  115. y1_raw = bbox_info[1]
  116. x2_raw = bbox_info[2]
  117. y2_raw = bbox_info[3]
  118. x1 = (x1_raw - pad_x) / gain
  119. y1 = (y1_raw - pad_y) / gain
  120. x2 = (x2_raw - pad_x) / gain
  121. y2 = (y2_raw - pad_y) / gain
  122. x1 = max(min(int(x1), raw_width - 1), 0)
  123. y1 = max(min(int(y1), raw_height - 1), 0)
  124. x2 = max(min(int(x2), raw_width - 1), 0)
  125. y2 = max(min(int(y2), raw_height - 1), 0)
  126. point_xy = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
  127. point_list.append(point_xy)
  128. return point_list