@@ -0,0 +1,182 @@
+import collections
+import os.path as osp
+import threading
+
+import cv2
+import numpy as np
+import openvino as ov
+from qtpy import QtGui
+
+from labelme.utils import img_qt_to_arr
+
+from ..logger import logger
+from . import _utils
+
+
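+# NOTE: the (0.5,) mean/std defaults below map [0, 1]-scaled pixels to
+# roughly [-1, 1]; we assume these defaults match the segmentation model's
+# training-time preprocessing.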
+class Normalize:
+    def __init__(self, mean=(0.5,), std=(0.5,)):
+        if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
+            raise ValueError("mean and std should be of type list or tuple.")
+        self.mean = np.array(mean, dtype=np.float32)
+        self.std = np.array(std, dtype=np.float32)
+
+        if np.any(self.std == 0):
+            raise ValueError("std should not contain zero values.")
+
+    def __call__(self, img):
+        img = img.astype(np.float32) / 255.0  # Scale pixel values to [0, 1]
+        img = (img - self.mean) / self.std  # Standardize with the configured stats
+        return img
+
+
+class BarcodePredictModel:
+    def __init__(self, detection_model_path, segmentation_model_path=None):
+        self.ie = ov.Core()
+
+        # Load the detection model and create a reusable infer request.
+        self.detection_net = self.ie.read_model(model=detection_model_path)
+        self.detection_sess = self.ie.compile_model(
+            model=self.detection_net, device_name="CPU"
+        )
+        self.detection_request = self.detection_sess.create_infer_request()
+
+        # Load the segmentation model only if a path is provided.
+        self.segmentation_net = None
+        self.segmentation_sess = None
+        if segmentation_model_path:
+            self.segmentation_net = self.ie.read_model(model=segmentation_model_path)
+            self.segmentation_sess = self.ie.compile_model(
+                model=self.segmentation_net, device_name="CPU"
+            )
+
+        self._lock = threading.Lock()
+        self._thread = None  # No background inference is running yet
+        self.input_height = 640  # Detection model input height
+        self.input_width = 640  # Detection model input width
+        self.segmentation_input_shape = (1, 3, 128, 256)  # NCHW segmentation input
+        self._image_embedding_cache = collections.OrderedDict()
+        self._max_cache_size = 10
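+        # NOTE: the cache above is declared but not consulted yet; we assume
+        # it is reserved for caching inference results in a follow-up change.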
+        self.normalize = Normalize()
+
+    def set_image(self, image: np.ndarray):
+        with self._lock:
+            self.raw_width = image.shape[1]
+            self.raw_height = image.shape[0]
+            # Preprocess the image for the detection model.
+            input_tensor = self.preprocess_image(image)
+            self._image = input_tensor
+            # Prepare the auxiliary inputs expected by the detector.
+            self._im_shape = np.array(
+                [[self.raw_height, self.raw_width]], dtype=np.float32
+            )
+            self._scale_factor = np.array([[1.0, 1.0]], dtype=np.float32)
+            self._thread = threading.Thread(
+                target=self._compute_and_cache_image_embedding
+            )
+            self._thread.start()
+
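+    # Design note: set_image() launches inference on a worker thread so the
+    # caller (e.g. the labelme canvas) is not blocked; _get_image_embedding()
+    # joins that thread before reading self._result.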
+    def preprocess_image(self, image, for_segmentation=False):
+        if for_segmentation:
+            # Resize to the segmentation model input size (width, height).
+            resized_image = cv2.resize(
+                image,
+                (self.segmentation_input_shape[3], self.segmentation_input_shape[2]),
+            )
+            resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
+            # Normalize with the segmentation model's statistics.
+            resized_image = self.normalize(resized_image)
+        else:
+            # Resize to the detection model input size.
+            logger.debug(f"Preprocessing image for detection: {image.shape}")
+            resized_image = cv2.resize(image, (self.input_width, self.input_height))
+            resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
+            resized_image = resized_image.astype(np.float32) / 255.0
+
+        input_tensor = resized_image.transpose(2, 0, 1)  # HWC -> CHW
+        input_tensor = np.expand_dims(input_tensor, 0)  # Add batch dimension
+        logger.debug(f"Processed image shape: {input_tensor.shape}")
+        return input_tensor
+
+    def _compute_and_cache_image_embedding(self):
+        with self._lock:
+            # Assemble the named inputs expected by the detection model.
+            inputs = {
+                "image": self._image,
+                "im_shape": self._im_shape,
+                "scale_factor": self._scale_factor,
+            }
+            # Run detection inference and keep the raw results.
+            self._result = self.detection_request.infer(inputs)
+
+    def _get_image_embedding(self):
+        # Wait for the background inference started by set_image().
+        if self._thread is not None:
+            self._thread.join()
+            self._thread = None
+        with self._lock:
+            return self._result
+
+    def predict_mask_from_points(self, points=None, point_labels=None):
+        # points and point_labels are accepted for interface compatibility;
+        # the detector does not use them.
+        return _collect_result_from_output(
+            outputs=self._get_image_embedding(),
+            raw_width=self.raw_width,
+            raw_height=self.raw_height,
+        )
+
+    def predict_polygon_from_points(self, points=None, point_labels=None):
+        return self.predict_mask_from_points(points, point_labels)
+
+
+def _collect_result_from_output(outputs, raw_width, raw_height):
+    # Extract the detection tensor from the results dictionary by its
+    # exported output name.
+    output_array = None
+    for key in outputs:
+        if "save_infer_model/scale_0.tmp_0" in key.names:
+            output_array = outputs[key]
+            break
+    if output_array is None:
+        raise ValueError("Desired output not found in outputs")
+
+    point_list = []
+    threshold = 0.7  # Minimum detection score to keep a box
+
+    # Each detection row holds the score at index 1 and x1, y1, x2, y2 at
+    # indices 2-5 (index 0 is assumed to be the class id).
+    for bbox_info in output_array:
+        score = bbox_info[1]
+        if score > threshold:
+            x1_raw, y1_raw, x2_raw, y2_raw = bbox_info[2:6]
+            logger.debug(
+                f"Raw bbox coordinates: x1={x1_raw}, y1={y1_raw}, "
+                f"x2={x2_raw}, y2={y2_raw}"
+            )
+            # Clamp the box to the original image bounds.
+            x1 = max(min(int(x1_raw), raw_width - 1), 0)
+            y1 = max(min(int(y1_raw), raw_height - 1), 0)
+            x2 = max(min(int(x2_raw), raw_width - 1), 0)
+            y2 = max(min(int(y2_raw), raw_height - 1), 0)
+            logger.debug(
+                f"Clamped bbox coordinates: x1={x1}, y1={y1}, x2={x2}, y2={y2}"
+            )
+            # Emit the box as a four-point polygon (clockwise from top-left).
+            point_xy = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
+            point_list.append(point_xy)
+    return point_list
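+
+
+# Typical usage (sketch; the model path below is a placeholder, and
+# bgr_image stands for any BGR np.ndarray, e.g. from cv2.imread):
+#
+#     model = BarcodePredictModel("barcode_detector.xml")
+#     model.set_image(bgr_image)
+#     polygons = model.predict_polygon_from_points()
+#     # -> [[[x1, y1], [x2, y1], [x2, y2], [x1, y2]], ...]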