import collections import threading import numpy as np import openvino as ov import os.path as osp import cv2 from ..logger import logger from . import _utils import copy class BarcodePredictModel: def __init__(self,model_path): self.ie = ov.Core() bin_path = osp.splitext(model_path)[0] + '.bin' self.net = self.ie.read_model(model_path, bin_path) self.device = 'cpu' self._lock = threading.Lock() self._image_embedding_cache = collections.OrderedDict() self.ppp=ov.preprocess.PrePostProcessor(self.net) self.ppp.input().tensor().set_shape([1,480,480,1]).set_element_type(ov.Type.u8).set_layout(ov.Layout('NHWC')) self.ppp.input().model().set_layout(ov.Layout('NCHW')) # self.ppp.output().tensor().set_element_type(ov.Type.f32) self.net=self.ppp.build() self.sess = self.ie.compile_model(model=self.net, device_name=self.device.upper()) self.predict_session=self.sess.create_infer_request() def set_image(self,image:np.ndarray): with self._lock: #keep ratio is false self.raw_width=image.shape[1] self.raw_height=image.shape[0] resized_image=cv2.resize(image,(480,480)) input_tensor=np.expand_dims(resized_image,2) input_tensor=np.expand_dims(input_tensor,0) self._image=input_tensor self._thread = threading.Thread( target=self._compute_and_cache_image_embedding ) self._thread.start() def _compute_and_cache_image_embedding(self): with self._lock: logger.debug("Computing image embedding using BarcodePredict...") #we dont need to transfer because we use mono camera self._result = self.predict_session.infer(self._image) logger.debug("Done computing image embedding.") def _get_image_embedding(self): if self._thread is not None: self._thread.join() self._thread=None with self._lock: new_result=self._result return new_result def predict_mask_from_points(self,points=None,point_labels=None): return _collect_result_from_output( outputs=self._get_image_embedding(), raw_width=self.raw_width, raw_height=self.raw_height, ) def predict_polygon_from_points(self,points=None,point_labels=None): result_list=self.predict_mask_from_points(points,point_labels) return result_list def _collect_result_from_output(outputs,raw_width,raw_height): outputs=np.squeeze(outputs[0],axis=0) point_list=[] thresh_hold=0.5 for bbox_info in outputs: if(bbox_info[4]>thresh_hold): ratio_x=raw_width/480 ratio_y=raw_height/480 #0->x1 1->y1 2->x2 3->y2 x1=max(min(int(ratio_x*bbox_info[0]),raw_width-1),0) y1=max(min(int(ratio_y*bbox_info[1]),raw_height-1),0) x2=max(min(int(ratio_x*bbox_info[2]),raw_width-1),0) y2=max(min(int(ratio_y*bbox_info[3]),raw_height-1),0) point_xy=[] point_xy.append([x1,y1]) point_xy.append([x2,y1]) point_xy.append([x2,y2]) point_xy.append([x1,y2]) point_list.append(point_xy) return point_list