123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- import collections
- import threading
- import numpy as np
- import openvino as ov
- import os.path as osp
- import cv2
- from ..logger import logger
- from . import _utils
- from labelme.utils import img_qt_to_arr
- from qtpy import QtGui
class Normalize:
    """Scale an image to [0, 1] and standardise it per channel.

    The statistics are stored as float32 arrays of shape ``(1, 1, 3)`` so
    they broadcast over the last axis of a three-channel image.
    """

    def __init__(self, mean=(0.15525904, 0.15525904, 0.15525904), std=(0.12552188, 0.12552188, 0.12552188)):
        """Store the per-channel statistics.

        Args:
            mean: List or tuple with three per-channel means.
            std: List or tuple with three per-channel standard deviations.

        Raises:
            ValueError: If ``mean`` or ``std`` is not a list or tuple, or
                (from the reshape) does not hold exactly three values.
        """
        for stats in (mean, std):
            if not isinstance(stats, (list, tuple)):
                raise ValueError("mean and std should be of type list or tuple.")

        # Shape (1, 1, 3) lines the statistics up with the channel axis.
        channel_shape = (1, 1, 3)
        self.mean = np.array(mean, dtype=np.float32).reshape(channel_shape)
        self.std = np.array(std, dtype=np.float32).reshape(channel_shape)

    def __call__(self, img):
        """Return ``(img / 255 - mean) / std`` as a float32 array."""
        unit_range = img.astype(np.float32) / 255.0
        return (unit_range - self.mean) / self.std
