| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import collections
- import threading
- import numpy as np
- import openvino as ov
- import os.path as osp
- import cv2
- from ..logger import logger
- from . import _utils
- from labelme.utils import img_qt_to_arr
- from qtpy import QtGui
class Normalize:
    """Scale an 8-bit style image to [0, 1] and standardize it per channel.

    Calling the instance computes ``(img / 255 - mean) / std`` in float32.

    Args:
        mean: Per-channel mean as a list or tuple. Defaults to zeros for
            three channels.
        std: Per-channel standard deviation as a list or tuple, the same
            length as ``mean``.

    Raises:
        ValueError: If ``mean`` or ``std`` is not a list/tuple, or if the two
            do not contain the same number of values.
    """

    def __init__(self, mean=(0., 0., 0.), std=(0.1, 0.1, 0.1)):
        if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
            raise ValueError("mean and std should be of type list or tuple.")
        # Shape (1, 1, C) broadcasts against a channels-last (H, W, C) image.
        # Using -1 lets C follow the number of values supplied instead of
        # being fixed at three channels.
        self.mean = np.asarray(mean, dtype=np.float32).reshape((1, 1, -1))
        self.std = np.asarray(std, dtype=np.float32).reshape((1, 1, -1))
        if self.mean.shape != self.std.shape:
            # Fail here rather than with an opaque broadcast error at call time.
            raise ValueError("mean and std should have the same length.")

    def __call__(self, img):
        """Return the normalized float32 copy of channels-last array ``img``."""
        scaled = img.astype(np.float32) / 255.0
        return (scaled - self.mean) / self.std
