barcode_model.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import collections
  2. import threading
  3. import numpy as np
  4. import openvino as ov
  5. import os.path as osp
  6. import cv2
  7. from ..logger import logger
  8. from . import _utils
  9. import copy
  10. class BarcodePredictModel:
  11. def __init__(self,model_path):
  12. self.ie = ov.Core()
  13. bin_path = osp.splitext(model_path)[0] + '.bin'
  14. self.net = self.ie.read_model(model_path, bin_path)
  15. self.device = 'cpu'
  16. self._lock = threading.Lock()
  17. self._image_embedding_cache = collections.OrderedDict()
  18. self.ppp=ov.preprocess.PrePostProcessor(self.net)
  19. self.ppp.input().tensor().set_shape([1,480,480,1]).set_element_type(ov.Type.u8).set_layout(ov.Layout('NHWC'))
  20. self.ppp.input().model().set_layout(ov.Layout('NCHW'))
  21. # self.ppp.output().tensor().set_element_type(ov.Type.f32)
  22. self.net=self.ppp.build()
  23. self.sess = self.ie.compile_model(model=self.net, device_name=self.device.upper())
  24. self.predict_session=self.sess.create_infer_request()
  25. def set_image(self,image:np.ndarray):
  26. with self._lock:
  27. #keep ratio is false
  28. self.raw_width=image.shape[1]
  29. self.raw_height=image.shape[0]
  30. resized_image=cv2.resize(image,(480,480))
  31. input_tensor=np.expand_dims(resized_image,2)
  32. input_tensor=np.expand_dims(input_tensor,0)
  33. self._image=input_tensor
  34. self._thread = threading.Thread(
  35. target=self._compute_and_cache_image_embedding
  36. )
  37. self._thread.start()
  38. def _compute_and_cache_image_embedding(self):
  39. with self._lock:
  40. logger.debug("Computing image embedding using BarcodePredict...")
  41. #we dont need to transfer because we use mono camera
  42. self._result = self.predict_session.infer(self._image)
  43. logger.debug("Done computing image embedding.")
  44. def _get_image_embedding(self):
  45. if self._thread is not None:
  46. self._thread.join()
  47. self._thread=None
  48. with self._lock:
  49. new_result=self._result
  50. return new_result
  51. def predict_mask_from_points(self,points=None,point_labels=None):
  52. return _collect_result_from_output(
  53. outputs=self._get_image_embedding(),
  54. raw_width=self.raw_width,
  55. raw_height=self.raw_height,
  56. )
  57. def predict_polygon_from_points(self,points=None,point_labels=None):
  58. result_list=self.predict_mask_from_points(points,point_labels)
  59. return result_list
  60. def _collect_result_from_output(outputs,raw_width,raw_height):
  61. outputs=np.squeeze(outputs[0],axis=0)
  62. point_list=[]
  63. thresh_hold=0.5
  64. for bbox_info in outputs:
  65. if(bbox_info[4]>thresh_hold):
  66. ratio_x=raw_width/480
  67. ratio_y=raw_height/480
  68. #0->x1 1->y1 2->x2 3->y2
  69. x1=max(min(int(ratio_x*bbox_info[0]),raw_width-1),0)
  70. y1=max(min(int(ratio_y*bbox_info[1]),raw_height-1),0)
  71. x2=max(min(int(ratio_x*bbox_info[2]),raw_width-1),0)
  72. y2=max(min(int(ratio_y*bbox_info[3]),raw_height-1),0)
  73. point_xy=[]
  74. point_xy.append([x1,y1])
  75. point_xy.append([x2,y1])
  76. point_xy.append([x2,y2])
  77. point_xy.append([x1,y2])
  78. point_list.append(point_xy)
  79. return point_list