123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- import collections
- import threading
- import numpy as np
- import openvino as ov
- import os.path as osp
- import cv2
- from ..logger import logger
- from . import _utils
- from labelme.utils import img_qt_to_arr
- from qtpy import QtGui
- class Normalize:
- def __init__(self, mean=(0.15525904, 0.15525904, 0.15525904), std=(0.12552188, 0.12552188, 0.12552188)):
- if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
- raise ValueError("mean and std should be of type list or tuple.")
- self.mean = np.array(mean, dtype=np.float32)
- self.std = np.array(std, dtype=np.float32)
- # Reshape for broadcasting to apply mean and std across the spatial dimensions of an image
- self.mean = self.mean.reshape((1, 1, 3))
- self.std = self.std.reshape((1, 1, 3))
- def __call__(self, img):
- img = img.astype(np.float32) / 255.0 # Scale pixel values to [0, 1]
- img = (img - self.mean) / self.std # Normalize
- return img
- class BarcodeDetectModel:
- def __init__(self, detection_model_path, segmentation_model_path=None):
- self.ie = ov.Core()
- # Load detection model
- self.detection_net = self.ie.read_model(model=detection_model_path)
- self.detection_sess = self.ie.compile_model(model=self.detection_net, device_name="CPU")
- self.detection_request = self.detection_sess.create_infer_request()
-
- # Load segmentation model if provided
- self.segmentation_net = None
- self.segmentation_sess = None
- if segmentation_model_path:
- self.segmentation_net = self.ie.read_model(model=segmentation_model_path)
- self.segmentation_sess = self.ie.compile_model(model=self.segmentation_net, device_name="CPU")
-
- self._lock = threading.Lock()
- self.input_height = 640 # Input shape for detection model (example size)
- self.input_width = 640
- self.segmentation_input_shape = (1, 3, 128, 256) # Input shape for segmentation model
- self._image_embedding_cache = collections.OrderedDict()
- self._max_cache_size = 10
- self.normalize = Normalize() # Normalization instance
- def set_image(self, image: np.ndarray):
- with self._lock:
- self.raw_width = image.shape[1]
- self.raw_height = image.shape[0]
- # Preprocess the image
- input_tensor = self.preprocess_image(image)
- self._image = input_tensor
- # Prepare other inputs
- self._im_shape = np.array([[self.raw_height, self.raw_width]], dtype=np.float32)
- self._scale_factor = np.array([[1.0, 1.0]], dtype=np.float32)
- self._thread = threading.Thread(
- target=self._compute_and_cache_image_embedding
- )
- self._thread.start()
- def preprocess_image(self, image, for_segmentation=False):
- if for_segmentation:
- # Resize image to segmentation model input size
- logger.debug(f"Preprocessing image for segmentation: {image.shape}")
- norm = Normalize(mean=(0.447365,0.447365,0.447365), std=(0.17667491,0.17667491,0.17667491))
- resized_image = cv2.resize(image, (self.segmentation_input_shape[3], self.segmentation_input_shape[2])) # Width, Height
- resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
- resized_image = norm(resized_image) # Normalize for segmentation model
- else:
- # Resize image for detection model input size
- logger.debug(f"Preprocessing image for detection: {image.shape}")
- resized_image = cv2.resize(image, (self.input_width, self.input_height))
- resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
- resized_image = self.normalize(resized_image)
- # resized_image = resized_image.astype('float32') / 255.0
- input_tensor = resized_image.transpose(2, 0, 1) # Convert HWC to CHW
- input_tensor = np.expand_dims(input_tensor, 0) # Add batch dimension
- # logger.debug(f"Processed image shape: {input_tensor.shape}")
- return input_tensor
- def _compute_and_cache_image_embedding(self):
- with self._lock:
- # Prepare the inputs dictionary
- inputs = {
- 'image': self._image,
- 'im_shape': self._im_shape,
- 'scale_factor': self._scale_factor
- }
- # Perform inference
- self._result = self.detection_request.infer(inputs)
- # print("models results:", self._result)
- def _get_image_embedding(self):
- if self._thread is not None:
- self._thread.join()
- self._thread = None
- with self._lock:
- new_result = self._result
- return new_result
- def predict_mask_from_points(self,points=None,point_labels=None):
- return self._collect_result_from_output(
- outputs=self._get_image_embedding(),
- raw_width=self.raw_width,
- raw_height=self.raw_height,
- )
- def predict_polygon_from_points(self,points=None,point_labels=None):
- result_list=self.predict_mask_from_points(points,point_labels)
- return result_list
- def _collect_result_from_output(self, outputs, raw_width, raw_height):
- # Extract the desired output array from outputs dictionary
- output_array = None
- for key in outputs:
- if 'save_infer_model/scale_0.tmp_0' in key.names:
- output_array = outputs[key]
- break
- if output_array is None:
- raise ValueError("Desired output not found in outputs")
- outputs = output_array # shape [50,6]
- point_list = []
- thresh_hold = 0.7
- for bbox_info in outputs:
- score = bbox_info[1]
- if score > thresh_hold:
- x1_raw = bbox_info[2]
- y1_raw = bbox_info[3]
- x2_raw = bbox_info[4]
- y2_raw = bbox_info[5]
- # print(f"Raw bbox coordinates: x1={x1_raw}, y1={y1_raw}, x2={x2_raw}, y2={y2_raw}")
- x1 = max(min(int(x1_raw), raw_width - 1), 0)
- y1 = max(min(int(y1_raw), raw_height - 1), 0)
- x2 = max(min(int(x2_raw), raw_width - 1), 0)
- y2 = max(min(int(y2_raw), raw_height - 1), 0)
- # print(f"Clamped bbox coordinates: x1={x1}, y1={y1}, x2={x2}, y2={y2}")
- point_xy = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
- point_list.append(point_xy)
- return point_list
|