class BarcodeDetectModel:
    """Barcode detector backed by OpenVINO models compiled for the CPU device.

    ``set_image`` letterboxes an image to the detector input size and starts
    inference on a background thread. The ``predict_*`` methods wait for that
    thread and return every detection above the score threshold as a
    four-corner polygon in original-image pixel coordinates.

    Args:
        detection_model_path: Path of the detection model passed to
            ``ov.Core.read_model``.
        segmentation_model_path: Optional path of a segmentation model. When
            given it is read and compiled, but no method in this class runs it.
    """

    # Per-image state. Declared at class level so the predict_* methods can be
    # called before set_image() without raising AttributeError.
    _thread = None  # background inference thread; None when idle
    _image = None  # preprocessed detector input tensor
    _result = None  # raw output of the latest detection inference
    raw_width = 0
    raw_height = 0
    _letterbox_gain = 1.0  # resize factor applied by the letterbox
    _letterbox_pad = (0, 0)  # (left, top) padding added by the letterbox

    def __init__(self, detection_model_path, segmentation_model_path=None):
        self.ie = ov.Core()
        self.detection_net = self.ie.read_model(model=detection_model_path)
        self.detection_sess = self.ie.compile_model(self.detection_net, "CPU")
        self.detection_request = self.detection_sess.create_infer_request()

        self.segmentation_net = None
        self.segmentation_sess = None
        if segmentation_model_path:
            self.segmentation_net = self.ie.read_model(model=segmentation_model_path)
            self.segmentation_sess = self.ie.compile_model(self.segmentation_net, "CPU")

        # Guards the per-image state shared with the inference thread.
        self._lock = threading.Lock()

        self.input_height = 1024
        self.input_width = 1024
        self.segmentation_input_shape = (1, 1, 192, 320)

        # Not read anywhere in this class; kept because other code may use them.
        self._image_embedding_cache = collections.OrderedDict()
        self._max_cache_size = 10
        self.normalize = Normalize()

    def set_image(self, image: np.ndarray):
        """Preprocess ``image`` and start detection on a background thread.

        Args:
            image: Image array whose first two axes are (height, width).
                ``preprocess_image`` converts it with ``cv2.COLOR_BGR2RGB``.
        """
        with self._lock:
            self.raw_width = image.shape[1]
            self.raw_height = image.shape[0]
            self._image = self.preprocess_image(image)
            # Drop the previous image's detections so a failed inference
            # cannot surface stale boxes rescaled with the new geometry.
            self._result = None
        self._thread = threading.Thread(
            target=self._compute_and_cache_image_embedding
        )
        self._thread.start()

    def preprocess_image(self, image, for_segmentation=False):
        """Convert ``image`` into a float32 input tensor with a batch axis.

        In detection mode the image is letterboxed (aspect-preserving resize
        plus gray padding) to ``input_width`` x ``input_height`` and scaled to
        [0, 1]. The resize gain and the (left, top) padding are stored on the
        instance so detections can be mapped back to the original image.

        In segmentation mode the image is resized to the spatial size in
        ``segmentation_input_shape`` and normalized with mean 0.5 / std 0.5.

        Args:
            image: Channels-last image array.
            for_segmentation: Select the segmentation preprocessing path.

        Returns:
            Array with axes (1, channels, height, width).
        """
        if for_segmentation:
            logger.debug(f"Preprocessing image for segmentation: {image.shape}")
            norm = Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
            _, _, seg_h, seg_w = self.segmentation_input_shape
            # cv2.resize takes the target size as (width, height).
            prepared = cv2.resize(image, (seg_w, seg_h))
            prepared = cv2.cvtColor(prepared, cv2.COLOR_BGR2RGB)
            prepared = norm(prepared)
        else:
            logger.debug(f"Preprocessing image for detection: {image.shape}")
            h, w = image.shape[:2]
            r = min(self.input_height / h, self.input_width / w)
            # Clamp to one pixel: extreme aspect ratios can round to 0, which
            # cv2.resize rejects.
            nh = max(1, int(round(h * r)))
            nw = max(1, int(round(w * r)))
            image_resized = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_LINEAR)

            # Split the padding evenly; the odd pixel goes to bottom/right.
            top = (self.input_height - nh) // 2
            bottom = self.input_height - nh - top
            left = (self.input_width - nw) // 2
            right = self.input_width - nw - left
            image_padded = cv2.copyMakeBorder(
                image_resized,
                top, bottom, left, right,
                cv2.BORDER_CONSTANT,
                value=(114, 114, 114),
            )

            # Stored for mapping detections back to original coordinates.
            self._letterbox_gain = r
            self._letterbox_pad = (left, top)

            prepared = cv2.cvtColor(image_padded, cv2.COLOR_BGR2RGB)
            # NOTE(review): the /255 scaling is applied on the detection path
            # only; the segmentation path already scales inside Normalize.
            # Confirm against the original formatting of this file.
            prepared = prepared.astype(np.float32) / 255.0

        # Channels-last (H, W, C) -> (C, H, W), then add the batch axis.
        input_tensor = prepared.transpose(2, 0, 1)
        return np.expand_dims(input_tensor, 0)

    def _compute_and_cache_image_embedding(self):
        """Run the detector on the current input tensor and store the output."""
        with self._lock:
            self._result = self.detection_request.infer([self._image])

    def _get_image_embedding(self):
        """Wait for any pending inference and return the stored raw output.

        Returns:
            The value produced by the infer request, or ``None`` when no
            inference result is available.
        """
        worker = self._thread
        if worker is not None:
            worker.join()
            # Only clear the handle if set_image() has not replaced it while
            # we were waiting.
            if self._thread is worker:
                self._thread = None
        with self._lock:
            return self._result

    def predict_mask_from_points(self, points=None, point_labels=None):
        """Return the detected boxes for the current image as polygons.

        ``points`` and ``point_labels`` are accepted but not used.

        Returns:
            List of ``[[x1, y1], [x2, y1], [x2, y2], [x1, y2]]`` polygons;
            empty when no image has been set or nothing passes the threshold.
        """
        return self._collect_result_from_output(
            outputs=self._get_image_embedding(),
            raw_width=self.raw_width,
            raw_height=self.raw_height,
        )

    def predict_polygon_from_points(self, points=None, point_labels=None):
        """Return the same polygons as ``predict_mask_from_points``."""
        return self.predict_mask_from_points(points, point_labels)

    def _collect_result_from_output(
        self, outputs, raw_width, raw_height, score_threshold=0.5
    ):
        """Turn raw detector output into polygons in original image pixels.

        Args:
            outputs: Mapping returned by the infer request, or ``None``. The
                first value is indexed as ``[batch][row]`` and each row is
                read as ``x1, y1, x2, y2, score`` in letterboxed coordinates.
            raw_width: Width of the original image, used for clamping.
            raw_height: Height of the original image, used for clamping.
            score_threshold: Rows whose score is not strictly greater than
                this value are discarded.

        Returns:
            List of four-corner polygons with integer coordinates clamped to
            the original image bounds.
        """
        if outputs is None:
            return []

        detections = next(iter(outputs.values()))[0]
        gain = self._letterbox_gain
        pad_x, pad_y = self._letterbox_pad
        max_x = raw_width - 1
        max_y = raw_height - 1

        def to_pixel(value, pad, upper):
            # Undo the letterbox (remove padding, then the resize gain),
            # truncate to an integer and clamp into [0, upper].
            return max(min(int((value - pad) / gain), upper), 0)

        polygons = []
        for det in detections:
            if float(det[4]) <= score_threshold:
                continue
            x1 = to_pixel(det[0], pad_x, max_x)
            y1 = to_pixel(det[1], pad_y, max_y)
            x2 = to_pixel(det[2], pad_x, max_x)
            y2 = to_pixel(det[3], pad_y, max_y)
            polygons.append([[x1, y1], [x2, y1], [x2, y2], [x1, y2]])
        return polygons
|