class BarcodeDetectModel:
    """Barcode detector that runs an OpenVINO model on the CPU.

    Typical use is ``set_image(image)`` followed by
    ``predict_polygon_from_points()``. ``set_image`` starts the inference in
    a background thread; the ``predict_*`` methods wait for that thread and
    turn the detections into four-corner polygons in raw-image pixels.

    An optional segmentation model is read and compiled in ``__init__``, but
    no method of this class runs it.
    """

    # Name of the model output that carries the detection rows.
    _DETECTION_OUTPUT_NAME = 'save_infer_model/scale_0.tmp_0'
    # A detection is reported only when its score is strictly above this.
    DEFAULT_SCORE_THRESHOLD = 0.7

    def __init__(self, detection_model_path, segmentation_model_path=None):
        """Load and compile the model(s) and set up the inference state.

        Args:
            detection_model_path: Path of the detection model to read.
            segmentation_model_path: Optional path of a segmentation model;
                when falsy, no segmentation model is loaded.
        """
        self.ie = ov.Core()

        self.detection_net = self.ie.read_model(model=detection_model_path)
        self.detection_sess = self.ie.compile_model(
            model=self.detection_net, device_name="CPU"
        )
        self.detection_request = self.detection_sess.create_infer_request()

        self.segmentation_net = None
        self.segmentation_sess = None
        if segmentation_model_path:
            self.segmentation_net = self.ie.read_model(
                model=segmentation_model_path
            )
            self.segmentation_sess = self.ie.compile_model(
                model=self.segmentation_net, device_name="CPU"
            )

        self._lock = threading.Lock()
        self.input_height = 640
        self.input_width = 640
        self.segmentation_input_shape = (1, 3, 128, 256)
        # Not used by any method below; kept because other code may read them.
        self._image_embedding_cache = collections.OrderedDict()
        self._max_cache_size = 10
        self.normalize = Normalize()
        # Built once here instead of on every segmentation preprocessing call.
        self._segmentation_normalize = Normalize(
            mean=(0.447365, 0.447365, 0.447365),
            std=(0.17667491, 0.17667491, 0.17667491),
        )

        # Per-image state, filled in by set_image(). Initialised here so that
        # calling predict_* too early fails with a clear error instead of an
        # AttributeError.
        self.raw_width = None
        self.raw_height = None
        self._image = None
        self._im_shape = None
        self._scale_factor = None
        self._thread = None
        self._result = None

    def set_image(self, image: np.ndarray) -> None:
        """Preprocess ``image`` and start detection in a background thread.

        Args:
            image: Array whose first two axes are height and width; it is
                passed through ``cv2.COLOR_BGR2RGB`` during preprocessing.
        """
        with self._lock:
            self.raw_height, self.raw_width = image.shape[:2]
            self._image = self.preprocess_image(image)

            self._im_shape = np.array(
                [[self.raw_height, self.raw_width]], dtype=np.float32
            )
            # NOTE(review): the scale factor is fixed at 1.0 although the image
            # is resized to input_width x input_height - confirm the model
            # really returns boxes in raw-image coordinates.
            self._scale_factor = np.array([[1.0, 1.0]], dtype=np.float32)

            # Drop the previous image's detections so that a failed inference
            # cannot make predict_* return boxes of an older image.
            self._result = None
            # The worker needs the same lock, so it only begins the inference
            # after this block has released it.
            self._thread = threading.Thread(
                target=self._compute_and_cache_image_embedding
            )
            self._thread.start()

    def preprocess_image(self, image, for_segmentation=False):
        """Resize, colour-convert and normalise ``image`` for a model.

        Args:
            image: Image array accepted by ``cv2.resize``/``cv2.cvtColor``.
            for_segmentation: Use the segmentation input size and statistics
                instead of the detection ones.

        Returns:
            float32 array of shape ``(1, 3, height, width)``.
        """
        if for_segmentation:
            logger.debug(f"Preprocessing image for segmentation: {image.shape}")
            # segmentation_input_shape is (batch, channels, height, width);
            # cv2.resize expects (width, height).
            target_size = (
                self.segmentation_input_shape[3],
                self.segmentation_input_shape[2],
            )
            normalize = self._segmentation_normalize
        else:
            logger.debug(f"Preprocessing image for detection: {image.shape}")
            target_size = (self.input_width, self.input_height)
            normalize = self.normalize

        resized_image = cv2.resize(image, target_size)
        resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
        resized_image = normalize(resized_image)

        # HWC -> CHW, then add the leading batch axis.
        input_tensor = resized_image.transpose(2, 0, 1)
        return np.expand_dims(input_tensor, 0)

    def _compute_and_cache_image_embedding(self):
        """Run the detection model on the current image (thread target)."""
        with self._lock:
            inputs = {
                'image': self._image,
                'im_shape': self._im_shape,
                'scale_factor': self._scale_factor,
            }
            self._result = self.detection_request.infer(inputs)

    def _get_image_embedding(self):
        """Wait for the pending inference and return its raw outputs.

        Raises:
            RuntimeError: If no result is available, i.e. ``set_image`` has
                not been called yet or the background inference failed.
        """
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        with self._lock:
            result = self._result
        if result is None:
            raise RuntimeError(
                "No detection result available: call set_image() first "
                "(or the last inference failed)."
            )
        return result

    def predict_mask_from_points(self, points=None, point_labels=None):
        """Return the detected boxes of the current image as polygons.

        ``points`` and ``point_labels`` are accepted but not used.

        Returns:
            List of ``[[x1, y1], [x2, y1], [x2, y2], [x1, y2]]`` polygons.

        Raises:
            RuntimeError: If no detection result is available.
            ValueError: If the expected model output is missing.
        """
        return self._collect_result_from_output(
            outputs=self._get_image_embedding(),
            raw_width=self.raw_width,
            raw_height=self.raw_height,
        )

    def predict_polygon_from_points(self, points=None, point_labels=None):
        """Same result as :meth:`predict_mask_from_points`."""
        return self.predict_mask_from_points(points, point_labels)

    def _collect_result_from_output(
        self,
        outputs,
        raw_width,
        raw_height,
        score_threshold=DEFAULT_SCORE_THRESHOLD,
    ):
        """Convert raw model outputs into clamped box polygons.

        Args:
            outputs: Mapping whose keys expose a ``names`` collection and
                whose values are arrays of detection rows.
            raw_width: Width of the original image in pixels.
            raw_height: Height of the original image in pixels.
            score_threshold: Rows whose score is not strictly greater than
                this value are dropped.

        Returns:
            List of ``[[x1, y1], [x2, y1], [x2, y2], [x1, y2]]`` polygons with
            integer coordinates clamped to the image bounds.

        Raises:
            ValueError: If no output is named ``_DETECTION_OUTPUT_NAME``.
        """
        detections = next(
            (
                outputs[port]
                for port in outputs
                if self._DETECTION_OUTPUT_NAME in port.names
            ),
            None,
        )
        if detections is None:
            raise ValueError("Desired output not found in outputs")

        max_x = raw_width - 1
        max_y = raw_height - 1
        polygons = []
        for row in detections:
            # Column 1 is the score and columns 2-5 are x1, y1, x2, y2;
            # column 0 is not read here (presumably the class id).
            if row[1] > score_threshold:
                x1 = max(min(int(row[2]), max_x), 0)
                y1 = max(min(int(row[3]), max_y), 0)
                x2 = max(min(int(row[4]), max_x), 0)
                y2 = max(min(int(row[5]), max_y), 0)
                polygons.append([[x1, y1], [x2, y1], [x2, y2], [x1, y2]])
        return polygons
|