import collections
import threading

import cv2
import numpy as np
import openvino as ov
import os.path as osp

from qtpy import QtGui

from labelme.utils import img_qt_to_arr

from ..logger import logger
from . import _utils


class Normalize:
    def __init__(self, mean=(0.5,), std=(0.5,)):
        if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
            raise ValueError("mean and std should be of type list or tuple.")
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)
        if np.any(self.std == 0):
            raise ValueError("std should not contain zero values.")

    def __call__(self, img):
        img = img.astype(np.float32) / 255.0  # Scale pixel values to [0, 1]
        img = (img - self.mean) / self.std  # Normalize
        return img


class BarcodePredictModel:
    def __init__(self, detection_model_path, segmentation_model_path=None):
        self.ie = ov.Core()

        # Load detection model
        self.detection_net = self.ie.read_model(model=detection_model_path)
        self.detection_sess = self.ie.compile_model(
            model=self.detection_net, device_name="CPU"
        )
        self.detection_request = self.detection_sess.create_infer_request()

        # Load segmentation model if provided
        self.segmentation_net = None
        self.segmentation_sess = None
        if segmentation_model_path:
            self.segmentation_net = self.ie.read_model(model=segmentation_model_path)
            self.segmentation_sess = self.ie.compile_model(
                model=self.segmentation_net, device_name="CPU"
            )

        self._lock = threading.Lock()
        self._thread = None  # Background inference thread; set by set_image()
        self.input_height = 640  # Input shape for detection model (example size)
        self.input_width = 640
        self.segmentation_input_shape = (1, 3, 128, 256)  # NCHW input shape for segmentation model
        self._image_embedding_cache = collections.OrderedDict()
        self._max_cache_size = 10
        self.normalize = Normalize()  # Normalization instance

    def set_image(self, image: np.ndarray):
        with self._lock:
            self.raw_width = image.shape[1]
            self.raw_height = image.shape[0]
            # Preprocess the image
            input_tensor = self.preprocess_image(image)
            self._image = input_tensor
            # Prepare the auxiliary model inputs
            self._im_shape = np.array(
                [[self.raw_height, self.raw_width]], dtype=np.float32
            )
            self._scale_factor = np.array([[1.0, 1.0]], dtype=np.float32)

        # Run inference asynchronously; _get_image_embedding() joins this thread
        self._thread = threading.Thread(
            target=self._compute_and_cache_image_embedding
        )
        self._thread.start()

    def preprocess_image(self, image, for_segmentation=False):
        if for_segmentation:
            # Resize image to segmentation model input size (width, height)
            resized_image = cv2.resize(
                image,
                (self.segmentation_input_shape[3], self.segmentation_input_shape[2]),
            )
            resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
            # Normalize for segmentation model (scales to [0, 1], then standardizes)
            resized_image = self.normalize(resized_image)
        else:
            # Resize image to detection model input size
            logger.debug(f"Preprocessing image for detection: {image.shape}")
            resized_image = cv2.resize(image, (self.input_width, self.input_height))
            resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
            resized_image = resized_image.astype("float32") / 255.0
        input_tensor = resized_image.transpose(2, 0, 1)  # Convert HWC to CHW
        input_tensor = np.expand_dims(input_tensor, 0)  # Add batch dimension
        logger.debug(f"Processed image shape: {input_tensor.shape}")
        return input_tensor

    def _compute_and_cache_image_embedding(self):
        with self._lock:
            # Prepare the inputs dictionary
            inputs = {
                "image": self._image,
                "im_shape": self._im_shape,
                "scale_factor": self._scale_factor,
            }
            # Perform inference
            self._result = self.detection_request.infer(inputs)

    def _get_image_embedding(self):
        # Block until the background inference started by set_image() finishes
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        with self._lock:
            new_result = self._result
        return new_result

    def predict_mask_from_points(self, points=None, point_labels=None):
        return _collect_result_from_output(
            outputs=self._get_image_embedding(),
            raw_width=self.raw_width,
            raw_height=self.raw_height,
        )

    def predict_polygon_from_points(self, points=None, point_labels=None):
        result_list = self.predict_mask_from_points(points, point_labels)
        return result_list


def _collect_result_from_output(outputs, raw_width, raw_height):
    # Extract the desired output array from the outputs dictionary
    output_array = None
    for key in outputs:
        if "save_infer_model/scale_0.tmp_0" in key.names:
            output_array = outputs[key]
            break
    if output_array is None:
        raise ValueError("Desired output not found in outputs")

    # shape [50, 6]; each row is [class_id, score, x1, y1, x2, y2]
    outputs = output_array
    point_list = []
    threshold = 0.7
    for bbox_info in outputs:
        score = bbox_info[1]
        if score > threshold:
            x1_raw, y1_raw, x2_raw, y2_raw = bbox_info[2:6]
            logger.debug(
                f"Raw bbox coordinates: x1={x1_raw}, y1={y1_raw}, x2={x2_raw}, y2={y2_raw}"
            )
            # Clamp box coordinates to the original image bounds
            x1 = max(min(int(x1_raw), raw_width - 1), 0)
            y1 = max(min(int(y1_raw), raw_height - 1), 0)
            x2 = max(min(int(x2_raw), raw_width - 1), 0)
            y2 = max(min(int(y2_raw), raw_height - 1), 0)
            logger.debug(
                f"Clamped bbox coordinates: x1={x1}, y1={y1}, x2={x2}, y2={y2}"
            )
            # Four-corner polygon (clockwise) built from the axis-aligned box
            point_xy = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
            point_list.append(point_xy)
    return point_list
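

# Minimal usage sketch. Assumptions not confirmed by this module: the model and
# image paths below are hypothetical, and the exported detection model exposes
# the PaddleDetection-style inputs ("image", "im_shape", "scale_factor") and the
# "save_infer_model/scale_0.tmp_0" output that the code above expects.
if __name__ == "__main__":
    model = BarcodePredictModel("barcode_det.xml")  # hypothetical OpenVINO IR path
    image = cv2.imread("sample.jpg")  # hypothetical test image, BGR as read by OpenCV
    if image is None:
        raise FileNotFoundError("sample.jpg could not be read")
    model.set_image(image)  # kicks off detection on a background thread
    polygons = model.predict_polygon_from_points()  # joins the thread, collects boxes
    # Each entry is a four-corner polygon [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
    logger.debug(f"Detected {len(polygons)} barcode region(s): {polygons}")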