# barcode_detect.py

import collections
import os.path as osp
import threading

import cv2
import numpy as np
import openvino as ov
from qtpy import QtGui

from labelme.utils import img_qt_to_arr

from . import _utils
from ..logger import logger


class Normalize:
    def __init__(
        self,
        mean=(0.15525904, 0.15525904, 0.15525904),
        std=(0.12552188, 0.12552188, 0.12552188),
    ):
        if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
            raise ValueError("mean and std should be of type list or tuple.")
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)
        # Reshape for broadcasting, so mean and std apply across the spatial
        # dimensions of an HWC image
        self.mean = self.mean.reshape((1, 1, 3))
        self.std = self.std.reshape((1, 1, 3))

    def __call__(self, img):
        img = img.astype(np.float32) / 255.0  # Scale pixel values to [0, 1]
        img = (img - self.mean) / self.std  # Standardize
        return img
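
# Minimal usage sketch for Normalize (illustrative only; the dummy array is
# not part of the real pipeline):
#
#   norm = Normalize()
#   dummy = np.zeros((640, 640, 3), dtype=np.uint8)  # HWC uint8 image
#   out = norm(dummy)  # float32 HWC: (x / 255 - mean) / std per channel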

class BarcodeDetectModel:
    def __init__(self, detection_model_path, segmentation_model_path=None):
        self.ie = ov.Core()
        # Load the detection model
        self.detection_net = self.ie.read_model(model=detection_model_path)
        self.detection_sess = self.ie.compile_model(
            model=self.detection_net, device_name="CPU"
        )
        self.detection_request = self.detection_sess.create_infer_request()
        # Load the segmentation model if provided
        self.segmentation_net = None
        self.segmentation_sess = None
        if segmentation_model_path:
            self.segmentation_net = self.ie.read_model(model=segmentation_model_path)
            self.segmentation_sess = self.ie.compile_model(
                model=self.segmentation_net, device_name="CPU"
            )
        self._lock = threading.Lock()
        self._thread = None  # Background inference thread; started by set_image()
        self.input_height = 640  # Input size expected by the detection model
        self.input_width = 640
        self.segmentation_input_shape = (1, 3, 128, 256)  # NCHW input of the segmentation model
        self._image_embedding_cache = collections.OrderedDict()  # Reserved; not used in this module
        self._max_cache_size = 10
        self.normalize = Normalize()  # Normalization for the detection model
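
    # The detection model is expected to follow a PaddleDetection-style export
    # (inferred from the tensor names used below): inputs "image", "im_shape",
    # and "scale_factor", and an output "save_infer_model/scale_0.tmp_0" of
    # shape [50, 6] whose rows hold a score at index 1 and a box at 2..5.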

    def set_image(self, image: np.ndarray):
        with self._lock:
            self.raw_width = image.shape[1]
            self.raw_height = image.shape[0]
            # Preprocess the image
            input_tensor = self.preprocess_image(image)
            self._image = input_tensor
            # Prepare the remaining detection inputs
            self._im_shape = np.array(
                [[self.raw_height, self.raw_width]], dtype=np.float32
            )
            self._scale_factor = np.array([[1.0, 1.0]], dtype=np.float32)
        self._thread = threading.Thread(
            target=self._compute_and_cache_image_embedding
        )
        self._thread.start()
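
    # Design note: set_image() returns immediately and runs detection on a
    # background thread; predict_mask_from_points() later joins that thread
    # via _get_image_embedding(), so callers never see a half-computed result.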

    def preprocess_image(self, image, for_segmentation=False):
        if for_segmentation:
            # Resize to the segmentation model's input size
            logger.debug(f"Preprocessing image for segmentation: {image.shape}")
            norm = Normalize(
                mean=(0.447365, 0.447365, 0.447365),
                std=(0.17667491, 0.17667491, 0.17667491),
            )
            resized_image = cv2.resize(
                image,
                (self.segmentation_input_shape[3], self.segmentation_input_shape[2]),
            )  # (width, height)
            resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
            resized_image = norm(resized_image)  # Normalize for the segmentation model
        else:
            # Resize to the detection model's input size
            logger.debug(f"Preprocessing image for detection: {image.shape}")
            resized_image = cv2.resize(image, (self.input_width, self.input_height))
            resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
            resized_image = self.normalize(resized_image)
        input_tensor = resized_image.transpose(2, 0, 1)  # Convert HWC to CHW
        input_tensor = np.expand_dims(input_tensor, 0)  # Add batch dimension
        return input_tensor
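
    # Shape flow through preprocess_image (illustrative; assumes a BGR uint8
    # input such as cv2.imread produces):
    #   detection:    (H, W, 3) uint8 -> (640, 640, 3) float32 -> (1, 3, 640, 640)
    #   segmentation: (H, W, 3) uint8 -> (128, 256, 3) float32 -> (1, 3, 128, 256)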

    def _compute_and_cache_image_embedding(self):
        with self._lock:
            # Prepare the inputs dictionary
            inputs = {
                'image': self._image,
                'im_shape': self._im_shape,
                'scale_factor': self._scale_factor,
            }
            # Perform inference and keep the raw outputs for later collection
            self._result = self.detection_request.infer(inputs)

    def _get_image_embedding(self):
        # Wait for the background inference started by set_image() to finish
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        with self._lock:
            new_result = self._result
        return new_result

    def predict_mask_from_points(self, points=None, point_labels=None):
        # points and point_labels are currently unused by this detector
        return self._collect_result_from_output(
            outputs=self._get_image_embedding(),
            raw_width=self.raw_width,
            raw_height=self.raw_height,
        )

    def predict_polygon_from_points(self, points=None, point_labels=None):
        result_list = self.predict_mask_from_points(points, point_labels)
        return result_list

    def _collect_result_from_output(self, outputs, raw_width, raw_height):
        # Extract the desired output array from the outputs dictionary
        output_array = None
        for key in outputs:
            if 'save_infer_model/scale_0.tmp_0' in key.names:
                output_array = outputs[key]
                break
        if output_array is None:
            raise ValueError("Desired output not found in outputs")
        outputs = output_array  # shape [50, 6]; score at index 1, box at 2..5
        point_list = []
        threshold = 0.7
        for bbox_info in outputs:
            score = bbox_info[1]
            if score > threshold:
                x1_raw = bbox_info[2]
                y1_raw = bbox_info[3]
                x2_raw = bbox_info[4]
                y2_raw = bbox_info[5]
                # Clamp the box to the raw image bounds
                x1 = max(min(int(x1_raw), raw_width - 1), 0)
                y1 = max(min(int(y1_raw), raw_height - 1), 0)
                x2 = max(min(int(x2_raw), raw_width - 1), 0)
                y2 = max(min(int(y2_raw), raw_height - 1), 0)
                # Four corners, clockwise from the top-left
                point_xy = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
                point_list.append(point_xy)
        return point_list
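
# Minimal usage sketch, assuming an exported OpenVINO IR file exists at the
# (hypothetical) path below:
#
#   model = BarcodeDetectModel("barcode_det.xml")
#   image = cv2.imread("sample.jpg")  # BGR uint8, any size
#   model.set_image(image)  # kicks off asynchronous detection
#   polygons = model.predict_polygon_from_points()
#   # polygons: a list of four-corner boxes, one per detection above the
#   # 0.7 score threshold, e.g. [[[x1, y1], [x2, y1], [x2, y2], [x1, y2]], ...]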